1use std::{fmt::Display, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{MultiEraBlock, Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use event::EventTarget;
9use futures::{stream::FuturesUnordered, StreamExt};
10use rand::{Rng, SeedableRng};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info};
13
14use crate::{
15 db::index::{
16 block::{
17 index_block,
18 roll_forward::{self, PurgeCondition},
19 },
20 queries::sync_status::{
21 get::{get_sync_status, SyncStatus},
22 update::update_sync_status,
23 },
24 session::CassandraSession,
25 },
26 settings::{chain_follower, Settings},
27};
28
29pub(crate) mod event;
31pub(crate) mod util;
32
/// Interval handed to `CassandraSession::wait_until_ready` whenever this module
/// waits for the indexing DB (presumably the delay between readiness checks —
/// confirm against `CassandraSession`).
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
35
36async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
38 let chain = cfg.chain;
39 let dl_config = cfg.dl_config.clone();
40
41 let mut cfg = ChainSyncConfig::default_for(chain);
42 cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
43 info!(chain = %chain, "Starting Blockchain Sync");
44
45 if let Err(error) = cfg.run().await {
46 error!(chain=%chain, error=%error, "Failed to start chain sync task");
47 Err(error)?;
48 }
49
50 Ok(())
51}
52
/// Parameters for, and (once finished) the outcome of, a single sub-chain sync task.
///
/// A fresh value is built with `SyncParams::new`, handed to `sync_subchain`, and a
/// copy carrying the outcome is produced by `SyncParams::done`. `SyncParams::retry`
/// derives the parameters for restarting a finished task.
#[derive(Clone)]
struct SyncParams {
    /// Chain (network) this task follows.
    chain: Network,
    /// Requested first point of the range to sync.
    start: Point,
    /// Requested last point of the range to sync (`Point::TIP` marks the TIP follower).
    end: Point,
    /// First block indexed so far, if any.
    first_indexed_block: Option<Point>,
    /// Whether the first indexed block was immutable.
    first_is_immutable: bool,
    /// Last block indexed so far, if any; a restarted task resumes from here.
    last_indexed_block: Option<Point>,
    /// Whether the last indexed block was immutable.
    last_is_immutable: bool,
    /// Blocks indexed across all attempts of this task.
    total_blocks_synced: u64,
    /// Blocks indexed by the most recent attempt (reset to 0 by `retry`).
    last_blocks_synced: u64,
    /// Number of times this task has been restarted.
    retries: u64,
    /// Base delay to wait before the next attempt starts; `None` means no delay.
    backoff_delay: Option<Duration>,
    /// Outcome of the attempt; holds `None` until `done` records a result.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Point reported by the follower when the immutable chain rolled forward
    /// (only set by the TIP follower).
    follower_roll_forward: Option<Point>,
}
83
84impl Display for SyncParams {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 if self.result.is_none() {
87 write!(f, "Sync_Params {{ ")?;
88 } else {
89 write!(f, "Sync_Result {{ ")?;
90 }
91
92 write!(f, "start: {}, end: {}", self.start, self.end)?;
93
94 if let Some(first) = self.first_indexed_block.as_ref() {
95 write!(
96 f,
97 ", first_indexed_block: {first}{}",
98 if self.first_is_immutable { ":I" } else { "" }
99 )?;
100 }
101
102 if let Some(last) = self.last_indexed_block.as_ref() {
103 write!(
104 f,
105 ", last_indexed_block: {last}{}",
106 if self.last_is_immutable { ":I" } else { "" }
107 )?;
108 }
109
110 if self.retries > 0 {
111 write!(f, ", retries: {}", self.retries)?;
112 }
113
114 if self.retries > 0 || self.result.is_some() {
115 write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
116 }
117
118 if self.result.is_some() {
119 write!(f, ", last_sync: {}", self.last_blocks_synced)?;
120 }
121
122 if let Some(backoff) = self.backoff_delay.as_ref() {
123 write!(f, ", backoff: {}", DurationString::from(*backoff))?;
124 }
125
126 if let Some(result) = self.result.as_ref() {
127 match result {
128 Ok(()) => write!(f, ", Success")?,
129 Err(error) => write!(f, ", {error}")?,
130 };
131 }
132
133 f.write_str(" }")
134 }
135}
136
/// Multiplier applied to the base backoff delay to obtain the exclusive upper bound
/// of the randomised sleep in `SyncParams::backoff`.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
139
140impl SyncParams {
141 fn new(chain: Network, start: Point, end: Point) -> Self {
143 Self {
144 chain,
145 start,
146 end,
147 first_indexed_block: None,
148 first_is_immutable: false,
149 last_indexed_block: None,
150 last_is_immutable: false,
151 total_blocks_synced: 0,
152 last_blocks_synced: 0,
153 retries: 0,
154 backoff_delay: None,
155 result: Arc::new(None),
156 follower_roll_forward: None,
157 }
158 }
159
160 fn retry(&self) -> Self {
162 let retry_count = self.retries.saturating_add(1);
163
164 let mut backoff = None;
165
166 if self.last_blocks_synced == 0 {
169 backoff = match retry_count {
171 1 => Some(Duration::from_secs(1)), 2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)), };
175 }
176
177 let mut retry = self.clone();
178 retry.last_blocks_synced = 0;
179 retry.retries = retry_count;
180 retry.backoff_delay = backoff;
181 retry.result = Arc::new(None);
182 retry.follower_roll_forward = None;
183
184 retry
185 }
186
187 fn done(
189 &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
190 last_immutable: bool, synced: u64, result: anyhow::Result<()>,
191 ) -> Self {
192 if result.is_ok() && first_immutable && last_immutable {
193 update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
197 }
198
199 let mut done = self.clone();
200 done.first_indexed_block = first;
201 done.first_is_immutable = first_immutable;
202 done.last_indexed_block = last;
203 done.last_is_immutable = last_immutable;
204 done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205 done.last_blocks_synced = synced;
206
207 done.result = Arc::new(Some(result));
208
209 done
210 }
211
212 fn actual_start(&self) -> Point {
214 self.last_indexed_block
215 .as_ref()
216 .unwrap_or(&self.start)
217 .clone()
218 }
219
220 async fn backoff(&self) {
226 if let Some(backoff) = self.backoff_delay {
227 let mut rng = rand::rngs::StdRng::from_entropy();
228 let actual_backoff =
229 rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
230
231 tokio::time::sleep(actual_backoff).await;
232 }
233 }
234}
235
236#[allow(clippy::too_many_lines)]
239fn sync_subchain(
240 params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
241) -> tokio::task::JoinHandle<SyncParams> {
242 tokio::spawn(async move {
243 info!(chain = %params.chain, params=%params, "Indexing Blockchain");
244
245 params.backoff().await;
247
248 drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
250 info!(chain=%params.chain, params=%params,"Indexing DB is ready");
251
252 let mut first_indexed_block = params.first_indexed_block.clone();
253 let mut first_immutable = params.first_is_immutable;
254 let mut last_indexed_block = params.last_indexed_block.clone();
255 let mut last_immutable = params.last_is_immutable;
256 let mut blocks_synced = 0u64;
257
258 let mut follower =
259 ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
260 while let Some(chain_update) = follower.next().await {
261 match chain_update.kind {
262 cardano_chain_follower::Kind::ImmutableBlockRollForward => {
263 if params.end == Point::TIP {
265 info!(chain=%params.chain, "Immutable chain rolled forward.");
269 let mut result = params.done(
270 first_indexed_block,
271 first_immutable,
272 last_indexed_block,
273 last_immutable,
274 blocks_synced,
275 Ok(()),
276 );
277 result.follower_roll_forward = Some(chain_update.block_data().point());
279 return result;
280 };
281 },
282 cardano_chain_follower::Kind::Block => {
283 let block = chain_update.block_data();
284
285 if let Err(error) = index_block(block).await {
286 let error_msg = format!("Failed to index block {}", block.point());
287 error!(chain=%params.chain, error=%error, params=%params, error_msg);
288 return params.done(
289 first_indexed_block,
290 first_immutable,
291 last_indexed_block,
292 last_immutable,
293 blocks_synced,
294 Err(error.context(error_msg)),
295 );
296 }
297
298 update_block_state(
299 block,
300 &mut first_indexed_block,
301 &mut first_immutable,
302 &mut last_indexed_block,
303 &mut last_immutable,
304 &mut blocks_synced,
305 );
306 },
307
308 cardano_chain_follower::Kind::Rollback => {
309 let rollback_slot = chain_update.block_data().slot();
311
312 let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
313 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
314 error!(chain=%params.chain, error=%error, "Chain follower
315 rollback, purging volatile data task failed.");
316 } else {
317 #[allow(clippy::arithmetic_side_effects)]
319 let purge_slots = params
320 .last_indexed_block
321 .as_ref()
322 .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
324
325 let _ = event_sender.send(event::ChainIndexerEvent::ForwardDataPurged {
326 purge_slots: purge_slots.into(),
327 });
328
329 let block = chain_update.block_data();
331 if let Err(error) = index_block(block).await {
332 let error_msg =
333 format!("Failed to index block after rollback {}", block.point());
334 error!(chain=%params.chain, error=%error, params=%params, error_msg);
335 return params.done(
336 first_indexed_block,
337 first_immutable,
338 last_indexed_block,
339 last_immutable,
340 blocks_synced,
341 Err(error.context(error_msg)),
342 );
343 }
344
345 update_block_state(
346 block,
347 &mut first_indexed_block,
348 &mut first_immutable,
349 &mut last_indexed_block,
350 &mut last_immutable,
351 &mut blocks_synced,
352 );
353 }
354 },
355 }
356 }
357
358 let result = params.done(
359 first_indexed_block,
360 first_immutable,
361 last_indexed_block,
362 last_immutable,
363 blocks_synced,
364 Ok(()),
365 );
366
367 info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
368
369 result
370 })
371}
372
373fn update_block_state(
375 block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
376 last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
377) {
378 *last_immutable = block.is_immutable();
379 *last_indexed_block = Some(block.point());
380
381 if first_indexed_block.is_none() {
382 *first_immutable = *last_immutable;
383 *first_indexed_block = Some(block.point());
384 }
385
386 *blocks_synced = blocks_synced.saturating_add(1);
387}
388
/// Coordinator for all sub-chain sync tasks of one chain.
///
/// Owns the running tasks, tracks how far the immutable chain has been scheduled
/// for indexing, and publishes `ChainIndexerEvent`s about its progress.
struct SyncTask {
    /// Chain follower configuration (chain, task limit, chunk size, ...).
    cfg: chain_follower::EnvVars,

    /// Handles of the sync tasks currently running.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// Number of immutable follower tasks currently counted as running.
    current_sync_tasks: u16,

    /// Slot from which the next immutable chunk will be scheduled.
    start_slot: Slot,

    /// Slot of the immutable tip as last reported.
    immutable_tip_slot: Slot,

    /// Slot of the live tip as read when `run` started.
    live_tip_slot: Slot,

    /// Previously recorded sync ranges, loaded from the DB when `run` starts.
    sync_status: Vec<SyncStatus>,

    /// Broadcast channel for indexer events: (sender, receiver).
    event_channel: (
        broadcast::Sender<event::ChainIndexerEvent>,
        broadcast::Receiver<event::ChainIndexerEvent>,
    ),
}
419
420impl SyncTask {
421 fn new(cfg: chain_follower::EnvVars) -> SyncTask {
423 Self {
424 cfg,
425 sync_tasks: FuturesUnordered::new(),
426 start_slot: 0.into(),
427 current_sync_tasks: 0,
428 immutable_tip_slot: 0.into(),
429 live_tip_slot: 0.into(),
430 sync_status: Vec::new(),
431 event_channel: broadcast::channel(10),
432 }
433 }
434
435 async fn run(&mut self) {
439 let tips = ChainFollower::get_tips(self.cfg.chain).await;
442 self.immutable_tip_slot = tips.0.slot_or_default();
443 self.live_tip_slot = tips.1.slot_or_default();
444 info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Blockchain ready to sync from.");
445
446 self.dispatch_event(event::ChainIndexerEvent::ImmutableTipSlotChanged {
447 slot: self.immutable_tip_slot,
448 });
449 self.dispatch_event(event::ChainIndexerEvent::LiveTipSlotChanged {
450 slot: self.live_tip_slot,
451 });
452
453 drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
457 info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
458 self.sync_status = get_sync_status().await;
459 debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
460
461 self.sync_tasks.push(sync_subchain(
464 SyncParams::new(
465 self.cfg.chain,
466 Point::fuzzy(self.immutable_tip_slot),
467 Point::TIP,
468 ),
469 self.event_channel.0.clone(),
470 ));
471
472 self.start_immutable_followers();
473
474 self.dispatch_event(event::ChainIndexerEvent::SyncStarted);
475
476 while let Some(completed) = self.sync_tasks.next().await {
482 match completed {
483 Ok(finished) => {
484 if finished.end == Point::TIP {
492 if let Some(ref roll_forward_point) = finished.follower_roll_forward {
493 self.immutable_tip_slot = roll_forward_point.slot_or_default();
496
497 self.dispatch_event(
498 event::ChainIndexerEvent::ImmutableTipSlotChanged {
499 slot: self.immutable_tip_slot,
500 },
501 );
502
503 self.start_immutable_followers();
504 } else {
505 error!(chain=%self.cfg.chain, report=%finished,
506 "The TIP follower failed, restarting it.");
507 }
508
509 self.sync_tasks.push(sync_subchain(
511 finished.retry(),
512 self.event_channel.0.clone(),
513 ));
514 } else if let Some(result) = finished.result.as_ref() {
515 match result {
516 Ok(()) => {
517 self.current_sync_tasks =
518 self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
519 error!("current_sync_tasks -= 1 overflow");
520 0
521 });
522 info!(chain=%self.cfg.chain, report=%finished,
523 "The Immutable follower completed successfully.");
524
525 finished.last_indexed_block.as_ref().inspect(|block| {
526 self.dispatch_event(
527 event::ChainIndexerEvent::IndexedSlotProgressed {
528 slot: block.slot_or_default(),
529 },
530 );
531 });
532 self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
533 current_sync_tasks: self.current_sync_tasks,
534 });
535
536 self.start_immutable_followers();
539 },
540 Err(error) => {
541 error!(chain=%self.cfg.chain, report=%finished, error=%error,
542 "An Immutable follower failed, restarting it.");
543 self.sync_tasks.push(sync_subchain(
546 finished.retry(),
547 self.event_channel.0.clone(),
548 ));
549 },
550 }
551 } else {
552 error!(chain=%self.cfg.chain, report=%finished,
553 "BUG: The Immutable follower completed, but without a proper result.");
554 }
555 },
556 Err(error) => {
557 error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point.");
558 },
559 }
560
561 if self.sync_tasks.len() == 1 {
570 self.dispatch_event(event::ChainIndexerEvent::SyncCompleted);
571
572 #[allow(clippy::arithmetic_side_effects)]
575 let purge_to_slot =
576 self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
577 let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
578 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
579 error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
580 } else {
581 self.dispatch_event(event::ChainIndexerEvent::BackwardDataPurged);
582 }
583 }
584 }
585
586 error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
587 }
588
589 fn start_immutable_followers(&mut self) {
591 if self.start_slot < self.immutable_tip_slot {
597 while self.current_sync_tasks < self.cfg.sync_tasks {
599 let end_slot = self.immutable_tip_slot.min(
600 (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
601 .into(),
602 );
603
604 if let Some((first_point, last_point)) =
605 self.get_syncable_range(self.start_slot, end_slot)
606 {
607 self.sync_tasks.push(sync_subchain(
608 SyncParams::new(self.cfg.chain, first_point, last_point.clone()),
609 self.event_channel.0.clone(),
610 ));
611 self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
612
613 self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
614 current_sync_tasks: self.current_sync_tasks,
615 });
616 }
617
618 self.start_slot = end_slot;
621
622 if end_slot == self.immutable_tip_slot {
623 break;
624 }
625 }
626 info!(chain=%self.cfg.chain, tasks=self.current_sync_tasks, until=?self.start_slot, "Persistent Indexing DB tasks started");
629 }
630 }
631
632 fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
637 for sync_block in &self.sync_status {
638 if start >= sync_block.start_slot && start <= sync_block.end_slot {
640 if end <= sync_block.end_slot {
642 return None;
643 }
644
645 return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
652 }
653 }
654
655 let start_slot = if start == 0.into() {
656 Point::ORIGIN
657 } else {
658 Point::fuzzy(start)
659 };
660
661 Some((start_slot, Point::fuzzy(end)))
662 }
663}
664
665impl event::EventTarget<event::ChainIndexerEvent> for SyncTask {
666 fn add_event_listener(&mut self, listener: event::EventListenerFn<event::ChainIndexerEvent>) {
667 let mut rx = self.event_channel.0.subscribe();
668 tokio::spawn(async move {
669 while let Ok(event) = rx.recv().await {
670 (listener)(&event);
671 }
672 });
673 }
674
675 fn dispatch_event(&self, message: event::ChainIndexerEvent) {
676 let _ = self.event_channel.0.send(message);
677 }
678}
679
680pub(crate) async fn start_followers() -> anyhow::Result<()> {
682 let cfg = Settings::follower_cfg();
683
684 cfg.log();
686
687 start_sync_for(&cfg).await?;
689 info!(chain=%cfg.chain,"Chain Sync is started.");
690
691 tokio::spawn(async move {
692 use self::event::ChainIndexerEvent as Event;
693 use crate::metrics::chain_indexer::reporter;
694
695 let api_host_names = Settings::api_host_names().join(",");
696 let service_id = Settings::service_id();
697 let network = cfg.chain.to_string();
698
699 let mut sync_task = SyncTask::new(cfg);
700
701 sync_task.add_event_listener(Box::new(move |event: &Event| {
703 if let Event::SyncStarted = event {
704 reporter::REACHED_TIP
705 .with_label_values(&[&api_host_names, service_id, &network])
706 .set(0);
707 }
708 if let Event::SyncCompleted = event {
709 reporter::REACHED_TIP
710 .with_label_values(&[&api_host_names, service_id, &network])
711 .set(1);
712 }
713 if let Event::SyncTasksChanged { current_sync_tasks } = event {
714 reporter::RUNNING_INDEXER_TASKS_COUNT
715 .with_label_values(&[&api_host_names, service_id, &network])
716 .set(From::from(*current_sync_tasks));
717 }
718 if let Event::LiveTipSlotChanged { slot } = event {
719 reporter::CURRENT_LIVE_TIP_SLOT
720 .with_label_values(&[&api_host_names, service_id, &network])
721 .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
722 }
723 if let Event::ImmutableTipSlotChanged { slot } = event {
724 reporter::CURRENT_IMMUTABLE_TIP_SLOT
725 .with_label_values(&[&api_host_names, service_id, &network])
726 .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
727 }
728 if let Event::IndexedSlotProgressed { slot } = event {
729 reporter::HIGHEST_COMPLETE_INDEXED_SLOT
730 .with_label_values(&[&api_host_names, service_id, &network])
731 .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
732 }
733 if let Event::BackwardDataPurged = event {
734 reporter::TRIGGERED_BACKWARD_PURGES_COUNT
735 .with_label_values(&[&api_host_names, service_id, &network])
736 .inc();
737 }
738 if let Event::ForwardDataPurged { purge_slots } = event {
739 reporter::TRIGGERED_FORWARD_PURGES_COUNT
740 .with_label_values(&[&api_host_names, service_id, &network])
741 .inc();
742 reporter::PURGED_SLOTS
743 .with_label_values(&[&api_host_names, service_id, &network])
744 .set(i64::try_from(*purge_slots).unwrap_or(-1));
745 }
746 }));
747
748 sync_task.run().await;
749 });
750
751 Ok(())
752}