partner_chains_dolos_data_sources/
block.rs

1use crate::{
2	DataSourceError, Result,
3	client::{MiniBFClient, api::MiniBFApi, conversions::from_block_content},
4	read_mc_epoch_config,
5};
6use chrono::{DateTime, NaiveDateTime, TimeDelta};
7use derive_new::new;
8use figment::{Figment, providers::Env};
9use log::{debug, info};
10use serde::Deserialize;
11use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation};
12use sidechain_domain::*;
13use sp_timestamp::Timestamp;
14use std::{
15	error::Error,
16	sync::{Arc, Mutex},
17};
18
19#[derive(new)]
20pub struct BlockDataSourceImpl {
21	/// MiniBF client
22	client: MiniBFClient,
23	/// Cardano security parameter
24	///
25	/// This parameter controls how many confirmations (blocks on top) are required by
26	/// the Cardano node to consider a block to be stable. This is a network-wide parameter.
27	security_parameter: u32,
28	/// Minimal age of a block to be considered valid stable in relation to some given timestamp.
29	/// Must be equal to `security parameter / active slot coefficient`.
30	min_slot_boundary_as_seconds: TimeDelta,
31	/// a characteristic of Ouroboros Praos and is equal to `3 * security parameter / active slot coefficient`
32	max_slot_boundary_as_seconds: TimeDelta,
33	/// Cardano main chain epoch configuration
34	mainchain_epoch_config: MainchainEpochConfig,
35	/// Additional offset applied when selecting the latest stable Cardano block
36	///
37	/// This parameter should be 1 by default.
38	block_stability_margin: u32,
39	/// Number of contiguous Cardano blocks to be cached by this data source
40	cache_size: u16,
41	/// Internal block cache
42	stable_blocks_cache: Arc<Mutex<BlocksCache>>,
43}
44
45impl BlockDataSourceImpl {
46	/// Returns the latest _unstable_ Cardano block from Dolos
47	pub async fn get_latest_block_info(&self) -> Result<MainchainBlock> {
48		self.client
49			.blocks_latest()
50			.await
51			.map_err(|e| {
52				DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",))
53					.into()
54			})
55			.and_then(from_block_content)
56	}
57
58	/// Returns the latest _stable_ Cardano block from Dolos that is within
59	/// acceptable bounds from `reference_timestamp`, accounting for the additional stability
60	/// offset configured by [block_stability_margin][Self::block_stability_margin].
61	pub async fn get_latest_stable_block_for(
62		&self,
63		reference_timestamp: Timestamp,
64	) -> Result<Option<MainchainBlock>> {
65		let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
66		let latest = self.get_latest_block_info().await?;
67		let offset = self.security_parameter + self.block_stability_margin;
68		let stable = latest.number.saturating_sub(offset).into();
69		let block = self.get_latest_block(stable, reference_timestamp).await?;
70		Ok(block)
71	}
72
73	/// Finds a block by its `hash` and verifies that it is stable in reference to `reference_timestamp`
74	/// and returns its info
75	pub async fn get_stable_block_for(
76		&self,
77		hash: McBlockHash,
78		reference_timestamp: Timestamp,
79	) -> Result<Option<MainchainBlock>> {
80		let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
81		self.get_stable_block_by_hash(hash, reference_timestamp).await
82	}
83
84	/// Finds a block by its `hash` and returns its info
85	pub async fn get_block_by_hash(&self, hash: McBlockHash) -> Result<Option<MainchainBlock>> {
86		let from_cache = if let Ok(cache) = self.stable_blocks_cache.lock() {
87			cache.find_by_hash(hash.clone())
88		} else {
89			None
90		};
91		let block_opt = match from_cache {
92			Some(block) => Some(block),
93			None => Some(from_block_content(self.client.blocks_by_id(hash).await?)?),
94		};
95		Ok(block_opt)
96	}
97}
98
99/// Configuration for [BlockDataSourceImpl]
100#[derive(Debug, Clone, Deserialize)]
101pub struct DolosBlockDataSourceConfig {
102	/// Additional offset applied when selecting the latest stable Cardano block
103	///
104	/// This parameter should be 1 by default.
105	pub block_stability_margin: u32,
106}
107
108impl DolosBlockDataSourceConfig {
109	/// Reads the config from environment
110	pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
111		let config: Self = Figment::new()
112			.merge(Env::raw())
113			.extract()
114			.map_err(|e| format!("Failed to read block data source config: {e}"))?;
115		info!("Using block data source configuration: {config:?}");
116		Ok(config)
117	}
118}
119
120impl BlockDataSourceImpl {
121	/// Creates a new instance of [BlockDataSourceImpl], reading configuration from the environment.
122	pub async fn new_from_env(
123		client: MiniBFClient,
124	) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
125		Self::from_config(client, DolosBlockDataSourceConfig::from_env()?, &read_mc_epoch_config()?)
126			.await
127	}
128
129	/// Creates a new instance of [BlockDataSourceImpl], using passed configuration.
130	pub async fn from_config(
131		client: MiniBFClient,
132		DolosBlockDataSourceConfig { block_stability_margin }: DolosBlockDataSourceConfig,
133		mc_epoch_config: &MainchainEpochConfig,
134	) -> Result<BlockDataSourceImpl> {
135		let genesis = client.genesis().await?;
136		let active_slots_coeff = genesis.active_slots_coefficient;
137		let security_parameter = genesis.security_param as u32;
138		let k: f64 = security_parameter.into();
139		let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64;
140		let min_slot_boundary = (slot_duration * k / active_slots_coeff).round() as i64;
141		let max_slot_boundary = 3 * min_slot_boundary;
142		let cache_size = 100;
143		Ok(BlockDataSourceImpl::new(
144			client,
145			security_parameter,
146			TimeDelta::milliseconds(min_slot_boundary),
147			TimeDelta::milliseconds(max_slot_boundary),
148			mc_epoch_config.clone(),
149			block_stability_margin,
150			cache_size,
151			BlocksCache::new_arc_mutex(),
152		))
153	}
154	async fn get_latest_block(
155		&self,
156		max_block: McBlockNumber,
157		reference_timestamp: NaiveDateTime,
158	) -> Result<Option<MainchainBlock>> {
159		let min_time_naive = self.min_block_allowed_time(reference_timestamp);
160		let min_time = convert_naive_datetime(min_time_naive);
161		let min_slot = self.date_time_to_slot(min_time_naive)?;
162		let max_time_naive = self.max_allowed_block_time(reference_timestamp);
163		let max_time = convert_naive_datetime(max_time_naive);
164		let max_slot = self.date_time_to_slot(max_time_naive)?;
165
166		let mut current_block_number = max_block;
167
168		loop {
169			let block = match self.client.blocks_by_id(current_block_number).await {
170				Ok(b) => from_block_content(b)?,
171				Err(_) => return Ok(None),
172			};
173
174			let is_time_match = block.timestamp >= min_time && block.timestamp <= max_time;
175			let is_slot_match = block.slot >= min_slot && block.slot <= max_slot;
176
177			if is_time_match && is_slot_match {
178				return Ok(Some(block));
179			}
180
181			if block.timestamp < min_time || block.slot < min_slot || block.number.0 == 0 {
182				return Ok(None);
183			}
184
185			current_block_number = block.number.saturating_sub(1u32);
186		}
187	}
188
189	fn min_block_allowed_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
190		reference_timestamp - self.max_slot_boundary_as_seconds
191	}
192
193	fn max_allowed_block_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
194		reference_timestamp - self.min_slot_boundary_as_seconds
195	}
196
197	/// Rules for block selection and verification mandates that timestamp of the block
198	/// falls in a given range, calculated from the reference timestamp, which is either
199	/// PC current time or PC block timestamp.
200	fn is_block_time_valid(&self, block: &MainchainBlock, timestamp: NaiveDateTime) -> bool {
201		convert_naive_datetime(self.min_block_allowed_time(timestamp)) <= block.timestamp
202			&& block.timestamp <= convert_naive_datetime(self.max_allowed_block_time(timestamp))
203	}
204
205	async fn get_stable_block_by_hash(
206		&self,
207		hash: McBlockHash,
208		reference_timestamp: NaiveDateTime,
209	) -> Result<Option<MainchainBlock>> {
210		if let Some(block) =
211			self.get_stable_block_by_hash_from_cache(hash.clone(), reference_timestamp)
212		{
213			debug!("Block by hash: {hash} found in cache.");
214			Ok(Some(From::from(block)))
215		} else {
216			debug!("Block by hash: {hash}, not found in cache, serving from Dolos.");
217			if let Some(block_by_hash) =
218				self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await?
219			{
220				self.fill_cache(&block_by_hash).await?;
221				Ok(Some(MainchainBlock::from(block_by_hash)))
222			} else {
223				Ok(None)
224			}
225		}
226	}
227
228	fn get_stable_block_by_hash_from_cache(
229		&self,
230		hash: McBlockHash,
231		reference_timestamp: NaiveDateTime,
232	) -> Option<MainchainBlock> {
233		if let Ok(cache) = self.stable_blocks_cache.lock() {
234			cache
235				.find_by_hash(hash)
236				.filter(|block| self.is_block_time_valid(block, reference_timestamp))
237		} else {
238			None
239		}
240	}
241
242	/// Returns block by given hash from the cache if it is valid in reference to given timestamp
243	async fn get_stable_block_by_hash_from_db(
244		&self,
245		hash: McBlockHash,
246		reference_timestamp: NaiveDateTime,
247	) -> Result<Option<MainchainBlock>> {
248		let block = Some(from_block_content(self.client.blocks_by_id(hash).await?)?);
249		let latest_block = Some(from_block_content(self.client.blocks_latest().await?)?);
250		Ok(block
251			.zip(latest_block)
252			.filter(|(block, latest_block)| {
253				block.number.saturating_add(self.security_parameter) <= latest_block.number
254					&& self.is_block_time_valid(block, reference_timestamp)
255			})
256			.map(|(block, _)| block))
257	}
258
259	/// Caches stable blocks for lookup by hash.
260	async fn fill_cache(&self, from_block: &MainchainBlock) -> Result<()> {
261		let from_block_no = from_block.number;
262		let size = u32::from(self.cache_size);
263		let latest_block = from_block_content(self.client.blocks_latest().await?)?;
264		let stable_block_num = latest_block.number.saturating_sub(self.security_parameter);
265
266		let to_block_no = from_block_no.saturating_add(size).min(stable_block_num);
267		let blocks = if from_block_no < to_block_no {
268			let futures = (from_block_no.0..=to_block_no.0).map(|block_no| async move {
269				self.client
270					.blocks_by_id(McBlockNumber(block_no))
271					.await
272					.map_err(|e| e.into())
273					.and_then(from_block_content)
274			});
275			futures::future::try_join_all(futures).await?.into_iter().collect()
276		} else {
277			vec![from_block.clone()]
278		};
279
280		if let Ok(mut cache) = self.stable_blocks_cache.lock() {
281			cache.update(blocks);
282			debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0);
283		}
284		Ok(())
285	}
286
287	fn date_time_to_slot(&self, dt: NaiveDateTime) -> Result<McSlotNumber> {
288		let millis: u64 =
289			dt.and_utc().timestamp_millis().try_into().map_err(|_| {
290				DataSourceError::BadRequest(format!("Datetime out of range: {dt:?}"))
291			})?;
292		let ts = sidechain_domain::mainchain_epoch::Timestamp::from_unix_millis(millis);
293		let slot = self
294			.mainchain_epoch_config
295			.timestamp_to_mainchain_slot_number(ts)
296			.unwrap_or(self.mainchain_epoch_config.first_slot_number);
297		Ok(McSlotNumber(slot))
298	}
299
300	fn timestamp_to_db_type(timestamp: Timestamp) -> Result<NaiveDateTime> {
301		let millis: Option<i64> = timestamp.as_millis().try_into().ok();
302		let dt = millis
303			.and_then(DateTime::from_timestamp_millis)
304			.ok_or(DataSourceError::BadRequest(format!("Timestamp out of range: {timestamp:?}")))?;
305		Ok(NaiveDateTime::new(dt.date_naive(), dt.time()))
306	}
307}
308
309fn convert_naive_datetime(d: NaiveDateTime) -> u64 {
310	d.and_utc().timestamp().try_into().expect("i64 timestamp is valid u64")
311}
312
313/// Helper structure for caching stable blocks.
314#[derive(new)]
315pub(crate) struct BlocksCache {
316	/// Continuous main chain blocks. All blocks should be stable. Used to query by hash.
317	#[new(default)]
318	from_last_by_hash: Vec<MainchainBlock>,
319}
320
321impl BlocksCache {
322	fn find_by_hash(&self, hash: McBlockHash) -> Option<MainchainBlock> {
323		self.from_last_by_hash.iter().find(|b| b.hash == hash).cloned()
324	}
325
326	pub fn update(&mut self, from_last_by_hash: Vec<MainchainBlock>) {
327		self.from_last_by_hash = from_last_by_hash;
328	}
329
330	pub fn new_arc_mutex() -> Arc<Mutex<Self>> {
331		Arc::new(Mutex::new(Self::new()))
332	}
333}