partner_chains_db_sync_data_sources/block/
mod.rs1use crate::{
3 DataSourceError::*,
4 data_sources::read_mc_epoch_config,
5 db_model::{self, Block, BlockNumber, SlotNumber},
6};
7use chrono::{DateTime, NaiveDateTime, TimeDelta};
8use derive_new::new;
9use figment::{Figment, providers::Env};
10use log::{debug, info};
11use serde::Deserialize;
12use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation};
13use sidechain_domain::*;
14use sp_timestamp::Timestamp;
15use sqlx::PgPool;
16use std::{
17 error::Error,
18 sync::{Arc, Mutex},
19};
20
21#[cfg(test)]
22mod tests;
23
/// Data source answering main chain block queries from a Postgres database,
/// with a small in-memory cache for by-hash lookups.
///
/// The `new` constructor is generated by `derive_new` and takes one argument per
/// field, in declaration order — hence the `too_many_arguments` allowance.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct BlockDataSourceImpl {
    /// Connection pool used for every database query.
    pool: PgPool,
    /// Number of blocks that must sit on top of a block before it is treated as
    /// stable (subtracted from the latest block number in the stability checks).
    security_parameter: u32,
    /// Subtracted from a reference timestamp to obtain the *latest* block time
    /// accepted for that timestamp.
    /// NOTE(review): despite the `_as_seconds` suffix, `from_config` builds this
    /// value with `TimeDelta::milliseconds`.
    min_slot_boundary_as_seconds: TimeDelta,
    /// Subtracted from a reference timestamp to obtain the *earliest* block time
    /// accepted for that timestamp.
    /// NOTE(review): despite the `_as_seconds` suffix, `from_config` builds this
    /// value with `TimeDelta::milliseconds`.
    max_slot_boundary_as_seconds: TimeDelta,
    /// Main chain epoch configuration, used to convert timestamps to slot numbers.
    mainchain_epoch_config: MainchainEpochConfig,
    /// Extra offset added to `security_parameter` when selecting the latest stable
    /// block; it is not applied when checking stability of a block given by hash.
    block_stability_margin: u32,
    /// How many block numbers past the requested block `fill_cache` reads ahead.
    cache_size: u16,
    /// Cache of blocks consulted by the by-hash lookups before the database.
    stable_blocks_cache: Arc<Mutex<BlocksCache>>,
}
56
57impl BlockDataSourceImpl {
58 pub async fn get_latest_block_info(
60 &self,
61 ) -> Result<MainchainBlock, Box<dyn std::error::Error + Send + Sync>> {
62 db_model::get_latest_block_info(&self.pool)
63 .await?
64 .map(From::from)
65 .ok_or(ExpectedDataNotFound("No latest block on chain.".to_string()).into())
66 }
67
68 pub async fn get_latest_stable_block_for(
72 &self,
73 reference_timestamp: Timestamp,
74 ) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
75 let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
76 let latest = self.get_latest_block_info().await?;
77 let offset = self.security_parameter + self.block_stability_margin;
78 let stable = BlockNumber(latest.number.0.saturating_sub(offset));
79 let block = self.get_latest_block(stable, reference_timestamp).await?;
80 Ok(block.map(From::from))
81 }
82
83 pub async fn get_stable_block_for(
86 &self,
87 hash: McBlockHash,
88 reference_timestamp: Timestamp,
89 ) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
90 let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
91 self.get_stable_block_by_hash(hash, reference_timestamp).await
92 }
93
94 pub async fn get_block_by_hash(
96 &self,
97 hash: McBlockHash,
98 ) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
99 let from_cache = if let Ok(cache) = self.stable_blocks_cache.lock() {
100 cache.find_by_hash(hash.clone())
101 } else {
102 None
103 };
104 let block_opt = match from_cache {
105 Some(block) => Some(block),
106 None => db_model::get_block_by_hash(&self.pool, hash).await?,
107 };
108 Ok(block_opt.map(From::from))
109 }
110}
111
/// Configuration of the block data source, deserialized by `from_env`.
#[derive(Debug, Clone, Deserialize)]
pub struct DbSyncBlockDataSourceConfig {
    /// Cardano security parameter; becomes the data source's `security_parameter`
    /// and is the `k` factor in the slot-boundary computation of `from_config`.
    pub cardano_security_parameter: u32,
    /// Divisor in the slot-boundary computation of `from_config`.
    /// NOTE(review): presumably expected to be strictly positive — confirm; it is not validated here.
    pub cardano_active_slots_coeff: f64,
    /// Extra number of blocks added on top of the security parameter when the
    /// latest stable block is selected.
    pub block_stability_margin: u32,
}
128
129impl DbSyncBlockDataSourceConfig {
130 pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
132 let config: Self = Figment::new()
133 .merge(Env::raw())
134 .extract()
135 .map_err(|e| format!("Failed to read block data source config: {e}"))?;
136 info!("Using block data source configuration: {config:?}");
137 Ok(config)
138 }
139}
140
141impl BlockDataSourceImpl {
142 pub async fn new_from_env(
144 pool: PgPool,
145 ) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
146 Ok(Self::from_config(
147 pool,
148 DbSyncBlockDataSourceConfig::from_env()?,
149 &read_mc_epoch_config()?,
150 ))
151 }
152
153 pub fn from_config(
155 pool: PgPool,
156 DbSyncBlockDataSourceConfig {
157 cardano_security_parameter,
158 cardano_active_slots_coeff,
159 block_stability_margin,
160 }: DbSyncBlockDataSourceConfig,
161 mc_epoch_config: &MainchainEpochConfig,
162 ) -> BlockDataSourceImpl {
163 let k: f64 = cardano_security_parameter.into();
164 let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64;
165 let min_slot_boundary = (slot_duration * k / cardano_active_slots_coeff).round() as i64;
166 let max_slot_boundary = 3 * min_slot_boundary;
167 let cache_size = 100;
168 BlockDataSourceImpl::new(
169 pool,
170 cardano_security_parameter,
171 TimeDelta::milliseconds(min_slot_boundary),
172 TimeDelta::milliseconds(max_slot_boundary),
173 mc_epoch_config.clone(),
174 block_stability_margin,
175 cache_size,
176 BlocksCache::new_arc_mutex(),
177 )
178 }
179 async fn get_latest_block(
180 &self,
181 max_block: BlockNumber,
182 reference_timestamp: NaiveDateTime,
183 ) -> Result<Option<Block>, Box<dyn std::error::Error + Send + Sync>> {
184 let min_time = self.min_block_allowed_time(reference_timestamp);
185 let min_slot = self.date_time_to_slot(min_time)?;
186 let max_time = self.max_allowed_block_time(reference_timestamp);
187 let max_slot = self.date_time_to_slot(max_time)?;
188 Ok(db_model::get_highest_block(
189 &self.pool, max_block, min_time, min_slot, max_time, max_slot,
190 )
191 .await?)
192 }
193
194 fn min_block_allowed_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
195 reference_timestamp - self.max_slot_boundary_as_seconds
196 }
197
198 fn max_allowed_block_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
199 reference_timestamp - self.min_slot_boundary_as_seconds
200 }
201
202 fn is_block_time_valid(&self, block: &Block, timestamp: NaiveDateTime) -> bool {
206 self.min_block_allowed_time(timestamp) <= block.time
207 && block.time <= self.max_allowed_block_time(timestamp)
208 }
209
210 async fn get_stable_block_by_hash(
211 &self,
212 hash: McBlockHash,
213 reference_timestamp: NaiveDateTime,
214 ) -> Result<Option<MainchainBlock>, Box<dyn std::error::Error + Send + Sync>> {
215 if let Some(block) =
216 self.get_stable_block_by_hash_from_cache(hash.clone(), reference_timestamp)
217 {
218 debug!("Block by hash: {hash} found in cache.");
219 Ok(Some(From::from(block)))
220 } else {
221 debug!("Block by hash: {hash}, not found in cache, serving from database.");
222 if let Some(block_by_hash) =
223 self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await?
224 {
225 self.fill_cache(&block_by_hash).await?;
226 Ok(Some(MainchainBlock::from(block_by_hash)))
227 } else {
228 Ok(None)
229 }
230 }
231 }
232
233 fn get_stable_block_by_hash_from_cache(
234 &self,
235 hash: McBlockHash,
236 reference_timestamp: NaiveDateTime,
237 ) -> Option<Block> {
238 if let Ok(cache) = self.stable_blocks_cache.lock() {
239 cache
240 .find_by_hash(hash)
241 .filter(|block| self.is_block_time_valid(block, reference_timestamp))
242 } else {
243 None
244 }
245 }
246
247 async fn get_stable_block_by_hash_from_db(
249 &self,
250 hash: McBlockHash,
251 reference_timestamp: NaiveDateTime,
252 ) -> Result<Option<Block>, Box<dyn std::error::Error + Send + Sync>> {
253 let block = db_model::get_block_by_hash(&self.pool, hash).await?;
254 let latest_block = db_model::get_latest_block_info(&self.pool).await?;
255 Ok(block
256 .zip(latest_block)
257 .filter(|(block, latest_block)| {
258 block.block_no.0 + self.security_parameter <= latest_block.block_no.0
259 && self.is_block_time_valid(block, reference_timestamp)
260 })
261 .map(|(block, _)| block))
262 }
263
264 async fn fill_cache(
266 &self,
267 from_block: &Block,
268 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
269 let from_block_no = from_block.block_no;
270 let size = u32::from(self.cache_size);
271 let latest_block =
272 db_model::get_latest_block_info(&self.pool)
273 .await?
274 .ok_or(InternalDataSourceError(
275 "No latest block when filling the caches.".to_string(),
276 ))?;
277 let latest_block_num = latest_block.block_no.0;
278 let stable_block_num = latest_block_num.saturating_sub(self.security_parameter);
279
280 let to_block_no = BlockNumber(from_block_no.0.saturating_add(size).min(stable_block_num));
281 let blocks = if to_block_no.0 > from_block_no.0 {
282 db_model::get_blocks_by_numbers(&self.pool, from_block_no, to_block_no).await?
283 } else {
284 vec![from_block.clone()]
285 };
286
287 if let Ok(mut cache) = self.stable_blocks_cache.lock() {
288 cache.update(blocks);
289 debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0);
290 }
291 Ok(())
292 }
293
294 fn date_time_to_slot(
295 &self,
296 dt: NaiveDateTime,
297 ) -> Result<SlotNumber, Box<dyn std::error::Error + Send + Sync>> {
298 let millis: u64 = dt
299 .and_utc()
300 .timestamp_millis()
301 .try_into()
302 .map_err(|_| BadRequest(format!("Datetime out of range: {dt:?}")))?;
303 let ts = sidechain_domain::mainchain_epoch::Timestamp::from_unix_millis(millis);
304 let slot = self
305 .mainchain_epoch_config
306 .timestamp_to_mainchain_slot_number(ts)
307 .unwrap_or(self.mainchain_epoch_config.first_slot_number);
308 Ok(SlotNumber(slot))
309 }
310
311 fn timestamp_to_db_type(
312 timestamp: Timestamp,
313 ) -> Result<NaiveDateTime, Box<dyn std::error::Error + Send + Sync>> {
314 let millis: Option<i64> = timestamp.as_millis().try_into().ok();
315 let dt = millis
316 .and_then(DateTime::from_timestamp_millis)
317 .ok_or(BadRequest(format!("Timestamp out of range: {timestamp:?}")))?;
318 Ok(NaiveDateTime::new(dt.date_naive(), dt.time()))
319 }
320}
321
/// In-memory block cache searched by hash.
///
/// The `derive_new` constructor takes no arguments because the only field is
/// marked `#[new(default)]`, so a new cache starts empty.
#[derive(new)]
pub(crate) struct BlocksCache {
    /// Cached blocks; replaced wholesale by `update` and scanned linearly by `find_by_hash`.
    #[new(default)]
    from_last_by_hash: Vec<Block>,
}
329
330impl BlocksCache {
331 fn find_by_hash(&self, hash: McBlockHash) -> Option<Block> {
332 self.from_last_by_hash.iter().find(|b| b.hash == hash.0).cloned()
333 }
334
335 pub fn update(&mut self, from_last_by_hash: Vec<Block>) {
336 self.from_last_by_hash = from_last_by_hash;
337 }
338
339 pub fn new_arc_mutex() -> Arc<Mutex<Self>> {
340 Arc::new(Mutex::new(Self::new()))
341 }
342}