1use std::{fmt::Display, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use futures::{stream::FuturesUnordered, StreamExt};
9use rand::{Rng, SeedableRng};
10use tracing::{debug, error, info, warn};
11
12use crate::{
13 db::index::{
14 block::{index_block, roll_forward},
15 queries::sync_status::{
16 get::{get_sync_status, SyncStatus},
17 update::update_sync_status,
18 },
19 session::CassandraSession,
20 },
21 settings::{chain_follower, Settings},
22};
23
24pub(crate) mod util;
26
/// Interval passed to `CassandraSession::wait_until_ready` whenever this module waits for the
/// Indexing DB before touching it.
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
29
30async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
32 let chain = cfg.chain;
33 let dl_config = cfg.dl_config.clone();
34
35 let mut cfg = ChainSyncConfig::default_for(chain);
36 cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
37 info!(chain = %chain, "Starting Blockchain Sync");
38
39 if let Err(error) = cfg.run().await {
40 error!(chain=%chain, error=%error, "Failed to start chain sync task");
41 Err(error)?;
42 }
43
44 Ok(())
45}
46
47#[derive(Clone)]
49struct SyncParams {
50 chain: Network,
52 start: Point,
54 end: Point,
56 first_indexed_block: Option<Point>,
58 first_is_immutable: bool,
60 last_indexed_block: Option<Point>,
62 last_is_immutable: bool,
64 total_blocks_synced: u64,
66 last_blocks_synced: u64,
68 retries: u64,
70 backoff_delay: Option<Duration>,
72 result: Arc<Option<anyhow::Result<()>>>,
74 follower_roll_forward: Option<Point>,
76}
77
78impl Display for SyncParams {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 if self.result.is_none() {
81 write!(f, "Sync_Params {{ ")?;
82 } else {
83 write!(f, "Sync_Result {{ ")?;
84 }
85
86 write!(f, "start: {}, end: {}", self.start, self.end)?;
87
88 if let Some(first) = self.first_indexed_block.as_ref() {
89 write!(
90 f,
91 ", first_indexed_block: {first}{}",
92 if self.first_is_immutable { ":I" } else { "" }
93 )?;
94 }
95
96 if let Some(last) = self.last_indexed_block.as_ref() {
97 write!(
98 f,
99 ", last_indexed_block: {last}{}",
100 if self.last_is_immutable { ":I" } else { "" }
101 )?;
102 }
103
104 if self.retries > 0 {
105 write!(f, ", retries: {}", self.retries)?;
106 }
107
108 if self.retries > 0 || self.result.is_some() {
109 write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
110 }
111
112 if self.result.is_some() {
113 write!(f, ", last_sync: {}", self.last_blocks_synced)?;
114 }
115
116 if let Some(backoff) = self.backoff_delay.as_ref() {
117 write!(f, ", backoff: {}", DurationString::from(*backoff))?;
118 }
119
120 if let Some(result) = self.result.as_ref() {
121 match result {
122 Ok(()) => write!(f, ", Success")?,
123 Err(error) => write!(f, ", {error}")?,
124 };
125 }
126
127 f.write_str(" }")
128 }
129}
130
/// Factor applied to a base backoff delay to get the exclusive upper bound of the randomized
/// wait: the actual delay is drawn from `base..base * BACKOFF_RANGE_MULTIPLIER`.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
133
134impl SyncParams {
135 fn new(chain: Network, start: Point, end: Point) -> Self {
137 Self {
138 chain,
139 start,
140 end,
141 first_indexed_block: None,
142 first_is_immutable: false,
143 last_indexed_block: None,
144 last_is_immutable: false,
145 total_blocks_synced: 0,
146 last_blocks_synced: 0,
147 retries: 0,
148 backoff_delay: None,
149 result: Arc::new(None),
150 follower_roll_forward: None,
151 }
152 }
153
154 fn retry(&self) -> Self {
156 let retry_count = self.retries.saturating_add(1);
157
158 let mut backoff = None;
159
160 if self.last_blocks_synced == 0 {
163 backoff = match retry_count {
165 1 => Some(Duration::from_secs(1)), 2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)), };
169 }
170
171 let mut retry = self.clone();
172 retry.last_blocks_synced = 0;
173 retry.retries = retry_count;
174 retry.backoff_delay = backoff;
175 retry.result = Arc::new(None);
176 retry.follower_roll_forward = None;
177
178 retry
179 }
180
181 fn done(
183 &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
184 last_immutable: bool, synced: u64, result: anyhow::Result<()>,
185 ) -> Self {
186 if result.is_ok() && first_immutable && last_immutable {
187 update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
191 }
192
193 let mut done = self.clone();
194 done.first_indexed_block = first;
195 done.first_is_immutable = first_immutable;
196 done.last_indexed_block = last;
197 done.last_is_immutable = last_immutable;
198 done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
199 done.last_blocks_synced = synced;
200
201 done.result = Arc::new(Some(result));
202
203 done
204 }
205
206 fn actual_start(&self) -> Point {
208 self.last_indexed_block
209 .as_ref()
210 .unwrap_or(&self.start)
211 .clone()
212 }
213
214 async fn backoff(&self) {
220 if let Some(backoff) = self.backoff_delay {
221 let mut rng = rand::rngs::StdRng::from_entropy();
222 let actual_backoff =
223 rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
224
225 tokio::time::sleep(actual_backoff).await;
226 }
227 }
228}
229
230fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle<SyncParams> {
233 tokio::spawn(async move {
234 info!(chain = %params.chain, params=%params, "Indexing Blockchain");
235
236 params.backoff().await;
238
239 drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
241 info!(chain=%params.chain, params=%params,"Indexing DB is ready");
242
243 let mut first_indexed_block = params.first_indexed_block.clone();
244 let mut first_immutable = params.first_is_immutable;
245 let mut last_indexed_block = params.last_indexed_block.clone();
246 let mut last_immutable = params.last_is_immutable;
247 let mut blocks_synced = 0u64;
248
249 let mut follower =
250 ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
251 while let Some(chain_update) = follower.next().await {
252 match chain_update.kind {
253 cardano_chain_follower::Kind::ImmutableBlockRollForward => {
254 if params.end == Point::TIP {
256 info!(chain=%params.chain, "Immutable chain rolled forward.");
260 let mut result = params.done(
261 first_indexed_block,
262 first_immutable,
263 last_indexed_block,
264 last_immutable,
265 blocks_synced,
266 Ok(()),
267 );
268 result.follower_roll_forward = Some(chain_update.block_data().point());
270 return result;
271 };
272 },
273 cardano_chain_follower::Kind::Block => {
274 let block = chain_update.block_data();
275
276 if let Err(error) = index_block(block).await {
277 let error_msg = format!("Failed to index block {}", block.point());
278 error!(chain=%params.chain, error=%error, params=%params, error_msg);
279 return params.done(
280 first_indexed_block,
281 first_immutable,
282 last_indexed_block,
283 last_immutable,
284 blocks_synced,
285 Err(error.context(error_msg)),
286 );
287 }
288
289 last_immutable = block.is_immutable();
290 last_indexed_block = Some(block.point());
291
292 if first_indexed_block.is_none() {
293 first_immutable = last_immutable;
294 first_indexed_block = Some(block.point());
295 }
296 blocks_synced = blocks_synced.saturating_add(1);
297 },
298 cardano_chain_follower::Kind::Rollback => {
299 warn!("TODO: Live Chain rollback");
300 },
304 }
305 }
306
307 let result = params.done(
308 first_indexed_block,
309 first_immutable,
310 last_indexed_block,
311 last_immutable,
312 blocks_synced,
313 Ok(()),
314 );
315
316 info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
317
318 result
319 })
320}
321
322struct SyncTask {
325 cfg: chain_follower::EnvVars,
327
328 sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,
330
331 current_sync_tasks: u16,
333
334 start_slot: Slot,
336
337 immutable_tip_slot: Slot,
339
340 live_tip_slot: Slot,
342
343 sync_status: Vec<SyncStatus>,
345}
346
347impl SyncTask {
348 fn new(cfg: chain_follower::EnvVars) -> SyncTask {
350 SyncTask {
351 cfg,
352 sync_tasks: FuturesUnordered::new(),
353 start_slot: 0.into(),
354 current_sync_tasks: 0,
355 immutable_tip_slot: 0.into(),
356 live_tip_slot: 0.into(),
357 sync_status: Vec::new(),
358 }
359 }
360
361 async fn run(&mut self) {
365 let tips = ChainFollower::get_tips(self.cfg.chain).await;
368 self.immutable_tip_slot = tips.0.slot_or_default();
369 self.live_tip_slot = tips.1.slot_or_default();
370 info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Blockchain ready to sync from.");
371
372 drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
376 info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
377 self.sync_status = get_sync_status().await;
378 debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
379
380 self.sync_tasks.push(sync_subchain(SyncParams::new(
383 self.cfg.chain,
384 Point::fuzzy(self.immutable_tip_slot),
385 Point::TIP,
386 )));
387
388 self.start_immutable_followers();
389
390 while let Some(completed) = self.sync_tasks.next().await {
396 match completed {
397 Ok(finished) => {
398 if finished.end == Point::TIP {
406 if let Some(ref roll_forward_point) = finished.follower_roll_forward {
407 self.immutable_tip_slot = roll_forward_point.slot_or_default();
410 self.start_immutable_followers();
411 } else {
412 error!(chain=%self.cfg.chain, report=%finished,
413 "The TIP follower failed, restarting it.");
414 }
415
416 self.sync_tasks.push(sync_subchain(finished.retry()));
418 } else if let Some(result) = finished.result.as_ref() {
419 match result {
420 Ok(()) => {
421 self.current_sync_tasks =
422 self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
423 error!("current_sync_tasks -= 1 overflow");
424 0
425 });
426 info!(chain=%self.cfg.chain, report=%finished,
427 "The Immutable follower completed successfully.");
428
429 self.start_immutable_followers();
432 },
433 Err(error) => {
434 error!(chain=%self.cfg.chain, report=%finished, error=%error,
435 "An Immutable follower failed, restarting it.");
436 self.sync_tasks.push(sync_subchain(finished.retry()));
439 },
440 }
441 } else {
442 error!(chain=%self.cfg.chain, report=%finished,
443 "BUG: The Immutable follower completed, but without a proper result.");
444 }
445 },
446 Err(error) => {
447 error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point.");
448 },
449 }
450
451 if self.sync_tasks.len() == 1 {
460 if let Err(error) = roll_forward::purge_live_index(self.immutable_tip_slot).await {
461 error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
462 }
463 }
464 }
465
466 error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
467 }
468
469 fn start_immutable_followers(&mut self) {
471 if self.start_slot < self.immutable_tip_slot {
477 while self.current_sync_tasks < self.cfg.sync_tasks {
479 let end_slot = self.immutable_tip_slot.min(
480 (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
481 .into(),
482 );
483
484 if let Some((first_point, last_point)) =
485 self.get_syncable_range(self.start_slot, end_slot)
486 {
487 self.sync_tasks.push(sync_subchain(SyncParams::new(
488 self.cfg.chain,
489 first_point,
490 last_point.clone(),
491 )));
492 self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
493 }
494
495 self.start_slot = end_slot;
498
499 if end_slot == self.immutable_tip_slot {
500 break;
501 }
502 }
503 info!(chain=%self.cfg.chain, tasks=self.current_sync_tasks, until=?self.start_slot, "Persistent Indexing DB tasks started");
506 }
507 }
508
509 fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
514 for sync_block in &self.sync_status {
515 if start >= sync_block.start_slot && start <= sync_block.end_slot {
517 if end <= sync_block.end_slot {
519 return None;
520 }
521
522 return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
529 }
530 }
531
532 let start_slot = if start == 0.into() {
533 Point::ORIGIN
534 } else {
535 Point::fuzzy(start)
536 };
537
538 Some((start_slot, Point::fuzzy(end)))
539 }
540}
541
542pub(crate) async fn start_followers() -> anyhow::Result<()> {
544 let cfg = Settings::follower_cfg();
545
546 cfg.log();
548
549 start_sync_for(&cfg).await?;
551 info!(chain=%cfg.chain,"Chain Sync is started.");
552
553 tokio::spawn(async move {
554 let mut sync_task = SyncTask::new(cfg);
555 sync_task.run().await;
556 });
557
558 Ok(())
559}