partner_chains_db_sync_data_sources/bridge/
cache.rs1use super::*;
2use crate::BlockDataSourceImpl;
3use crate::db_model::BridgeUtxo;
4use futures::lock::Mutex;
5use sidechain_domain::{MainchainBlock, McBlockHash, UtxoId};
6use std::{cmp::min, collections::HashMap, error::Error, sync::Arc};
7
8pub struct CachedTokenBridgeDataSourceImpl {
19 pool: PgPool,
21 metrics_opt: Option<McFollowerMetrics>,
23 db_sync_config: DbSyncConfigurationProvider,
25 blocks: Arc<BlockDataSourceImpl>,
27 cache: Arc<Mutex<TokenUtxoCache>>,
29 cache_lookahead: u32,
31}
32
33#[derive(Default)]
34pub(crate) struct TokenUtxoCache {
35 mc_scripts: MainChainScripts,
36 start_block: BlockNumber,
37 end_block: BlockNumber,
38 transfers: Vec<BridgeUtxo>,
39 utxo_cache: HashMap<UtxoId, BridgeUtxo>,
40}
41
42impl TokenUtxoCache {
43 pub(crate) fn new() -> Self {
44 Self::default()
45 }
46
47 pub(crate) fn set_mc_scripts(&mut self, mc_scripts: MainChainScripts) {
48 if self.mc_scripts != mc_scripts {
49 self.mc_scripts = mc_scripts;
50 self.transfers = vec![];
51 self.start_block = BlockNumber(0);
52 self.end_block = BlockNumber(0);
53 }
54 }
55
56 pub(crate) fn set_cached_transfers(
57 &mut self,
58 start_block: BlockNumber,
59 end_block: BlockNumber,
60 transfers: Vec<BridgeUtxo>,
61 ) {
62 self.start_block = start_block;
63 self.end_block = end_block;
64 self.utxo_cache = transfers.iter().map(|utxo| (utxo.utxo_id(), utxo.clone())).collect();
65 self.transfers = transfers;
66 }
67
68 pub(crate) fn serve_from_cache(
69 &self,
70 checkpoint: &ResolvedBridgeDataCheckpoint,
71 to_block: BlockNumber,
72 max_transfers: u32,
73 ) -> Option<Vec<BridgeUtxo>> {
74 if self.end_block < to_block {
75 return None;
76 }
77
78 let skip_pred: Box<dyn FnMut(&&BridgeUtxo) -> bool> = match checkpoint {
79 ResolvedBridgeDataCheckpoint::Block { number }
80 if self.start_block <= number.saturating_add(1u32) =>
81 {
82 Box::new(move |utxo| *number >= utxo.block_number)
83 },
84 ResolvedBridgeDataCheckpoint::Utxo { block_number, tx_ix, tx_out_ix }
85 if self.start_block <= *block_number =>
86 {
87 Box::new(move |utxo| utxo.ordering_key() <= (*block_number, *tx_ix, *tx_out_ix))
88 },
89 _ => return None,
90 };
91
92 Some(
93 self.transfers
94 .iter()
95 .skip_while(skip_pred)
96 .take_while(|utxo| to_block.0 >= utxo.block_number.0)
97 .take(max_transfers as usize)
98 .cloned()
99 .collect(),
100 )
101 }
102
103 pub(crate) fn try_resolve_checkpoint_from_cache(
104 &self,
105 utxo_id: &UtxoId,
106 ) -> Option<ResolvedBridgeDataCheckpoint> {
107 let BridgeUtxo { block_number, tx_ix, utxo_ix, .. } =
108 self.utxo_cache.get(utxo_id).cloned()?;
109
110 Some(ResolvedBridgeDataCheckpoint::Utxo { block_number, tx_ix, tx_out_ix: utxo_ix })
111 }
112}
113
114observed_async_trait!(
115 impl<RecipientAddress> TokenBridgeDataSource<RecipientAddress> for CachedTokenBridgeDataSourceImpl
116 where
117 RecipientAddress: Debug,
118 RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
119 {
120 async fn get_transfers(
121 &self,
122 main_chain_scripts: MainChainScripts,
123 data_checkpoint: BridgeDataCheckpoint,
124 max_transfers: u32,
125 current_mc_block_hash: McBlockHash,
126 ) -> Result<
127 (Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint),
128 Box<dyn std::error::Error + Send + Sync>,
129 > {
130 self.set_cache_mc_scripts(main_chain_scripts.clone()).await;
131
132 let to_block = self.get_block_by_hash(¤t_mc_block_hash).await?.number.into();
133
134 let data_checkpoint = self.resolve_data_checkpoint(&data_checkpoint).await?;
135
136 let utxos =
137 match self.try_serve_from_cache(&data_checkpoint, to_block, max_transfers).await {
138 Some(utxos) => utxos,
139 None => {
140 self.fill_cache(main_chain_scripts, &data_checkpoint, to_block).await?;
141 self.try_serve_from_cache(&data_checkpoint, to_block, max_transfers)
142 .await
143 .ok_or("Data should be present in cache after filling cache succeeded")?
144 },
145 };
146
147 let new_checkpoint = match utxos.last() {
148 Some(utxo) if (utxos.len() as u32) >= max_transfers => {
149 BridgeDataCheckpoint::Utxo(utxo.utxo_id())
150 },
151 _ => BridgeDataCheckpoint::Block(to_block.into()),
152 };
153
154 let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();
155
156 Ok((transfers, new_checkpoint))
157 }
158 }
159);
160
161impl CachedTokenBridgeDataSourceImpl {
162 pub fn new(
164 pool: PgPool,
165 metrics_opt: Option<McFollowerMetrics>,
166 blocks: Arc<BlockDataSourceImpl>,
167 cache_lookahead: u32,
168 ) -> Self {
169 Self {
170 db_sync_config: DbSyncConfigurationProvider::new(pool.clone()),
171 pool,
172 metrics_opt,
173 blocks,
174 cache: Arc::new(Mutex::new(TokenUtxoCache::new())),
175 cache_lookahead,
176 }
177 }
178
179 async fn set_cache_mc_scripts(&self, main_chain_scripts: MainChainScripts) {
180 let mut cache = self.cache.lock().await;
181 cache.set_mc_scripts(main_chain_scripts.clone());
182 }
183
184 async fn try_serve_from_cache(
185 &self,
186 data_checkpoint: &ResolvedBridgeDataCheckpoint,
187 to_block: BlockNumber,
188 max_transfers: u32,
189 ) -> Option<Vec<BridgeUtxo>> {
190 let cache = self.cache.lock().await;
191 cache.serve_from_cache(data_checkpoint, to_block, max_transfers)
192 }
193
194 async fn fill_cache(
195 &self,
196 main_chain_scripts: MainChainScripts,
197 data_checkpoint: &ResolvedBridgeDataCheckpoint,
198 to_block: BlockNumber,
199 ) -> Result<(), Box<dyn Error + Send + Sync>> {
200 let from_block = data_checkpoint.get_block_number();
201
202 let effective_data_checkpoint =
204 ResolvedBridgeDataCheckpoint::Block { number: from_block.saturating_sub(1u32) };
205
206 let latest_block = self.get_latest_stable_block().await?.unwrap_or(to_block);
207
208 let to_block: BlockNumber =
209 min(to_block.saturating_add(self.cache_lookahead), latest_block);
210
211 let utxos = get_bridge_utxos_tx(
212 self.db_sync_config.get_tx_in_config().await?,
213 &self.pool,
214 &main_chain_scripts.illiquid_circulation_supply_validator_address.clone().into(),
215 main_chain_scripts.asset_id().into(),
216 effective_data_checkpoint,
217 to_block.into(),
218 None,
219 )
220 .await?;
221
222 self.set_cached_transfers(from_block, to_block, utxos).await;
223
224 Ok(())
225 }
226
227 async fn set_cached_transfers(
228 &self,
229 start_block: BlockNumber,
230 end_block: BlockNumber,
231 utxos: Vec<BridgeUtxo>,
232 ) {
233 let mut cache = self.cache.lock().await;
234 cache.set_cached_transfers(start_block, end_block, utxos);
235 }
236
237 async fn get_latest_stable_block(
238 &self,
239 ) -> Result<Option<BlockNumber>, Box<dyn Error + Send + Sync>> {
240 let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
241 Ok(self
242 .blocks
243 .get_latest_stable_block_for(latest_block_timestamp.into())
244 .await?
245 .map(|block| block.number.into()))
246 }
247
248 async fn resolve_checkpoint_for_utxo(
249 &self,
250 utxo_id: &UtxoId,
251 ) -> Result<ResolvedBridgeDataCheckpoint, Box<dyn Error + Send + Sync>> {
252 let TxBlockInfo { block_number, tx_ix } =
253 get_block_info_for_utxo(&self.pool, utxo_id.tx_hash.into())
254 .await?
255 .ok_or(format!("Could not find block info for utxo: {utxo_id:?}"))?;
256 Ok(ResolvedBridgeDataCheckpoint::Utxo {
257 block_number,
258 tx_ix,
259 tx_out_ix: utxo_id.index.into(),
260 })
261 }
262
263 async fn resolve_data_checkpoint(
264 &self,
265 data_checkpoint: &BridgeDataCheckpoint,
266 ) -> Result<ResolvedBridgeDataCheckpoint, Box<dyn Error + Send + Sync>> {
267 match data_checkpoint {
268 BridgeDataCheckpoint::Block(number) => {
269 Ok(ResolvedBridgeDataCheckpoint::Block { number: (*number).into() })
270 },
271 BridgeDataCheckpoint::Utxo(utxo) => {
272 match self.cache.lock().await.try_resolve_checkpoint_from_cache(&utxo) {
273 Some(checkpoint) => Ok(checkpoint),
274 None => self.resolve_checkpoint_for_utxo(&utxo).await,
275 }
276 },
277 }
278 }
279
280 async fn get_block_by_hash(
281 &self,
282 mc_block_hash: &McBlockHash,
283 ) -> Result<MainchainBlock, Box<dyn Error + Send + Sync>> {
284 Ok(self
285 .blocks
286 .get_block_by_hash(mc_block_hash.clone())
287 .await?
288 .ok_or(format!("Could not find block for hash {mc_block_hash:?}"))?)
289 }
290}