partner_chains_db_sync_data_sources/governed_map/
mod.rs1use crate::DataSourceError::ExpectedDataNotFound;
3use crate::Result;
4use crate::block::BlockDataSourceImpl;
5use crate::db_model::{DbSyncConfigurationProvider, GovernedMapAction, TxInConfiguration};
6use crate::{metrics::McFollowerMetrics, observed_async_trait};
7use db_sync_sqlx::{Asset, BlockNumber};
8use itertools::Itertools;
9use log::warn;
10use partner_chains_plutus_data::governed_map::GovernedMapDatum;
11use sidechain_domain::byte_string::ByteString;
12use sidechain_domain::*;
13use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1};
14use sqlx::PgPool;
15use std::cmp::{max, min};
16use std::sync::{Arc, Mutex};
17
18#[cfg(test)]
19mod tests;
20
21pub struct GovernedMapDataSourceImpl {
25 pub pool: PgPool,
27 pub metrics_opt: Option<McFollowerMetrics>,
29 db_sync_config: DbSyncConfigurationProvider,
31}
32
33impl GovernedMapDataSourceImpl {
34 pub async fn new(
36 pool: PgPool,
37 metrics_opt: Option<McFollowerMetrics>,
38 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
39 crate::db_model::create_idx_tx_out_address(&pool).await?;
40 Ok(Self {
41 pool: pool.clone(),
42 metrics_opt,
43 db_sync_config: DbSyncConfigurationProvider::new(pool),
44 })
45 }
46}
47
48observed_async_trait!(
49impl GovernedMapDataSource for GovernedMapDataSourceImpl {
50 async fn get_state_at_block(
51 &self,
52 mc_block: McBlockHash,
53 scripts: MainChainScriptsV1,
54 ) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
55 {
56 Ok(get_mappings_entries(
57 &self.pool,
58 mc_block,
59 scripts,
60 self.db_sync_config.get_tx_in_config().await?
61 ).await?.into())
62 }
63
64 async fn get_mapping_changes(
65 &self,
66 since_mc_block: Option<McBlockHash>,
67 up_to_mc_block: McBlockHash,
68 scripts: MainChainScriptsV1,
69 ) -> std::result::Result<
70 Vec<(String, Option<ByteString>)>,
71 Box<dyn std::error::Error + Send + Sync>,
72 > {
73 let current_mappings = self.get_state_at_block(up_to_mc_block, scripts.clone()).await?;
74 let Some(since_mc_block) = since_mc_block else {
75 let changes =
76 current_mappings.into_iter().map(|(key, value)| (key, Some(value))).collect();
77 return Ok(changes);
78 };
79 let previous_mappings = self.get_state_at_block(since_mc_block, scripts.clone()).await?;
80 let mut changes = vec![];
81 for (key, value) in current_mappings.iter() {
82 if previous_mappings.get(key) != Some(value) {
83 changes.push((key.clone(), Some(value.clone())));
84 }
85 }
86 for key in previous_mappings.keys() {
87 if !current_mappings.contains_key(key) {
88 changes.push((key.clone(), None));
89 }
90 }
91
92 Ok(changes)
93 }
94}
95);
96
97async fn get_mappings_entries(
98 pool: &PgPool,
99 hash: McBlockHash,
100 scripts: MainChainScriptsV1,
101 tx_in_config: TxInConfiguration,
102) -> Result<BTreeMap<String, ByteString>> {
103 let Some(block) = crate::db_model::get_block_by_hash(pool, hash.clone()).await? else {
104 return Err(ExpectedDataNotFound(format!("Block hash: {hash}")));
105 };
106 let entries = crate::db_model::get_datums_at_address_with_token(
107 pool,
108 &scripts.validator_address.into(),
109 block.block_no,
110 Asset::new(scripts.asset_policy_id),
111 tx_in_config,
112 )
113 .await?;
114
115 let mut mappings = BTreeMap::new();
116 for entry in entries {
117 match GovernedMapDatum::try_from(entry.datum.0) {
118 Ok(GovernedMapDatum { key, value }) => {
119 mappings.insert(key, value);
120 },
121 Err(err) => warn!("Failed decoding map entry: {err}"),
122 }
123 }
124
125 Ok(mappings)
126}
127
128pub struct GovernedMapDataSourceCachedImpl {
132 pub pool: PgPool,
134 pub metrics_opt: Option<McFollowerMetrics>,
136 cache_size: u16,
138 cache: Arc<Mutex<Cache>>,
140 blocks: Arc<BlockDataSourceImpl>,
142 db_sync_config: DbSyncConfigurationProvider,
144}
145
146impl GovernedMapDataSourceCachedImpl {
147 pub async fn new(
149 pool: PgPool,
150 metrics_opt: Option<McFollowerMetrics>,
151 cache_size: u16,
152 blocks: Arc<BlockDataSourceImpl>,
153 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
154 crate::db_model::create_idx_tx_out_address(&pool).await?;
155 let cache = Default::default();
156 Ok(Self {
157 pool: pool.clone(),
158 metrics_opt,
159 cache_size,
160 cache,
161 blocks,
162 db_sync_config: DbSyncConfigurationProvider::new(pool),
163 })
164 }
165}
166
167observed_async_trait!(
168impl GovernedMapDataSource for GovernedMapDataSourceCachedImpl {
169 async fn get_state_at_block(
170 &self,
171 mc_block: McBlockHash,
172 scripts: MainChainScriptsV1,
173 ) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
174 {
175 Ok(get_mappings_entries(&self.pool, mc_block, scripts, self.db_sync_config.get_tx_in_config().await?).await?.into())
176 }
177
178 async fn get_mapping_changes(
179 &self,
180 since_mc_block: Option<McBlockHash>,
181 up_to_mc_block: McBlockHash,
182 scripts: MainChainScriptsV1,
183 ) -> std::result::Result<
184 Vec<(String, Option<ByteString>)>,
185 Box<dyn std::error::Error + Send + Sync>,
186 > {
187 if let Ok(mut cache) = self.cache.lock() {
188 cache.set_main_chain_scripts(scripts.clone());
189 };
190
191 let since_block_number = match since_mc_block {
192 Some(hash) => Some(
193 crate::db_model::get_block_by_hash(&self.pool, hash.clone())
194 .await?
195 .ok_or_else(|| Box::new(ExpectedDataNotFound(format!("Block hash: {hash}"))))?
196 .block_no,
197 ),
198 None => None,
199 };
200
201 let Some(up_to_block) = self.blocks.get_block_by_hash(up_to_mc_block.clone()).await? else {
202 return Err(Box::new(ExpectedDataNotFound(format!("Block hash: {up_to_mc_block}"))));
203 };
204
205 let up_to_block_number = BlockNumber(up_to_block.number.0);
206
207 if let Some(cached_changes) =
208 self.get_changes_from_cache(since_block_number, up_to_block_number).await?
209 {
210 return Ok(cached_changes);
211 }
212
213 let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
214 let latest_stable_block =
215 match self.blocks.get_latest_stable_block_for(latest_block_timestamp.into()).await? {
216 Some(block) => BlockNumber(block.number.0),
217 None => up_to_block_number,
218 };
219 let since_block_plus =
220 BlockNumber(since_block_number.unwrap_or(BlockNumber(0)).0 + self.cache_size as u32);
221 let max_search_block = min(latest_stable_block, max(up_to_block_number, since_block_plus));
222
223 let changes = self
224 .get_changes_in_range_to_cache(since_block_number, max_search_block, scripts)
225 .await?;
226
227 if let Ok(mut cache) = self.cache.lock() {
228 cache.update(changes.clone());
229 }
230
231 Ok(filter_changes_in_range(changes, since_block_number, up_to_block_number))
232 }
233}
234);
235
236impl GovernedMapDataSourceCachedImpl {
237 async fn get_changes_from_cache(
238 &self,
239 since_block: Option<BlockNumber>,
240 up_to_block: BlockNumber,
241 ) -> Result<Option<Vec<(String, Option<ByteString>)>>> {
242 if let Ok(cache) = self.cache.lock() {
243 Ok(cache.get_changes_in_range(since_block, up_to_block))
244 } else {
245 Ok(None)
246 }
247 }
248
249 async fn get_changes_in_range_to_cache(
250 &self,
251 since_block: Option<BlockNumber>,
252 up_to_block: BlockNumber,
253 scripts: MainChainScriptsV1,
254 ) -> Result<Vec<Change>> {
255 let changes = crate::db_model::get_governed_map_changes(
256 &self.pool,
257 &scripts.validator_address.into(),
258 since_block,
259 up_to_block,
260 Asset::new(scripts.asset_policy_id),
261 self.db_sync_config.get_tx_in_config().await?,
262 )
263 .await?;
264
265 let mut result = Vec::new();
266
267 for change in changes {
268 let GovernedMapDatum { key, value } = match GovernedMapDatum::try_from(change.datum.0) {
269 Ok(datum) => datum,
270 Err(err) => {
271 warn!("Failed decoding map entry: {err}");
272 continue;
273 },
274 };
275 match change.action {
276 GovernedMapAction::Spend => result.push(Change::new(change.block_no, key, None)),
277 GovernedMapAction::Create => {
278 result.push(Change::new(change.block_no, key, Some(value)))
279 },
280 };
281 }
282 Ok(result)
283 }
284}
285
286#[derive(derive_new::new, Clone)]
287struct Change {
288 block_no: BlockNumber,
289 key: String,
290 value: Option<ByteString>,
291}
292
293#[derive(Default)]
294pub(crate) struct Cache {
295 highest_block_number: Option<BlockNumber>,
296 lowest_block_number: Option<BlockNumber>,
297 changes: Vec<Change>,
298 address: Option<MainchainAddress>,
299 policy_id: Option<PolicyId>,
300}
301
302fn filter_changes_in_range(
303 changes: Vec<Change>,
304 since_block: Option<BlockNumber>,
305 up_to_block: BlockNumber,
306) -> Vec<(String, Option<ByteString>)> {
307 changes
308 .into_iter()
309 .filter(|change| {
310 change.block_no.0 <= up_to_block.0
311 && since_block.map(|b| change.block_no.0 > b.0).unwrap_or(true)
312 })
313 .map(|change| (change.key, change.value))
314 .collect()
315}
316
317impl Cache {
318 fn get_changes_in_range(
319 &self,
320 since_block: Option<BlockNumber>,
321 up_to_block: BlockNumber,
322 ) -> Option<Vec<(String, Option<ByteString>)>> {
323 let Some(highest_block_number) = self.highest_block_number else {
324 return None;
325 };
326 let Some(lowest_block_number) = self.lowest_block_number else {
327 return None;
328 };
329
330 if highest_block_number.0 < up_to_block.0
331 || since_block.map(|b| b.0 < lowest_block_number.0).unwrap_or(false)
332 {
333 return None;
334 }
335
336 Some(filter_changes_in_range(self.changes.clone(), since_block, up_to_block))
337 }
338
339 fn update(&mut self, changes: Vec<Change>) {
340 self.changes = changes;
341 let (lowest_block_number, highest_block_number) = self
342 .changes
343 .iter()
344 .minmax_by_key(|change| change.block_no.0)
345 .into_option()
346 .map(|(min, max)| (min.block_no, max.block_no))
347 .unwrap_or((BlockNumber(0), BlockNumber(0)));
348 self.lowest_block_number = Some(lowest_block_number);
349 self.highest_block_number = Some(highest_block_number);
350 }
351
352 fn set_main_chain_scripts(&mut self, scripts: MainChainScriptsV1) {
353 if self.address != Some(scripts.validator_address.clone())
354 || self.policy_id != Some(scripts.asset_policy_id.clone())
355 {
356 self.changes.clear();
357 self.highest_block_number = None;
358 self.lowest_block_number = None;
359 self.address = Some(scripts.validator_address);
360 self.policy_id = Some(scripts.asset_policy_id);
361 }
362 }
363}