partner_chains_db_sync_data_sources/governed_map/
mod.rs1use crate::DataSourceError::ExpectedDataNotFound;
3use crate::Result;
4use crate::block::BlockDataSourceImpl;
5use crate::db_model::GovernedMapAction;
6use crate::{metrics::McFollowerMetrics, observed_async_trait};
7use db_sync_sqlx::{Asset, BlockNumber};
8use itertools::Itertools;
9use log::warn;
10use partner_chains_plutus_data::governed_map::GovernedMapDatum;
11use sidechain_domain::byte_string::ByteString;
12use sidechain_domain::*;
13use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1};
14use sqlx::PgPool;
15use std::cmp::{max, min};
16use std::sync::{Arc, Mutex};
17
18#[cfg(test)]
19mod tests;
20
/// Implementation of [`GovernedMapDataSource`] that holds a Postgres connection pool
/// and reads the Governed Map state from it on every call (no caching).
pub struct GovernedMapDataSourceImpl {
    /// Postgres connection pool used for all queries issued by this data source.
    pub pool: PgPool,
    /// Optional metrics handle.
    /// NOTE(review): presumably read by `observed_async_trait!` to record call metrics —
    /// confirm against the macro definition.
    pub metrics_opt: Option<McFollowerMetrics>,
}
30
31impl GovernedMapDataSourceImpl {
32 pub async fn new(
34 pool: PgPool,
35 metrics_opt: Option<McFollowerMetrics>,
36 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
37 crate::db_model::create_idx_tx_out_address(&pool).await?;
38 Ok(Self { pool, metrics_opt })
39 }
40}
41
42observed_async_trait!(
43impl GovernedMapDataSource for GovernedMapDataSourceImpl {
44 async fn get_state_at_block(
45 &self,
46 mc_block: McBlockHash,
47 scripts: MainChainScriptsV1,
48 ) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
49 {
50 Ok(get_mappings_entries(&self.pool, mc_block, scripts).await?.into())
51 }
52
53 async fn get_mapping_changes(
54 &self,
55 since_mc_block: Option<McBlockHash>,
56 up_to_mc_block: McBlockHash,
57 scripts: MainChainScriptsV1,
58 ) -> std::result::Result<
59 Vec<(String, Option<ByteString>)>,
60 Box<dyn std::error::Error + Send + Sync>,
61 > {
62 let current_mappings = self.get_state_at_block(up_to_mc_block, scripts.clone()).await?;
63 let Some(since_mc_block) = since_mc_block else {
64 let changes =
65 current_mappings.into_iter().map(|(key, value)| (key, Some(value))).collect();
66 return Ok(changes);
67 };
68 let previous_mappings = self.get_state_at_block(since_mc_block, scripts.clone()).await?;
69 let mut changes = vec![];
70 for (key, value) in current_mappings.iter() {
71 if previous_mappings.get(key) != Some(value) {
72 changes.push((key.clone(), Some(value.clone())));
73 }
74 }
75 for key in previous_mappings.keys() {
76 if !current_mappings.contains_key(key) {
77 changes.push((key.clone(), None));
78 }
79 }
80
81 Ok(changes)
82 }
83}
84);
85
86async fn get_mappings_entries(
87 pool: &PgPool,
88 hash: McBlockHash,
89 scripts: MainChainScriptsV1,
90) -> Result<BTreeMap<String, ByteString>> {
91 let Some(block) = crate::db_model::get_block_by_hash(pool, hash.clone()).await? else {
92 return Err(ExpectedDataNotFound(format!("Block hash: {hash}")));
93 };
94 let entries = crate::db_model::get_datums_at_address_with_token(
95 pool,
96 &scripts.validator_address.into(),
97 block.block_no,
98 Asset::new(scripts.asset_policy_id),
99 )
100 .await?;
101
102 let mut mappings = BTreeMap::new();
103 for entry in entries {
104 match GovernedMapDatum::try_from(entry.datum.0) {
105 Ok(GovernedMapDatum { key, value }) => {
106 mappings.insert(key, value);
107 },
108 Err(err) => warn!("Failed decoding map entry: {err}"),
109 }
110 }
111
112 Ok(mappings)
113}
114
/// Implementation of [`GovernedMapDataSource`] that keeps an in-memory `Cache` of
/// Governed Map changes in addition to querying the Postgres pool.
pub struct GovernedMapDataSourceCachedImpl {
    /// Postgres connection pool used for all queries issued by this data source.
    pub pool: PgPool,
    /// Optional metrics handle.
    /// NOTE(review): presumably read by `observed_async_trait!` to record call metrics —
    /// confirm against the macro definition.
    pub metrics_opt: Option<McFollowerMetrics>,
    /// Number of blocks added to the `since` block number when `get_mapping_changes`
    /// decides how far ahead to search for changes to cache.
    cache_size: u16,
    /// Mutex-protected cache of changes, behind an `Arc`.
    cache: Arc<Mutex<Cache>>,
    /// Block data source used to resolve block hashes and to find the latest stable block.
    blocks: Arc<BlockDataSourceImpl>,
}
130
131impl GovernedMapDataSourceCachedImpl {
132 pub async fn new(
134 pool: PgPool,
135 metrics_opt: Option<McFollowerMetrics>,
136 cache_size: u16,
137 blocks: Arc<BlockDataSourceImpl>,
138 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
139 crate::db_model::create_idx_tx_out_address(&pool).await?;
140 let cache = Default::default();
141 Ok(Self { pool, metrics_opt, cache_size, cache, blocks })
142 }
143}
144
145observed_async_trait!(
146impl GovernedMapDataSource for GovernedMapDataSourceCachedImpl {
147 async fn get_state_at_block(
148 &self,
149 mc_block: McBlockHash,
150 scripts: MainChainScriptsV1,
151 ) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
152 {
153 Ok(get_mappings_entries(&self.pool, mc_block, scripts).await?.into())
154 }
155
156 async fn get_mapping_changes(
157 &self,
158 since_mc_block: Option<McBlockHash>,
159 up_to_mc_block: McBlockHash,
160 scripts: MainChainScriptsV1,
161 ) -> std::result::Result<
162 Vec<(String, Option<ByteString>)>,
163 Box<dyn std::error::Error + Send + Sync>,
164 > {
165 if let Ok(mut cache) = self.cache.lock() {
166 cache.set_main_chain_scripts(scripts.clone());
167 };
168
169 let since_block_number = match since_mc_block {
170 Some(hash) => Some(
171 crate::db_model::get_block_by_hash(&self.pool, hash.clone())
172 .await?
173 .ok_or_else(|| Box::new(ExpectedDataNotFound(format!("Block hash: {hash}"))))?
174 .block_no,
175 ),
176 None => None,
177 };
178
179 let Some(up_to_block) = self.blocks.get_block_by_hash(up_to_mc_block.clone()).await? else {
180 return Err(Box::new(ExpectedDataNotFound(format!("Block hash: {up_to_mc_block}"))));
181 };
182
183 let up_to_block_number = BlockNumber(up_to_block.number.0);
184
185 if let Some(cached_changes) =
186 self.get_changes_from_cache(since_block_number, up_to_block_number).await?
187 {
188 return Ok(cached_changes);
189 }
190
191 let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
192 let latest_stable_block =
193 match self.blocks.get_latest_stable_block_for(latest_block_timestamp.into()).await? {
194 Some(block) => BlockNumber(block.number.0),
195 None => up_to_block_number,
196 };
197 let since_block_plus =
198 BlockNumber(since_block_number.unwrap_or(BlockNumber(0)).0 + self.cache_size as u32);
199 let max_search_block = min(latest_stable_block, max(up_to_block_number, since_block_plus));
200
201 let changes = self
202 .get_changes_in_range_to_cache(since_block_number, max_search_block, scripts)
203 .await?;
204
205 if let Ok(mut cache) = self.cache.lock() {
206 cache.update(changes.clone());
207 }
208
209 Ok(filter_changes_in_range(changes, since_block_number, up_to_block_number))
210 }
211}
212);
213
214impl GovernedMapDataSourceCachedImpl {
215 async fn get_changes_from_cache(
216 &self,
217 since_block: Option<BlockNumber>,
218 up_to_block: BlockNumber,
219 ) -> Result<Option<Vec<(String, Option<ByteString>)>>> {
220 if let Ok(cache) = self.cache.lock() {
221 Ok(cache.get_changes_in_range(since_block, up_to_block))
222 } else {
223 Ok(None)
224 }
225 }
226
227 async fn get_changes_in_range_to_cache(
228 &self,
229 since_block: Option<BlockNumber>,
230 up_to_block: BlockNumber,
231 scripts: MainChainScriptsV1,
232 ) -> Result<Vec<Change>> {
233 let changes = crate::db_model::get_changes(
234 &self.pool,
235 &scripts.validator_address.into(),
236 since_block,
237 up_to_block,
238 Asset::new(scripts.asset_policy_id),
239 )
240 .await?;
241
242 let mut result = Vec::new();
243
244 for change in changes {
245 let GovernedMapDatum { key, value } = match GovernedMapDatum::try_from(change.datum.0) {
246 Ok(datum) => datum,
247 Err(err) => {
248 warn!("Failed decoding map entry: {err}");
249 continue;
250 },
251 };
252 match change.action {
253 GovernedMapAction::Spend => result.push(Change::new(change.block_no, key, None)),
254 GovernedMapAction::Create => {
255 result.push(Change::new(change.block_no, key, Some(value)))
256 },
257 };
258 }
259 Ok(result)
260 }
261}
262
/// A single change of one Governed Map key, tagged with a block number.
#[derive(derive_new::new, Clone)]
struct Change {
    /// Block number of the change; used to filter changes by block range.
    block_no: BlockNumber,
    /// Key of the affected entry.
    key: String,
    /// New value of the key: `Some` for a created entry, `None` for a spent one.
    value: Option<ByteString>,
}
269
/// In-memory store of Governed Map changes, together with the block range they span and
/// the script identifiers (validator address and policy id) they were collected for.
#[derive(Default)]
pub(crate) struct Cache {
    /// Highest block number among the cached changes (`BlockNumber(0)` after an update with
    /// no changes); `None` before the first `update` and after the scripts have changed.
    highest_block_number: Option<BlockNumber>,
    /// Lowest block number among the cached changes (`BlockNumber(0)` after an update with
    /// no changes); `None` before the first `update` and after the scripts have changed.
    lowest_block_number: Option<BlockNumber>,
    /// The cached changes, replaced as a whole by `update`.
    changes: Vec<Change>,
    /// Validator address the cached changes belong to.
    address: Option<MainchainAddress>,
    /// Policy id the cached changes belong to.
    policy_id: Option<PolicyId>,
}
278
279fn filter_changes_in_range(
280 changes: Vec<Change>,
281 since_block: Option<BlockNumber>,
282 up_to_block: BlockNumber,
283) -> Vec<(String, Option<ByteString>)> {
284 changes
285 .into_iter()
286 .filter(|change| {
287 change.block_no.0 <= up_to_block.0
288 && since_block.map(|b| change.block_no.0 > b.0).unwrap_or(true)
289 })
290 .map(|change| (change.key, change.value))
291 .collect()
292}
293
294impl Cache {
295 fn get_changes_in_range(
296 &self,
297 since_block: Option<BlockNumber>,
298 up_to_block: BlockNumber,
299 ) -> Option<Vec<(String, Option<ByteString>)>> {
300 let Some(highest_block_number) = self.highest_block_number else {
301 return None;
302 };
303 let Some(lowest_block_number) = self.lowest_block_number else {
304 return None;
305 };
306
307 if highest_block_number.0 < up_to_block.0
308 || since_block.map(|b| b.0 < lowest_block_number.0).unwrap_or(false)
309 {
310 return None;
311 }
312
313 Some(filter_changes_in_range(self.changes.clone(), since_block, up_to_block))
314 }
315
316 fn update(&mut self, changes: Vec<Change>) {
317 self.changes = changes;
318 let (lowest_block_number, highest_block_number) = self
319 .changes
320 .iter()
321 .minmax_by_key(|change| change.block_no.0)
322 .into_option()
323 .map(|(min, max)| (min.block_no, max.block_no))
324 .unwrap_or((BlockNumber(0), BlockNumber(0)));
325 self.lowest_block_number = Some(lowest_block_number);
326 self.highest_block_number = Some(highest_block_number);
327 }
328
329 fn set_main_chain_scripts(&mut self, scripts: MainChainScriptsV1) {
330 if self.address != Some(scripts.validator_address.clone())
331 || self.policy_id != Some(scripts.asset_policy_id.clone())
332 {
333 self.changes.clear();
334 self.highest_block_number = None;
335 self.lowest_block_number = None;
336 self.address = Some(scripts.validator_address);
337 self.policy_id = Some(scripts.asset_policy_id);
338 }
339 }
340}