partner_chains_db_sync_data_sources/block/
mod.rs

1//! Db-Sync data source implementation that queries Cardano block information
2use crate::{
3	DataSourceError::*,
4	data_sources::read_mc_epoch_config,
5	db_model::{self, Block, BlockNumber, SlotNumber},
6};
7use chrono::{DateTime, NaiveDateTime, TimeDelta};
8use derive_new::new;
9use figment::{Figment, providers::Env};
10use log::{debug, info};
11use serde::Deserialize;
12use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation};
13use sidechain_domain::*;
14use sp_timestamp::Timestamp;
15use sqlx::PgPool;
16use std::{
17	error::Error,
18	sync::{Arc, Mutex},
19};
20
21#[cfg(test)]
22mod tests;
23
/// Db-Sync data source that queries Cardano block information
///
/// This data source does not implement any data source interface used by one of the
/// Partner Chain toolkit's features, but is used internally by other data sources
/// that require access to Cardano block data.
// NOTE(review): the constructor generated by `derive(new)` is called with eight
// arguments in `from_config`; the lint is presumably silenced for that reason.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct BlockDataSourceImpl {
	/// Postgres connection pool
	pool: PgPool,
	/// Cardano security parameter
	///
	/// This parameter controls how many confirmations (blocks on top) are required by
	/// the Cardano node to consider a block to be stable. This is a network-wide parameter.
	security_parameter: u32,
	/// Minimal age a block must have to be considered stable in relation to a given
	/// reference timestamp.
	///
	/// Must be equal to `security parameter / active slot coefficient` slots. Despite the
	/// `_as_seconds` suffix this is a [TimeDelta]; `from_config` builds it from milliseconds.
	min_slot_boundary_as_seconds: TimeDelta,
	/// Maximal age a block may have to be accepted in relation to a given reference timestamp.
	///
	/// This is a characteristic of Ouroboros Praos and is equal to
	/// `3 * security parameter / active slot coefficient` slots. Despite the `_as_seconds`
	/// suffix this is a [TimeDelta]; `from_config` builds it from milliseconds.
	max_slot_boundary_as_seconds: TimeDelta,
	/// Cardano main chain epoch configuration
	mainchain_epoch_config: MainchainEpochConfig,
	/// Additional offset applied when selecting the latest stable Cardano block
	///
	/// This parameter should be 0 by default and should only be increased to 1 in networks
	/// struggling with frequent block rejections due to Db-Sync or Cardano node lag.
	block_stability_margin: u32,
	/// Number of contiguous Cardano blocks to be cached by this data source
	cache_size: u16,
	/// Internal block cache, used for lookups of stable blocks by hash
	stable_blocks_cache: Arc<Mutex<BlocksCache>>,
}
56
57impl BlockDataSourceImpl {
58	/// Returns the latest _unstable_ Cardano block from the Db-Sync database
59	pub async fn get_latest_block_info(
60		&self,
61	) -> Result<MainchainBlock, Box<dyn std::error::Error + Send + Sync>> {
62		db_model::get_latest_block_info(&self.pool)
63			.await?
64			.map(From::from)
65			.ok_or(ExpectedDataNotFound("No latest block on chain.".to_string()).into())
66	}
67
68	/// Returns the latest _stable_ Cardano block from the Db-Sync database that is within
69	/// acceptable bounds from `reference_timestamp`, accounting for the additional stability
70	/// offset configured by [block_stability_margin][Self::block_stability_margin].
71	pub async fn get_latest_stable_block_for(
72		&self,
73		reference_timestamp: Timestamp,
74	) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
75		let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
76		let latest = self.get_latest_block_info().await?;
77		let offset = self.security_parameter + self.block_stability_margin;
78		let stable = BlockNumber(latest.number.0.saturating_sub(offset));
79		let block = self.get_latest_block(stable, reference_timestamp).await?;
80		Ok(block.map(From::from))
81	}
82
83	/// Finds a block by its `hash` and verifies that it is stable in reference to `reference_timestamp`
84	/// and returns its info
85	pub async fn get_stable_block_for(
86		&self,
87		hash: McBlockHash,
88		reference_timestamp: Timestamp,
89	) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
90		let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
91		self.get_stable_block_by_hash(hash, reference_timestamp).await
92	}
93
94	/// Finds a block by its `hash` and returns its info
95	pub async fn get_block_by_hash(
96		&self,
97		hash: McBlockHash,
98	) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
99		let from_cache = if let Ok(cache) = self.stable_blocks_cache.lock() {
100			cache.find_by_hash(hash.clone())
101		} else {
102			None
103		};
104		let block_opt = match from_cache {
105			Some(block) => Some(block),
106			None => db_model::get_block_by_hash(&self.pool, hash).await?,
107		};
108		Ok(block_opt.map(From::from))
109	}
110}
111
/// Configuration for [BlockDataSourceImpl]
#[derive(Debug, Clone, Deserialize)]
pub struct DbSyncBlockDataSourceConfig {
	/// Cardano security parameter, i.e. the number of confirmations needed to stabilize a block
	pub cardano_security_parameter: u32,
	/// Expected fraction of Cardano slots that will have a block produced
	///
	/// This value can be found in `shelley-genesis.json` file used by the Cardano node,
	/// example: `"activeSlotsCoeff": 0.05`.
	///
	/// [BlockDataSourceImpl::from_config] divides by this value, so it must not be zero.
	pub cardano_active_slots_coeff: f64,
	/// Additional offset applied when selecting the latest stable Cardano block
	///
	/// This parameter should be 0 by default and should only be increased to 1 in networks
	/// struggling with frequent block rejections due to Db-Sync or Cardano node lag.
	pub block_stability_margin: u32,
}
128
129impl DbSyncBlockDataSourceConfig {
130	/// Reads the config from environment
131	pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
132		let config: Self = Figment::new()
133			.merge(Env::raw())
134			.extract()
135			.map_err(|e| format!("Failed to read block data source config: {e}"))?;
136		info!("Using block data source configuration: {config:?}");
137		Ok(config)
138	}
139}
140
141impl BlockDataSourceImpl {
142	/// Creates a new instance of [BlockDataSourceImpl], reading configuration from the environment.
143	pub async fn new_from_env(
144		pool: PgPool,
145	) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
146		Ok(Self::from_config(
147			pool,
148			DbSyncBlockDataSourceConfig::from_env()?,
149			&read_mc_epoch_config()?,
150		))
151	}
152
153	/// Creates a new instance of [BlockDataSourceImpl], using passed configuration.
154	pub fn from_config(
155		pool: PgPool,
156		DbSyncBlockDataSourceConfig {
157			cardano_security_parameter,
158			cardano_active_slots_coeff,
159			block_stability_margin,
160		}: DbSyncBlockDataSourceConfig,
161		mc_epoch_config: &MainchainEpochConfig,
162	) -> BlockDataSourceImpl {
163		let k: f64 = cardano_security_parameter.into();
164		let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64;
165		let min_slot_boundary = (slot_duration * k / cardano_active_slots_coeff).round() as i64;
166		let max_slot_boundary = 3 * min_slot_boundary;
167		let cache_size = 100;
168		BlockDataSourceImpl::new(
169			pool,
170			cardano_security_parameter,
171			TimeDelta::milliseconds(min_slot_boundary),
172			TimeDelta::milliseconds(max_slot_boundary),
173			mc_epoch_config.clone(),
174			block_stability_margin,
175			cache_size,
176			BlocksCache::new_arc_mutex(),
177		)
178	}
179	async fn get_latest_block(
180		&self,
181		max_block: BlockNumber,
182		reference_timestamp: NaiveDateTime,
183	) -> Result<Option<Block>, Box<dyn std::error::Error + Send + Sync>> {
184		let min_time = self.min_block_allowed_time(reference_timestamp);
185		let min_slot = self.date_time_to_slot(min_time)?;
186		let max_time = self.max_allowed_block_time(reference_timestamp);
187		let max_slot = self.date_time_to_slot(max_time)?;
188		Ok(db_model::get_highest_block(
189			&self.pool, max_block, min_time, min_slot, max_time, max_slot,
190		)
191		.await?)
192	}
193
194	fn min_block_allowed_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
195		reference_timestamp - self.max_slot_boundary_as_seconds
196	}
197
198	fn max_allowed_block_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
199		reference_timestamp - self.min_slot_boundary_as_seconds
200	}
201
202	/// Rules for block selection and verification mandates that timestamp of the block
203	/// falls in a given range, calculated from the reference timestamp, which is either
204	/// PC current time or PC block timestamp.
205	fn is_block_time_valid(&self, block: &Block, timestamp: NaiveDateTime) -> bool {
206		self.min_block_allowed_time(timestamp) <= block.time
207			&& block.time <= self.max_allowed_block_time(timestamp)
208	}
209
210	async fn get_stable_block_by_hash(
211		&self,
212		hash: McBlockHash,
213		reference_timestamp: NaiveDateTime,
214	) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
215		if let Some(block) =
216			self.get_stable_block_by_hash_from_cache(hash.clone(), reference_timestamp)
217		{
218			debug!("Block by hash: {hash} found in cache.");
219			Ok(Some(From::from(block)))
220		} else {
221			debug!("Block by hash: {hash}, not found in cache, serving from database.");
222			if let Some(block_by_hash) =
223				self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await?
224			{
225				self.fill_cache(&block_by_hash).await?;
226				Ok(Some(MainchainBlock::from(block_by_hash)))
227			} else {
228				Ok(None)
229			}
230		}
231	}
232
233	fn get_stable_block_by_hash_from_cache(
234		&self,
235		hash: McBlockHash,
236		reference_timestamp: NaiveDateTime,
237	) -> Option<Block> {
238		if let Ok(cache) = self.stable_blocks_cache.lock() {
239			cache
240				.find_by_hash(hash)
241				.filter(|block| self.is_block_time_valid(block, reference_timestamp))
242		} else {
243			None
244		}
245	}
246
247	/// Returns block by given hash from the cache if it is valid in reference to given timestamp
248	async fn get_stable_block_by_hash_from_db(
249		&self,
250		hash: McBlockHash,
251		reference_timestamp: NaiveDateTime,
252	) -> Result<Option<Block>, Box<dyn std::error::Error + Send + Sync>> {
253		let block = db_model::get_block_by_hash(&self.pool, hash).await?;
254		let latest_block = db_model::get_latest_block_info(&self.pool).await?;
255		Ok(block
256			.zip(latest_block)
257			.filter(|(block, latest_block)| {
258				block.block_no.0 + self.security_parameter <= latest_block.block_no.0
259					&& self.is_block_time_valid(block, reference_timestamp)
260			})
261			.map(|(block, _)| block))
262	}
263
264	/// Caches stable blocks for lookup by hash.
265	async fn fill_cache(
266		&self,
267		from_block: &Block,
268	) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
269		let from_block_no = from_block.block_no;
270		let size = u32::from(self.cache_size);
271		let latest_block =
272			db_model::get_latest_block_info(&self.pool)
273				.await?
274				.ok_or(InternalDataSourceError(
275					"No latest block when filling the caches.".to_string(),
276				))?;
277		let latest_block_num = latest_block.block_no.0;
278		let stable_block_num = latest_block_num.saturating_sub(self.security_parameter);
279
280		let to_block_no = BlockNumber(from_block_no.0.saturating_add(size).min(stable_block_num));
281		let blocks = if to_block_no.0 > from_block_no.0 {
282			db_model::get_blocks_by_numbers(&self.pool, from_block_no, to_block_no).await?
283		} else {
284			vec![from_block.clone()]
285		};
286
287		if let Ok(mut cache) = self.stable_blocks_cache.lock() {
288			cache.update(blocks);
289			debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0);
290		}
291		Ok(())
292	}
293
294	fn date_time_to_slot(
295		&self,
296		dt: NaiveDateTime,
297	) -> Result<SlotNumber, Box<dyn std::error::Error + Send + Sync>> {
298		let millis: u64 = dt
299			.and_utc()
300			.timestamp_millis()
301			.try_into()
302			.map_err(|_| BadRequest(format!("Datetime out of range: {dt:?}")))?;
303		let ts = sidechain_domain::mainchain_epoch::Timestamp::from_unix_millis(millis);
304		let slot = self
305			.mainchain_epoch_config
306			.timestamp_to_mainchain_slot_number(ts)
307			.unwrap_or(self.mainchain_epoch_config.first_slot_number);
308		Ok(SlotNumber(slot))
309	}
310
311	fn timestamp_to_db_type(
312		timestamp: Timestamp,
313	) -> Result<NaiveDateTime, Box<dyn std::error::Error + Send + Sync>> {
314		let millis: Option<i64> = timestamp.as_millis().try_into().ok();
315		let dt = millis
316			.and_then(DateTime::from_timestamp_millis)
317			.ok_or(BadRequest(format!("Timestamp out of range: {timestamp:?}")))?;
318		Ok(NaiveDateTime::new(dt.date_naive(), dt.time()))
319	}
320}
321
/// Helper structure for caching stable blocks.
///
/// The derived `new()` takes no arguments and starts with an empty block list.
#[derive(new)]
pub(crate) struct BlocksCache {
	/// Contiguous main chain blocks. All blocks should be stable. Used to query by hash.
	#[new(default)]
	from_last_by_hash: Vec<Block>,
}
329
330impl BlocksCache {
331	fn find_by_hash(&self, hash: McBlockHash) -> Option<Block> {
332		self.from_last_by_hash.iter().find(|b| b.hash == hash.0).cloned()
333	}
334
335	pub fn update(&mut self, from_last_by_hash: Vec<Block>) {
336		self.from_last_by_hash = from_last_by_hash;
337	}
338
339	pub fn new_arc_mutex() -> Arc<Mutex<Self>> {
340		Arc::new(Mutex::new(Self::new()))
341	}
342}