partner_chains_db_sync_data_sources/bridge/
cache.rs

1use super::*;
2use crate::BlockDataSourceImpl;
3use crate::db_model::BridgeUtxo;
4use futures::lock::Mutex;
5use sidechain_domain::{MainchainBlock, McBlockHash, UtxoId};
6use std::{cmp::min, collections::HashMap, error::Error, sync::Arc};
7
8/// Bridge transfer data source with block range-based caching
9///
10/// This data source caches utxos in some range [from_block, to_block] (inclusive) and serves
11/// queries from the cache. In case of a cache miss, the cache is first replaced before serving
12/// data. The cache is filled with utxos in range:
13///     [lower_query_bound, min(upper_query_bound + cache_lookahead, current_stable_tip)]
14///
15/// In case of queries where the lower bound is a UTXO, all UTXOs from the containing
16/// block are stored. Technically servable case where the lower bound UTXO is the last one in its
17/// block but the block is not stored, is treated as a cache miss.
18pub struct CachedTokenBridgeDataSourceImpl {
19	/// Postgres connection pool
20	pool: PgPool,
21	/// Prometheus metrics client
22	metrics_opt: Option<McFollowerMetrics>,
23	/// Configuration used by Db-Sync
24	db_sync_config: DbSyncConfigurationProvider,
25	/// [BlockDataSourceImpl] instance shared with other data sources for cache reuse.
26	blocks: Arc<BlockDataSourceImpl>,
27	/// Internal data cache
28	cache: Arc<Mutex<TokenUtxoCache>>,
29	/// Number of additional blocks that should be loaded into cache when refreshing
30	cache_lookahead: u32,
31}
32
33#[derive(Default)]
34pub(crate) struct TokenUtxoCache {
35	mc_scripts: MainChainScripts,
36	start_block: BlockNumber,
37	end_block: BlockNumber,
38	transfers: Vec<BridgeUtxo>,
39	utxo_cache: HashMap<UtxoId, BridgeUtxo>,
40}
41
42impl TokenUtxoCache {
43	pub(crate) fn new() -> Self {
44		Self::default()
45	}
46
47	pub(crate) fn set_mc_scripts(&mut self, mc_scripts: MainChainScripts) {
48		if self.mc_scripts != mc_scripts {
49			self.mc_scripts = mc_scripts;
50			self.transfers = vec![];
51			self.start_block = BlockNumber(0);
52			self.end_block = BlockNumber(0);
53		}
54	}
55
56	pub(crate) fn set_cached_transfers(
57		&mut self,
58		start_block: BlockNumber,
59		end_block: BlockNumber,
60		transfers: Vec<BridgeUtxo>,
61	) {
62		self.start_block = start_block;
63		self.end_block = end_block;
64		self.utxo_cache = transfers.iter().map(|utxo| (utxo.utxo_id(), utxo.clone())).collect();
65		self.transfers = transfers;
66	}
67
68	pub(crate) fn serve_from_cache(
69		&self,
70		checkpoint: &ResolvedBridgeDataCheckpoint,
71		to_block: BlockNumber,
72		max_transfers: u32,
73	) -> Option<Vec<BridgeUtxo>> {
74		if self.end_block < to_block {
75			return None;
76		}
77
78		let skip_pred: Box<dyn FnMut(&&BridgeUtxo) -> bool> = match checkpoint {
79			ResolvedBridgeDataCheckpoint::Block { number }
80				if self.start_block <= number.saturating_add(1u32) =>
81			{
82				Box::new(move |utxo| *number >= utxo.block_number)
83			},
84			ResolvedBridgeDataCheckpoint::Utxo { block_number, tx_ix, tx_out_ix }
85				if self.start_block <= *block_number =>
86			{
87				Box::new(move |utxo| utxo.ordering_key() <= (*block_number, *tx_ix, *tx_out_ix))
88			},
89			_ => return None,
90		};
91
92		Some(
93			self.transfers
94				.iter()
95				.skip_while(skip_pred)
96				.take_while(|utxo| to_block.0 >= utxo.block_number.0)
97				.take(max_transfers as usize)
98				.cloned()
99				.collect(),
100		)
101	}
102
103	pub(crate) fn try_resolve_checkpoint_from_cache(
104		&self,
105		utxo_id: &UtxoId,
106	) -> Option<ResolvedBridgeDataCheckpoint> {
107		let BridgeUtxo { block_number, tx_ix, utxo_ix, .. } =
108			self.utxo_cache.get(utxo_id).cloned()?;
109
110		Some(ResolvedBridgeDataCheckpoint::Utxo { block_number, tx_ix, tx_out_ix: utxo_ix })
111	}
112}
113
114observed_async_trait!(
115	impl<RecipientAddress> TokenBridgeDataSource<RecipientAddress> for CachedTokenBridgeDataSourceImpl
116	where
117		RecipientAddress: Debug,
118		RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
119	{
120		async fn get_transfers(
121			&self,
122			main_chain_scripts: MainChainScripts,
123			data_checkpoint: BridgeDataCheckpoint,
124			max_transfers: u32,
125			current_mc_block_hash: McBlockHash,
126		) -> Result<
127			(Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint),
128			Box<dyn std::error::Error + Send + Sync>,
129		> {
130			self.set_cache_mc_scripts(main_chain_scripts.clone()).await;
131
132			let to_block = self.get_block_by_hash(&current_mc_block_hash).await?.number.into();
133
134			let data_checkpoint = self.resolve_data_checkpoint(&data_checkpoint).await?;
135
136			let utxos =
137				match self.try_serve_from_cache(&data_checkpoint, to_block, max_transfers).await {
138					Some(utxos) => utxos,
139					None => {
140						self.fill_cache(main_chain_scripts, &data_checkpoint, to_block).await?;
141						self.try_serve_from_cache(&data_checkpoint, to_block, max_transfers)
142							.await
143							.ok_or("Data should be present in cache after filling cache succeeded")?
144					},
145				};
146
147			let new_checkpoint = match utxos.last() {
148				Some(utxo) if (utxos.len() as u32) >= max_transfers => {
149					BridgeDataCheckpoint::Utxo(utxo.utxo_id())
150				},
151				_ => BridgeDataCheckpoint::Block(to_block.into()),
152			};
153
154			let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();
155
156			Ok((transfers, new_checkpoint))
157		}
158	}
159);
160
161impl CachedTokenBridgeDataSourceImpl {
162	/// Crates a new token bridge data source
163	pub fn new(
164		pool: PgPool,
165		metrics_opt: Option<McFollowerMetrics>,
166		blocks: Arc<BlockDataSourceImpl>,
167		cache_lookahead: u32,
168	) -> Self {
169		Self {
170			db_sync_config: DbSyncConfigurationProvider::new(pool.clone()),
171			pool,
172			metrics_opt,
173			blocks,
174			cache: Arc::new(Mutex::new(TokenUtxoCache::new())),
175			cache_lookahead,
176		}
177	}
178
179	async fn set_cache_mc_scripts(&self, main_chain_scripts: MainChainScripts) {
180		let mut cache = self.cache.lock().await;
181		cache.set_mc_scripts(main_chain_scripts.clone());
182	}
183
184	async fn try_serve_from_cache(
185		&self,
186		data_checkpoint: &ResolvedBridgeDataCheckpoint,
187		to_block: BlockNumber,
188		max_transfers: u32,
189	) -> Option<Vec<BridgeUtxo>> {
190		let cache = self.cache.lock().await;
191		cache.serve_from_cache(data_checkpoint, to_block, max_transfers)
192	}
193
194	async fn fill_cache(
195		&self,
196		main_chain_scripts: MainChainScripts,
197		data_checkpoint: &ResolvedBridgeDataCheckpoint,
198		to_block: BlockNumber,
199	) -> Result<(), Box<dyn Error + Send + Sync>> {
200		let from_block = data_checkpoint.get_block_number();
201
202		// We want to load all data in the block of `data_checkpoint`, so we go one block back.
203		let effective_data_checkpoint =
204			ResolvedBridgeDataCheckpoint::Block { number: from_block.saturating_sub(1u32) };
205
206		let latest_block = self.get_latest_stable_block().await?.unwrap_or(to_block);
207
208		let to_block: BlockNumber =
209			min(to_block.saturating_add(self.cache_lookahead), latest_block);
210
211		let utxos = get_bridge_utxos_tx(
212			self.db_sync_config.get_tx_in_config().await?,
213			&self.pool,
214			&main_chain_scripts.illiquid_circulation_supply_validator_address.clone().into(),
215			main_chain_scripts.asset_id().into(),
216			effective_data_checkpoint,
217			to_block.into(),
218			None,
219		)
220		.await?;
221
222		self.set_cached_transfers(from_block, to_block, utxos).await;
223
224		Ok(())
225	}
226
227	async fn set_cached_transfers(
228		&self,
229		start_block: BlockNumber,
230		end_block: BlockNumber,
231		utxos: Vec<BridgeUtxo>,
232	) {
233		let mut cache = self.cache.lock().await;
234		cache.set_cached_transfers(start_block, end_block, utxos);
235	}
236
237	async fn get_latest_stable_block(
238		&self,
239	) -> Result<Option<BlockNumber>, Box<dyn Error + Send + Sync>> {
240		let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
241		Ok(self
242			.blocks
243			.get_latest_stable_block_for(latest_block_timestamp.into())
244			.await?
245			.map(|block| block.number.into()))
246	}
247
248	async fn resolve_checkpoint_for_utxo(
249		&self,
250		utxo_id: &UtxoId,
251	) -> Result<ResolvedBridgeDataCheckpoint, Box<dyn Error + Send + Sync>> {
252		let TxBlockInfo { block_number, tx_ix } =
253			get_block_info_for_utxo(&self.pool, utxo_id.tx_hash.into())
254				.await?
255				.ok_or(format!("Could not find block info for utxo: {utxo_id:?}"))?;
256		Ok(ResolvedBridgeDataCheckpoint::Utxo {
257			block_number,
258			tx_ix,
259			tx_out_ix: utxo_id.index.into(),
260		})
261	}
262
263	async fn resolve_data_checkpoint(
264		&self,
265		data_checkpoint: &BridgeDataCheckpoint,
266	) -> Result<ResolvedBridgeDataCheckpoint, Box<dyn Error + Send + Sync>> {
267		match data_checkpoint {
268			BridgeDataCheckpoint::Block(number) => {
269				Ok(ResolvedBridgeDataCheckpoint::Block { number: (*number).into() })
270			},
271			BridgeDataCheckpoint::Utxo(utxo) => {
272				match self.cache.lock().await.try_resolve_checkpoint_from_cache(&utxo) {
273					Some(checkpoint) => Ok(checkpoint),
274					None => self.resolve_checkpoint_for_utxo(&utxo).await,
275				}
276			},
277		}
278	}
279
280	async fn get_block_by_hash(
281		&self,
282		mc_block_hash: &McBlockHash,
283	) -> Result<MainchainBlock, Box<dyn Error + Send + Sync>> {
284		Ok(self
285			.blocks
286			.get_block_by_hash(mc_block_hash.clone())
287			.await?
288			.ok_or(format!("Could not find block for hash {mc_block_hash:?}"))?)
289	}
290}