partner_chains_dolos_data_sources/
block.rs1use crate::{
2 DataSourceError, Result,
3 client::{MiniBFClient, api::MiniBFApi, conversions::from_block_content},
4 read_mc_epoch_config,
5};
6use chrono::{DateTime, NaiveDateTime, TimeDelta};
7use derive_new::new;
8use figment::{Figment, providers::Env};
9use log::{debug, info};
10use serde::Deserialize;
11use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation};
12use sidechain_domain::*;
13use sp_timestamp::Timestamp;
14use std::{
15 error::Error,
16 sync::{Arc, Mutex},
17};
18
19#[derive(new)]
20pub struct BlockDataSourceImpl {
21 client: MiniBFClient,
23 security_parameter: u32,
28 min_slot_boundary_as_seconds: TimeDelta,
31 max_slot_boundary_as_seconds: TimeDelta,
33 mainchain_epoch_config: MainchainEpochConfig,
35 block_stability_margin: u32,
39 cache_size: u16,
41 stable_blocks_cache: Arc<Mutex<BlocksCache>>,
43}
44
45impl BlockDataSourceImpl {
46 pub async fn get_latest_block_info(&self) -> Result<MainchainBlock> {
48 self.client
49 .blocks_latest()
50 .await
51 .map_err(|e| {
52 DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",))
53 .into()
54 })
55 .and_then(from_block_content)
56 }
57
58 pub async fn get_latest_stable_block_for(
62 &self,
63 reference_timestamp: Timestamp,
64 ) -> Result<Option<MainchainBlock>> {
65 let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
66 let latest = self.get_latest_block_info().await?;
67 let offset = self.security_parameter + self.block_stability_margin;
68 let stable = latest.number.saturating_sub(offset).into();
69 let block = self.get_latest_block(stable, reference_timestamp).await?;
70 Ok(block)
71 }
72
73 pub async fn get_stable_block_for(
76 &self,
77 hash: McBlockHash,
78 reference_timestamp: Timestamp,
79 ) -> Result<Option<MainchainBlock>> {
80 let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
81 self.get_stable_block_by_hash(hash, reference_timestamp).await
82 }
83
84 pub async fn get_block_by_hash(&self, hash: McBlockHash) -> Result<Option<MainchainBlock>> {
86 let from_cache = if let Ok(cache) = self.stable_blocks_cache.lock() {
87 cache.find_by_hash(hash.clone())
88 } else {
89 None
90 };
91 let block_opt = match from_cache {
92 Some(block) => Some(block),
93 None => Some(from_block_content(self.client.blocks_by_id(hash).await?)?),
94 };
95 Ok(block_opt)
96 }
97}
98
99#[derive(Debug, Clone, Deserialize)]
101pub struct DolosBlockDataSourceConfig {
102 pub block_stability_margin: u32,
106}
107
108impl DolosBlockDataSourceConfig {
109 pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
111 let config: Self = Figment::new()
112 .merge(Env::raw())
113 .extract()
114 .map_err(|e| format!("Failed to read block data source config: {e}"))?;
115 info!("Using block data source configuration: {config:?}");
116 Ok(config)
117 }
118}
119
120impl BlockDataSourceImpl {
121 pub async fn new_from_env(
123 client: MiniBFClient,
124 ) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
125 Self::from_config(client, DolosBlockDataSourceConfig::from_env()?, &read_mc_epoch_config()?)
126 .await
127 }
128
129 pub async fn from_config(
131 client: MiniBFClient,
132 DolosBlockDataSourceConfig { block_stability_margin }: DolosBlockDataSourceConfig,
133 mc_epoch_config: &MainchainEpochConfig,
134 ) -> Result<BlockDataSourceImpl> {
135 let genesis = client.genesis().await?;
136 let active_slots_coeff = genesis.active_slots_coefficient;
137 let security_parameter = genesis.security_param as u32;
138 let k: f64 = security_parameter.into();
139 let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64;
140 let min_slot_boundary = (slot_duration * k / active_slots_coeff).round() as i64;
141 let max_slot_boundary = 3 * min_slot_boundary;
142 let cache_size = 100;
143 Ok(BlockDataSourceImpl::new(
144 client,
145 security_parameter,
146 TimeDelta::milliseconds(min_slot_boundary),
147 TimeDelta::milliseconds(max_slot_boundary),
148 mc_epoch_config.clone(),
149 block_stability_margin,
150 cache_size,
151 BlocksCache::new_arc_mutex(),
152 ))
153 }
154 async fn get_latest_block(
155 &self,
156 max_block: McBlockNumber,
157 reference_timestamp: NaiveDateTime,
158 ) -> Result<Option<MainchainBlock>> {
159 let min_time_naive = self.min_block_allowed_time(reference_timestamp);
160 let min_time = convert_naive_datetime(min_time_naive);
161 let min_slot = self.date_time_to_slot(min_time_naive)?;
162 let max_time_naive = self.max_allowed_block_time(reference_timestamp);
163 let max_time = convert_naive_datetime(max_time_naive);
164 let max_slot = self.date_time_to_slot(max_time_naive)?;
165
166 let mut current_block_number = max_block;
167
168 loop {
169 let block = match self.client.blocks_by_id(current_block_number).await {
170 Ok(b) => from_block_content(b)?,
171 Err(_) => return Ok(None),
172 };
173
174 let is_time_match = block.timestamp >= min_time && block.timestamp <= max_time;
175 let is_slot_match = block.slot >= min_slot && block.slot <= max_slot;
176
177 if is_time_match && is_slot_match {
178 return Ok(Some(block));
179 }
180
181 if block.timestamp < min_time || block.slot < min_slot || block.number.0 == 0 {
182 return Ok(None);
183 }
184
185 current_block_number = block.number.saturating_sub(1u32);
186 }
187 }
188
189 fn min_block_allowed_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
190 reference_timestamp - self.max_slot_boundary_as_seconds
191 }
192
193 fn max_allowed_block_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
194 reference_timestamp - self.min_slot_boundary_as_seconds
195 }
196
197 fn is_block_time_valid(&self, block: &MainchainBlock, timestamp: NaiveDateTime) -> bool {
201 convert_naive_datetime(self.min_block_allowed_time(timestamp)) <= block.timestamp
202 && block.timestamp <= convert_naive_datetime(self.max_allowed_block_time(timestamp))
203 }
204
205 async fn get_stable_block_by_hash(
206 &self,
207 hash: McBlockHash,
208 reference_timestamp: NaiveDateTime,
209 ) -> Result<Option<MainchainBlock>> {
210 if let Some(block) =
211 self.get_stable_block_by_hash_from_cache(hash.clone(), reference_timestamp)
212 {
213 debug!("Block by hash: {hash} found in cache.");
214 Ok(Some(From::from(block)))
215 } else {
216 debug!("Block by hash: {hash}, not found in cache, serving from Dolos.");
217 if let Some(block_by_hash) =
218 self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await?
219 {
220 self.fill_cache(&block_by_hash).await?;
221 Ok(Some(MainchainBlock::from(block_by_hash)))
222 } else {
223 Ok(None)
224 }
225 }
226 }
227
228 fn get_stable_block_by_hash_from_cache(
229 &self,
230 hash: McBlockHash,
231 reference_timestamp: NaiveDateTime,
232 ) -> Option<MainchainBlock> {
233 if let Ok(cache) = self.stable_blocks_cache.lock() {
234 cache
235 .find_by_hash(hash)
236 .filter(|block| self.is_block_time_valid(block, reference_timestamp))
237 } else {
238 None
239 }
240 }
241
242 async fn get_stable_block_by_hash_from_db(
244 &self,
245 hash: McBlockHash,
246 reference_timestamp: NaiveDateTime,
247 ) -> Result<Option<MainchainBlock>> {
248 let block = Some(from_block_content(self.client.blocks_by_id(hash).await?)?);
249 let latest_block = Some(from_block_content(self.client.blocks_latest().await?)?);
250 Ok(block
251 .zip(latest_block)
252 .filter(|(block, latest_block)| {
253 block.number.saturating_add(self.security_parameter) <= latest_block.number
254 && self.is_block_time_valid(block, reference_timestamp)
255 })
256 .map(|(block, _)| block))
257 }
258
259 async fn fill_cache(&self, from_block: &MainchainBlock) -> Result<()> {
261 let from_block_no = from_block.number;
262 let size = u32::from(self.cache_size);
263 let latest_block = from_block_content(self.client.blocks_latest().await?)?;
264 let stable_block_num = latest_block.number.saturating_sub(self.security_parameter);
265
266 let to_block_no = from_block_no.saturating_add(size).min(stable_block_num);
267 let blocks = if from_block_no < to_block_no {
268 let futures = (from_block_no.0..=to_block_no.0).map(|block_no| async move {
269 self.client
270 .blocks_by_id(McBlockNumber(block_no))
271 .await
272 .map_err(|e| e.into())
273 .and_then(from_block_content)
274 });
275 futures::future::try_join_all(futures).await?.into_iter().collect()
276 } else {
277 vec![from_block.clone()]
278 };
279
280 if let Ok(mut cache) = self.stable_blocks_cache.lock() {
281 cache.update(blocks);
282 debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0);
283 }
284 Ok(())
285 }
286
287 fn date_time_to_slot(&self, dt: NaiveDateTime) -> Result<McSlotNumber> {
288 let millis: u64 =
289 dt.and_utc().timestamp_millis().try_into().map_err(|_| {
290 DataSourceError::BadRequest(format!("Datetime out of range: {dt:?}"))
291 })?;
292 let ts = sidechain_domain::mainchain_epoch::Timestamp::from_unix_millis(millis);
293 let slot = self
294 .mainchain_epoch_config
295 .timestamp_to_mainchain_slot_number(ts)
296 .unwrap_or(self.mainchain_epoch_config.first_slot_number);
297 Ok(McSlotNumber(slot))
298 }
299
300 fn timestamp_to_db_type(timestamp: Timestamp) -> Result<NaiveDateTime> {
301 let millis: Option<i64> = timestamp.as_millis().try_into().ok();
302 let dt = millis
303 .and_then(DateTime::from_timestamp_millis)
304 .ok_or(DataSourceError::BadRequest(format!("Timestamp out of range: {timestamp:?}")))?;
305 Ok(NaiveDateTime::new(dt.date_naive(), dt.time()))
306 }
307}
308
309fn convert_naive_datetime(d: NaiveDateTime) -> u64 {
310 d.and_utc().timestamp().try_into().expect("i64 timestamp is valid u64")
311}
312
313#[derive(new)]
315pub(crate) struct BlocksCache {
316 #[new(default)]
318 from_last_by_hash: Vec<MainchainBlock>,
319}
320
321impl BlocksCache {
322 fn find_by_hash(&self, hash: McBlockHash) -> Option<MainchainBlock> {
323 self.from_last_by_hash.iter().find(|b| b.hash == hash).cloned()
324 }
325
326 pub fn update(&mut self, from_last_by_hash: Vec<MainchainBlock>) {
327 self.from_last_by_hash = from_last_by_hash;
328 }
329
330 pub fn new_arc_mutex() -> Arc<Mutex<Self>> {
331 Arc::new(Mutex::new(Self::new()))
332 }
333}