partner_chains_db_sync_data_sources/governed_map/
mod.rs1use crate::DataSourceError::ExpectedDataNotFound;
3use crate::Result;
4use crate::block::BlockDataSourceImpl;
5use crate::db_model::{DbSyncConfigurationProvider, GovernedMapAction, TxInConfiguration};
6use crate::metrics::{McFollowerMetrics, observed_async_trait};
7use db_sync_sqlx::{Asset, BlockNumber};
8use itertools::Itertools;
9use log::warn;
10use partner_chains_plutus_data::governed_map::GovernedMapDatum;
11use sidechain_domain::byte_string::ByteString;
12use sidechain_domain::*;
13use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1};
14use sqlx::PgPool;
15use std::cmp::{max, min};
16use std::sync::{Arc, Mutex};
17
18#[cfg(test)]
19mod tests;
20
21pub struct GovernedMapDataSourceImpl {
25 pub pool: PgPool,
27 pub metrics_opt: Option<McFollowerMetrics>,
29 db_sync_config: DbSyncConfigurationProvider,
31}
32
33impl GovernedMapDataSourceImpl {
34 pub async fn new(
36 pool: PgPool,
37 metrics_opt: Option<McFollowerMetrics>,
38 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
39 crate::db_model::create_idx_tx_out_address(&pool).await?;
40 Ok(Self {
41 pool: pool.clone(),
42 metrics_opt,
43 db_sync_config: DbSyncConfigurationProvider::new(pool),
44 })
45 }
46}
47
48observed_async_trait!(
49impl GovernedMapDataSource for GovernedMapDataSourceImpl {
50 async fn get_state_at_block(
51 &self,
52 mc_block: McBlockHash,
53 scripts: MainChainScriptsV1,
54 ) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
55 {
56 Ok(get_mappings_entries(
57 &self.pool,
58 mc_block,
59 scripts,
60 self.db_sync_config.get_tx_in_config().await?
61 ).await?.into())
62 }
63
64 async fn get_mapping_changes(
65 &self,
66 since_mc_block: Option<McBlockHash>,
67 up_to_mc_block: McBlockHash,
68 scripts: MainChainScriptsV1,
69 ) -> std::result::Result<
70 Vec<(String, Option<ByteString>)>,
71 Box<dyn std::error::Error + Send + Sync>,
72 > {
73 let current_mappings = self.get_state_at_block(up_to_mc_block, scripts.clone()).await?;
74 let Some(since_mc_block) = since_mc_block else {
75 let changes =
76 current_mappings.into_iter().map(|(key, value)| (key, Some(value))).collect();
77 return Ok(changes);
78 };
79 let previous_mappings = self.get_state_at_block(since_mc_block, scripts.clone()).await?;
80 let mut changes = vec![];
81 for (key, value) in current_mappings.iter() {
82 if previous_mappings.get(key) != Some(value) {
83 changes.push((key.clone(), Some(value.clone())));
84 }
85 }
86 for key in previous_mappings.keys() {
87 if !current_mappings.contains_key(key) {
88 changes.push((key.clone(), None));
89 }
90 }
91
92 Ok(changes)
93 }
94}
95);
96
97async fn get_mappings_entries(
98 pool: &PgPool,
99 hash: McBlockHash,
100 scripts: MainChainScriptsV1,
101 tx_in_config: TxInConfiguration,
102) -> Result<BTreeMap<String, ByteString>> {
103 let Some(block) = crate::db_model::get_block_by_hash(pool, hash.clone()).await? else {
104 return Err(ExpectedDataNotFound(format!("Block hash: {hash}")));
105 };
106 let entries = crate::db_model::get_datums_at_address_with_token(
107 pool,
108 &scripts.validator_address.into(),
109 block.block_no,
110 Asset::new(scripts.asset_policy_id),
111 tx_in_config,
112 )
113 .await?;
114
115 let mut mappings = BTreeMap::new();
116 for entry in entries {
117 match GovernedMapDatum::try_from(entry.datum.0) {
118 Ok(GovernedMapDatum { key, value }) => {
119 mappings.insert(key, value);
120 },
121 Err(err) => warn!("Failed decoding map entry: {err}"),
122 }
123 }
124
125 Ok(mappings)
126}
127
128pub struct GovernedMapDataSourceCachedImpl {
132 pub pool: PgPool,
134 pub metrics_opt: Option<McFollowerMetrics>,
136 cache_size: u16,
138 cache: Arc<Mutex<Cache>>,
140 blocks: Arc<BlockDataSourceImpl>,
142 db_sync_config: DbSyncConfigurationProvider,
144}
145
146impl GovernedMapDataSourceCachedImpl {
147 pub async fn new(
149 pool: PgPool,
150 metrics_opt: Option<McFollowerMetrics>,
151 cache_size: u16,
152 blocks: Arc<BlockDataSourceImpl>,
153 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
154 crate::db_model::create_idx_tx_out_address(&pool).await?;
155 let cache = Default::default();
156 Ok(Self {
157 pool: pool.clone(),
158 metrics_opt,
159 cache_size,
160 cache,
161 blocks,
162 db_sync_config: DbSyncConfigurationProvider::new(pool),
163 })
164 }
165}
166
167observed_async_trait!(
168impl GovernedMapDataSource for GovernedMapDataSourceCachedImpl {
169 async fn get_state_at_block(
170 &self,
171 mc_block: McBlockHash,
172 scripts: MainChainScriptsV1,
173 ) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
174 {
175 Ok(get_mappings_entries(&self.pool, mc_block, scripts, self.db_sync_config.get_tx_in_config().await?).await?.into())
176 }
177
178 async fn get_mapping_changes(
179 &self,
180 since_mc_block: Option<McBlockHash>,
181 up_to_mc_block: McBlockHash,
182 scripts: MainChainScriptsV1,
183 ) -> std::result::Result<
184 Vec<(String, Option<ByteString>)>,
185 Box<dyn std::error::Error + Send + Sync>,
186 > {
187 if let Ok(mut cache) = self.cache.lock() {
188 cache.set_main_chain_scripts(scripts.clone());
189 };
190
191 let since_block_number = match since_mc_block {
192 Some(hash) => Some(
193 crate::db_model::get_block_by_hash(&self.pool, hash.clone())
194 .await?
195 .ok_or_else(|| Box::new(ExpectedDataNotFound(format!("Block hash: {hash}"))))?
196 .block_no,
197 ),
198 None => None,
199 };
200
201 let Some(up_to_block) = self.blocks.get_block_by_hash(up_to_mc_block.clone()).await? else {
202 return Err(Box::new(ExpectedDataNotFound(format!("Block hash: {up_to_mc_block}"))));
203 };
204
205 let up_to_block_number = BlockNumber(up_to_block.number.0);
206
207 if let Some(cached_changes) =
208 self.get_changes_from_cache(since_block_number, up_to_block_number).await?
209 {
210 return Ok(cached_changes);
211 }
212
213 let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
214 let latest_stable_block =
215 match self.blocks.get_latest_stable_block_for(latest_block_timestamp.into()).await? {
216 Some(block) => BlockNumber(block.number.0),
217 None => up_to_block_number,
218 };
219 let since_block_plus = since_block_number.unwrap_or_default().saturating_add(self.cache_size);
220 let max_search_block = min(latest_stable_block, max(up_to_block_number, since_block_plus));
221
222 let changes = self
223 .get_changes_in_range_to_cache(since_block_number, max_search_block, scripts)
224 .await?;
225
226 if let Ok(mut cache) = self.cache.lock() {
227 cache.update(changes.clone());
228 }
229
230 Ok(filter_changes_in_range(changes, since_block_number, up_to_block_number))
231 }
232}
233);
234
235impl GovernedMapDataSourceCachedImpl {
236 async fn get_changes_from_cache(
237 &self,
238 since_block: Option<BlockNumber>,
239 up_to_block: BlockNumber,
240 ) -> Result<Option<Vec<(String, Option<ByteString>)>>> {
241 if let Ok(cache) = self.cache.lock() {
242 Ok(cache.get_changes_in_range(since_block, up_to_block))
243 } else {
244 Ok(None)
245 }
246 }
247
248 async fn get_changes_in_range_to_cache(
249 &self,
250 since_block: Option<BlockNumber>,
251 up_to_block: BlockNumber,
252 scripts: MainChainScriptsV1,
253 ) -> Result<Vec<Change>> {
254 let changes = crate::db_model::get_governed_map_changes(
255 &self.pool,
256 &scripts.validator_address.into(),
257 since_block,
258 up_to_block,
259 Asset::new(scripts.asset_policy_id),
260 self.db_sync_config.get_tx_in_config().await?,
261 )
262 .await?;
263
264 let mut result = Vec::new();
265
266 for change in changes {
267 let GovernedMapDatum { key, value } = match GovernedMapDatum::try_from(change.datum.0) {
268 Ok(datum) => datum,
269 Err(err) => {
270 warn!("Failed decoding map entry: {err}");
271 continue;
272 },
273 };
274 match change.action {
275 GovernedMapAction::Spend => result.push(Change::new(change.block_no, key, None)),
276 GovernedMapAction::Create => {
277 result.push(Change::new(change.block_no, key, Some(value)))
278 },
279 };
280 }
281 Ok(result)
282 }
283}
284
285#[derive(derive_new::new, Clone)]
286struct Change {
287 block_no: BlockNumber,
288 key: String,
289 value: Option<ByteString>,
290}
291
292#[derive(Default)]
293pub(crate) struct Cache {
294 highest_block_number: Option<BlockNumber>,
295 lowest_block_number: Option<BlockNumber>,
296 changes: Vec<Change>,
297 address: Option<MainchainAddress>,
298 policy_id: Option<PolicyId>,
299}
300
301fn filter_changes_in_range(
302 changes: Vec<Change>,
303 since_block: Option<BlockNumber>,
304 up_to_block: BlockNumber,
305) -> Vec<(String, Option<ByteString>)> {
306 changes
307 .into_iter()
308 .filter(|change| {
309 change.block_no <= up_to_block
310 && since_block.map(|b| change.block_no > b).unwrap_or(true)
311 })
312 .map(|change| (change.key, change.value))
313 .collect()
314}
315
316impl Cache {
317 fn get_changes_in_range(
318 &self,
319 since_block: Option<BlockNumber>,
320 up_to_block: BlockNumber,
321 ) -> Option<Vec<(String, Option<ByteString>)>> {
322 let Some(highest_block_number) = self.highest_block_number else {
323 return None;
324 };
325 let Some(lowest_block_number) = self.lowest_block_number else {
326 return None;
327 };
328
329 if highest_block_number < up_to_block
330 || since_block.map(|b| b < lowest_block_number).unwrap_or(false)
331 {
332 return None;
333 }
334
335 Some(filter_changes_in_range(self.changes.clone(), since_block, up_to_block))
336 }
337
338 fn update(&mut self, changes: Vec<Change>) {
339 self.changes = changes;
340 let (lowest_block_number, highest_block_number) = self
341 .changes
342 .iter()
343 .minmax_by_key(|change| change.block_no.0)
344 .into_option()
345 .map(|(min, max)| (min.block_no, max.block_no))
346 .unwrap_or((BlockNumber(0), BlockNumber(0)));
347 self.lowest_block_number = Some(lowest_block_number);
348 self.highest_block_number = Some(highest_block_number);
349 }
350
351 fn set_main_chain_scripts(&mut self, scripts: MainChainScriptsV1) {
352 if self.address != Some(scripts.validator_address.clone())
353 || self.policy_id != Some(scripts.asset_policy_id.clone())
354 {
355 self.changes.clear();
356 self.highest_block_number = None;
357 self.lowest_block_number = None;
358 self.address = Some(scripts.validator_address);
359 self.policy_id = Some(scripts.asset_policy_id);
360 }
361 }
362}