cat_gateway/cardano/
mod.rs

1//! Logic for orchestrating followers
2
3use std::{fmt::Display, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{MultiEraBlock, Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use event::EventTarget;
9use futures::{stream::FuturesUnordered, StreamExt};
10use rand::{Rng, SeedableRng};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info};
13
14use crate::{
15    db::index::{
16        block::{
17            index_block,
18            roll_forward::{self, PurgeCondition},
19        },
20        queries::sync_status::{
21            get::{get_sync_status, SyncStatus},
22            update::update_sync_status,
23        },
24        session::CassandraSession,
25    },
26    settings::{chain_follower, Settings},
27};
28
29// pub(crate) mod cip36_registration_obsolete;
30pub(crate) mod event;
31pub(crate) mod util;
32
/// How long we wait between checks for connection to the indexing DB to be ready.
///
/// This is the interval handed to `CassandraSession::wait_until_ready` before any
/// indexing work is started.
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
35
36/// Start syncing a particular network
37async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
38    let chain = cfg.chain;
39    let dl_config = cfg.dl_config.clone();
40
41    let mut cfg = ChainSyncConfig::default_for(chain);
42    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
43    info!(chain = %chain, "Starting Blockchain Sync");
44
45    if let Err(error) = cfg.run().await {
46        error!(chain=%chain, error=%error, "Failed to start chain sync task");
47        Err(error)?;
48    }
49
50    Ok(())
51}
52
/// Data we return from a sync task.
///
/// The same structure is used both as the input parameters of a sync task and as its
/// report once it finishes; `result` is `None` until the task has completed.
#[derive(Clone)]
struct SyncParams {
    /// What blockchain are we syncing.
    chain: Network,
    /// The starting point of this sync.
    start: Point,
    /// The ending point of this sync.
    end: Point,
    /// The first block we successfully synced.
    first_indexed_block: Option<Point>,
    /// Is the starting point immutable? (True = immutable, false = don't know.)
    first_is_immutable: bool,
    /// The last block we successfully synced.
    last_indexed_block: Option<Point>,
    /// Is the ending point immutable? (True = immutable, false = don't know.)
    last_is_immutable: bool,
    /// The number of blocks we successfully synced overall.
    total_blocks_synced: u64,
    /// The number of blocks we successfully synced, in the last attempt.
    last_blocks_synced: u64,
    /// The number of retries so far on this sync task.
    retries: u64,
    /// The base delay to wait before this sync task starts, if any.
    /// The delay actually slept is randomised upwards from this value (see `backoff`).
    backoff_delay: Option<Duration>,
    /// If the sync completed without error or not.
    /// `None` while the task has not finished yet.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Chain follower roll forward.
    /// Set by the TIP follower to the point the immutable chain rolled forward to.
    follower_roll_forward: Option<Point>,
}
83
84impl Display for SyncParams {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        if self.result.is_none() {
87            write!(f, "Sync_Params {{ ")?;
88        } else {
89            write!(f, "Sync_Result {{ ")?;
90        }
91
92        write!(f, "start: {}, end: {}", self.start, self.end)?;
93
94        if let Some(first) = self.first_indexed_block.as_ref() {
95            write!(
96                f,
97                ", first_indexed_block: {first}{}",
98                if self.first_is_immutable { ":I" } else { "" }
99            )?;
100        }
101
102        if let Some(last) = self.last_indexed_block.as_ref() {
103            write!(
104                f,
105                ", last_indexed_block: {last}{}",
106                if self.last_is_immutable { ":I" } else { "" }
107            )?;
108        }
109
110        if self.retries > 0 {
111            write!(f, ", retries: {}", self.retries)?;
112        }
113
114        if self.retries > 0 || self.result.is_some() {
115            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
116        }
117
118        if self.result.is_some() {
119            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
120        }
121
122        if let Some(backoff) = self.backoff_delay.as_ref() {
123            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
124        }
125
126        if let Some(result) = self.result.as_ref() {
127            match result {
128                Ok(()) => write!(f, ", Success")?,
129                Err(error) => write!(f, ", {error}")?,
130            };
131        }
132
133        f.write_str(" }")
134    }
135}
136
/// The range we generate random backoffs within given a base backoff value.
///
/// The delay actually slept is drawn from `base..base * BACKOFF_RANGE_MULTIPLIER`
/// (upper bound excluded).
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
139
140impl SyncParams {
141    /// Create a new `SyncParams`.
142    fn new(chain: Network, start: Point, end: Point) -> Self {
143        Self {
144            chain,
145            start,
146            end,
147            first_indexed_block: None,
148            first_is_immutable: false,
149            last_indexed_block: None,
150            last_is_immutable: false,
151            total_blocks_synced: 0,
152            last_blocks_synced: 0,
153            retries: 0,
154            backoff_delay: None,
155            result: Arc::new(None),
156            follower_roll_forward: None,
157        }
158    }
159
160    /// Convert a result back into parameters for a retry.
161    fn retry(&self) -> Self {
162        let retry_count = self.retries.saturating_add(1);
163
164        let mut backoff = None;
165
166        // If we did sync any blocks last time, first retry is immediate.
167        // Otherwise we backoff progressively more as we do more retries.
168        if self.last_blocks_synced == 0 {
169            // Calculate backoff based on number of retries so far.
170            backoff = match retry_count {
171                1 => Some(Duration::from_secs(1)),     // 1-3 seconds
172                2..5 => Some(Duration::from_secs(10)), // 10-30 seconds
173                _ => Some(Duration::from_secs(30)),    // 30-90 seconds.
174            };
175        }
176
177        let mut retry = self.clone();
178        retry.last_blocks_synced = 0;
179        retry.retries = retry_count;
180        retry.backoff_delay = backoff;
181        retry.result = Arc::new(None);
182        retry.follower_roll_forward = None;
183
184        retry
185    }
186
187    /// Convert Params into the result of the sync.
188    fn done(
189        &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
190        last_immutable: bool, synced: u64, result: anyhow::Result<()>,
191    ) -> Self {
192        if result.is_ok() && first_immutable && last_immutable {
193            // Update sync status in the Immutable DB.
194            // Can fire and forget, because failure to update DB will simply cause the chunk to be
195            // re-indexed, on recovery.
196            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
197        }
198
199        let mut done = self.clone();
200        done.first_indexed_block = first;
201        done.first_is_immutable = first_immutable;
202        done.last_indexed_block = last;
203        done.last_is_immutable = last_immutable;
204        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205        done.last_blocks_synced = synced;
206
207        done.result = Arc::new(Some(result));
208
209        done
210    }
211
212    /// Get where this sync run actually needs to start from.
213    fn actual_start(&self) -> Point {
214        self.last_indexed_block
215            .as_ref()
216            .unwrap_or(&self.start)
217            .clone()
218    }
219
220    /// Do the backoff delay processing.
221    ///
222    /// The actual delay is a random time from the Delay itself to
223    /// `BACKOFF_RANGE_MULTIPLIER` times the delay. This is to prevent hammering the
224    /// service at regular intervals.
225    async fn backoff(&self) {
226        if let Some(backoff) = self.backoff_delay {
227            let mut rng = rand::rngs::StdRng::from_entropy();
228            let actual_backoff =
229                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
230
231            tokio::time::sleep(actual_backoff).await;
232        }
233    }
234}
235
236/// Sync a portion of the blockchain.
237/// Set end to `Point::TIP` to sync the tip continuously.
238#[allow(clippy::too_many_lines)]
239fn sync_subchain(
240    params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
241) -> tokio::task::JoinHandle<SyncParams> {
242    tokio::spawn(async move {
243        info!(chain = %params.chain, params=%params, "Indexing Blockchain");
244
245        // Backoff hitting the database if we need to.
246        params.backoff().await;
247
248        // Wait for indexing DB to be ready before continuing.
249        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
250        info!(chain=%params.chain, params=%params,"Indexing DB is ready");
251
252        let mut first_indexed_block = params.first_indexed_block.clone();
253        let mut first_immutable = params.first_is_immutable;
254        let mut last_indexed_block = params.last_indexed_block.clone();
255        let mut last_immutable = params.last_is_immutable;
256        let mut blocks_synced = 0u64;
257
258        let mut follower =
259            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
260        while let Some(chain_update) = follower.next().await {
261            match chain_update.kind {
262                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
263                    // We only process these on the follower tracking the TIP.
264                    if params.end == Point::TIP {
265                        // What we need to do here is tell the primary follower to start a new sync
266                        // for the new immutable data, and then purge the volatile database of the
267                        // old data (after the immutable data has synced).
268                        info!(chain=%params.chain, "Immutable chain rolled forward.");
269                        let mut result = params.done(
270                            first_indexed_block,
271                            first_immutable,
272                            last_indexed_block,
273                            last_immutable,
274                            blocks_synced,
275                            Ok(()),
276                        );
277                        // Signal the point the immutable chain rolled forward to.
278                        result.follower_roll_forward = Some(chain_update.block_data().point());
279                        return result;
280                    };
281                },
282                cardano_chain_follower::Kind::Block => {
283                    let block = chain_update.block_data();
284
285                    if let Err(error) = index_block(block).await {
286                        let error_msg = format!("Failed to index block {}", block.point());
287                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
288                        return params.done(
289                            first_indexed_block,
290                            first_immutable,
291                            last_indexed_block,
292                            last_immutable,
293                            blocks_synced,
294                            Err(error.context(error_msg)),
295                        );
296                    }
297
298                    update_block_state(
299                        block,
300                        &mut first_indexed_block,
301                        &mut first_immutable,
302                        &mut last_indexed_block,
303                        &mut last_immutable,
304                        &mut blocks_synced,
305                    );
306                },
307
308                cardano_chain_follower::Kind::Rollback => {
309                    // Rollback occurs, need to purge forward
310                    let rollback_slot = chain_update.block_data().slot();
311
312                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
313                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
314                        error!(chain=%params.chain, error=%error, "Chain follower
315                    rollback, purging volatile data task failed.");
316                    } else {
317                        // How many slots are purged
318                        #[allow(clippy::arithmetic_side_effects)]
319                        let purge_slots = params
320                            .last_indexed_block
321                            .as_ref()
322                            // Slots arithmetic has saturating semantic, so this is ok
323                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
324
325                        let _ = event_sender.send(event::ChainIndexerEvent::ForwardDataPurged {
326                            purge_slots: purge_slots.into(),
327                        });
328
329                        // Purge success, now index the current block
330                        let block = chain_update.block_data();
331                        if let Err(error) = index_block(block).await {
332                            let error_msg =
333                                format!("Failed to index block after rollback {}", block.point());
334                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
335                            return params.done(
336                                first_indexed_block,
337                                first_immutable,
338                                last_indexed_block,
339                                last_immutable,
340                                blocks_synced,
341                                Err(error.context(error_msg)),
342                            );
343                        }
344
345                        update_block_state(
346                            block,
347                            &mut first_indexed_block,
348                            &mut first_immutable,
349                            &mut last_indexed_block,
350                            &mut last_immutable,
351                            &mut blocks_synced,
352                        );
353                    }
354                },
355            }
356        }
357
358        let result = params.done(
359            first_indexed_block,
360            first_immutable,
361            last_indexed_block,
362            last_immutable,
363            blocks_synced,
364            Ok(()),
365        );
366
367        info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
368
369        result
370    })
371}
372
373/// Update block related state.
374fn update_block_state(
375    block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
376    last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
377) {
378    *last_immutable = block.is_immutable();
379    *last_indexed_block = Some(block.point());
380
381    if first_indexed_block.is_none() {
382        *first_immutable = *last_immutable;
383        *first_indexed_block = Some(block.point());
384    }
385
386    *blocks_synced = blocks_synced.saturating_add(1);
387}
388
/// The synchronisation task, and its state.
/// There should ONLY ever be one of these at any time.
struct SyncTask {
    /// Chain follower configuration.
    cfg: chain_follower::EnvVars,

    /// The current running sync tasks.
    /// Holds the live (TIP) follower as well as every immutable follower.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// How many immutable chain follower sync tasks we are running.
    /// The live (TIP) follower is not counted here.
    current_sync_tasks: u16,

    /// Start for the next block we would sync.
    start_slot: Slot,

    /// The immutable tip slot.
    immutable_tip_slot: Slot,

    /// The live tip slot.
    live_tip_slot: Slot,

    /// Current Sync Status.
    /// Loaded with `get_sync_status` in `run`, and consulted by `get_syncable_range` to
    /// skip ranges which were already indexed.
    sync_status: Vec<SyncStatus>,

    /// Broadcast channel used to publish events during the process of sync tasks.
    /// The sender half is cloned into every sync task and is what event listeners
    /// subscribe to. The receiver half is stored but never read in this file.
    // NOTE(review): presumably the receiver is kept so the channel always has at least
    // one receiver - confirm.
    event_channel: (
        broadcast::Sender<event::ChainIndexerEvent>,
        broadcast::Receiver<event::ChainIndexerEvent>,
    ),
}
419
impl SyncTask {
    /// Create a new `SyncTask`.
    ///
    /// No sync tasks are running yet, all slots start at 0, and the event channel is
    /// created with a capacity of 10 events.
    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
        Self {
            cfg,
            sync_tasks: FuturesUnordered::new(),
            start_slot: 0.into(),
            current_sync_tasks: 0,
            immutable_tip_slot: 0.into(),
            live_tip_slot: 0.into(),
            sync_status: Vec::new(),
            event_channel: broadcast::channel(10),
        }
    }

    /// Primary Chain Follower task.
    ///
    /// This continuously runs in the background, and never terminates.
    ///
    /// It starts the live (TIP) follower and as many immutable followers as the
    /// configuration allows, then supervises them: failed followers are restarted,
    /// completed immutable followers make room for new ones, and once only the live
    /// follower is left the live index is purged of data older than the immutable tip.
    async fn run(&mut self) {
        // We can't sync until the local chain data is synced.
        // This call will wait until we sync.
        let tips = ChainFollower::get_tips(self.cfg.chain).await;
        self.immutable_tip_slot = tips.0.slot_or_default();
        self.live_tip_slot = tips.1.slot_or_default();
        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Blockchain ready to sync from.");

        self.dispatch_event(event::ChainIndexerEvent::ImmutableTipSlotChanged {
            slot: self.immutable_tip_slot,
        });
        self.dispatch_event(event::ChainIndexerEvent::LiveTipSlotChanged {
            slot: self.live_tip_slot,
        });

        // Wait for indexing DB to be ready before continuing.
        // We do this after the above, because other nodes may have finished already, and we don't
        // want to redo any work they already completed while we were fetching the blockchain.
        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
        self.sync_status = get_sync_status().await;
        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);

        // Start the Live Chain sync task - This can never end because it is syncing to TIP.
        // So, if it fails, it will automatically be restarted.
        self.sync_tasks.push(sync_subchain(
            SyncParams::new(
                self.cfg.chain,
                Point::fuzzy(self.immutable_tip_slot),
                Point::TIP,
            ),
            self.event_channel.0.clone(),
        ));

        self.start_immutable_followers();

        self.dispatch_event(event::ChainIndexerEvent::SyncStarted);

        // Wait for sync tasks to complete.  If they fail and have not completed, reschedule them.
        // If an immutable sync task ends OK, and we still have immutable data to sync then
        // start a new task.
        // They will return from this iterator in the order they complete.
        // This iterator actually never ends, because the live sync task is always restarted.
        while let Some(completed) = self.sync_tasks.next().await {
            match completed {
                Ok(finished) => {
                    // Sync task finished.  Check if it completed OK or had an error.
                    // If it failed, we need to reschedule it.

                    // The TIP follower should NEVER end, unless there is an immutable roll forward,
                    // or there is an error.  If this is not a roll forward, log an error.
                    // It can fail if the index DB goes down in some way.
                    // Restart it always.
                    if finished.end == Point::TIP {
                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
                            // Advance the known immutable tip, and try and start followers to reach
                            // it.
                            self.immutable_tip_slot = roll_forward_point.slot_or_default();

                            self.dispatch_event(
                                event::ChainIndexerEvent::ImmutableTipSlotChanged {
                                    slot: self.immutable_tip_slot,
                                },
                            );

                            self.start_immutable_followers();
                        } else {
                            error!(chain=%self.cfg.chain, report=%finished,
                            "The TIP follower failed, restarting it.");
                        }

                        // Start the Live Chain sync task again from where it left off.
                        self.sync_tasks.push(sync_subchain(
                            finished.retry(),
                            self.event_channel.0.clone(),
                        ));
                    } else if let Some(result) = finished.result.as_ref() {
                        match result {
                            Ok(()) => {
                                // One immutable follower less is running now.
                                self.current_sync_tasks =
                                    self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
                                        error!("current_sync_tasks -= 1 overflow");
                                        0
                                    });
                                info!(chain=%self.cfg.chain, report=%finished,
                                    "The Immutable follower completed successfully.");

                                finished.last_indexed_block.as_ref().inspect(|block| {
                                    self.dispatch_event(
                                        event::ChainIndexerEvent::IndexedSlotProgressed {
                                            slot: block.slot_or_default(),
                                        },
                                    );
                                });
                                self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
                                    current_sync_tasks: self.current_sync_tasks,
                                });

                                // If we need more immutable chain followers to sync the block
                                // chain, we can now start them.
                                self.start_immutable_followers();
                            },
                            Err(error) => {
                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
                                    "An Immutable follower failed, restarting it.");
                                // Restart the Immutable Chain sync task again from where it left
                                // off.
                                // The task stays counted in `current_sync_tasks` while it is
                                // retried.
                                self.sync_tasks.push(sync_subchain(
                                    finished.retry(),
                                    self.event_channel.0.clone(),
                                ));
                            },
                        }
                    } else {
                        error!(chain=%self.cfg.chain, report=%finished,
                                 "BUG: The Immutable follower completed, but without a proper result.");
                    }
                },
                Err(error) => {
                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
                },
            }

            // IF there is only 1 chain follower left in sync_tasks, then all
            // immutable followers have finished.
            // When this happens we need to purge the live index of any records that exist
            // before the current immutable tip.
            // Note: to prevent a data race when multiple nodes are syncing, we probably
            // want to put a gap in this, so that there are X slots of overlap
            // between the live chain and immutable chain.  This gap should be
            // a parameter.
            if self.sync_tasks.len() == 1 {
                self.dispatch_event(event::ChainIndexerEvent::SyncCompleted);

                // Purge data up to this slot
                // Slots arithmetic has saturating semantic, so this is ok.
                #[allow(clippy::arithmetic_side_effects)]
                let purge_to_slot =
                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
                } else {
                    self.dispatch_event(event::ChainIndexerEvent::BackwardDataPurged);
                }
            }
        }

        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
    }

    /// Start immutable followers, if we can
    ///
    /// Splits the not yet scheduled part of the immutable chain into chunks of at most
    /// `cfg.sync_chunk_max_slots` slots and starts a follower for each chunk which
    /// still needs syncing, until `cfg.sync_tasks` followers are running or the
    /// immutable tip is reached.
    fn start_immutable_followers(&mut self) {
        // Start the Immutable Chain sync tasks, as required.
        // We will start at most the number of configured sync tasks.
        // The live chain sync task is not counted as a sync task for this config value.

        // Nothing to do if the start_slot is not less than the end of the immutable chain.
        if self.start_slot < self.immutable_tip_slot {
            // Will also break if there are no more slots left to sync.
            while self.current_sync_tasks < self.cfg.sync_tasks {
                // The chunk ends at most `sync_chunk_max_slots` after its start, and
                // never beyond the immutable tip.
                let end_slot = self.immutable_tip_slot.min(
                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
                        .into(),
                );

                // `None` means the whole chunk was already indexed, so no task is needed.
                if let Some((first_point, last_point)) =
                    self.get_syncable_range(self.start_slot, end_slot)
                {
                    self.sync_tasks.push(sync_subchain(
                        SyncParams::new(self.cfg.chain, first_point, last_point.clone()),
                        self.event_channel.0.clone(),
                    ));
                    self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);

                    self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
                        current_sync_tasks: self.current_sync_tasks,
                    });
                }

                // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
                // by one problems that may occur otherwise.
                self.start_slot = end_slot;

                if end_slot == self.immutable_tip_slot {
                    break;
                }
            }
            // `start_slot` is still used, because it is used to keep syncing chunks as required
            // while each immutable sync task finishes.
            info!(chain=%self.cfg.chain, tasks=self.current_sync_tasks, until=?self.start_slot, "Persistent Indexing DB tasks started");
        }
    }

    /// Check if the requested range has already been indexed.
    /// If it hasn't just return the slots as points.
    /// If it has, return a subset that hasn't been indexed if any, or None if its been
    /// completely indexed already.
    ///
    /// A `start` of slot 0 is returned as `Point::ORIGIN`; every other slot is returned
    /// as a fuzzy point.
    fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
        for sync_block in &self.sync_status {
            // Check if we start within a previously synchronized block.
            if start >= sync_block.start_slot && start <= sync_block.end_slot {
                // Check if we are fully contained by the sync block, if so, nothing to sync.
                if end <= sync_block.end_slot {
                    return None;
                }

                // In theory, we could extend into another sync block, but because we could extend
                // into an unbounded number of sync blocks, we would need to bust
                // this range into an unbounded number of sub chunks.
                // It is not a problem to sync the same data multiple times, so for simplicity we do
                // not account for this, if the requested range goes beyond the sync
                // block it starts within we assume that the rest is not synced.
                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
            }
        }

        let start_slot = if start == 0.into() {
            Point::ORIGIN
        } else {
            Point::fuzzy(start)
        };

        Some((start_slot, Point::fuzzy(end)))
    }
}
664
665impl event::EventTarget<event::ChainIndexerEvent> for SyncTask {
666    fn add_event_listener(&mut self, listener: event::EventListenerFn<event::ChainIndexerEvent>) {
667        let mut rx = self.event_channel.0.subscribe();
668        tokio::spawn(async move {
669            while let Ok(event) = rx.recv().await {
670                (listener)(&event);
671            }
672        });
673    }
674
675    fn dispatch_event(&self, message: event::ChainIndexerEvent) {
676        let _ = self.event_channel.0.send(message);
677    }
678}
679
680/// Start followers as per defined in the config
681pub(crate) async fn start_followers() -> anyhow::Result<()> {
682    let cfg = Settings::follower_cfg();
683
684    // Log the chain follower configuration.
685    cfg.log();
686
687    // Start Syncing the blockchain, so we can consume its data as required.
688    start_sync_for(&cfg).await?;
689    info!(chain=%cfg.chain,"Chain Sync is started.");
690
691    tokio::spawn(async move {
692        use self::event::ChainIndexerEvent as Event;
693        use crate::metrics::chain_indexer::reporter;
694
695        let api_host_names = Settings::api_host_names().join(",");
696        let service_id = Settings::service_id();
697        let network = cfg.chain.to_string();
698
699        let mut sync_task = SyncTask::new(cfg);
700
701        // add an event listener to watch for certain events to report as metrics
702        sync_task.add_event_listener(Box::new(move |event: &Event| {
703            if let Event::SyncStarted = event {
704                reporter::REACHED_TIP
705                    .with_label_values(&[&api_host_names, service_id, &network])
706                    .set(0);
707            }
708            if let Event::SyncCompleted = event {
709                reporter::REACHED_TIP
710                    .with_label_values(&[&api_host_names, service_id, &network])
711                    .set(1);
712            }
713            if let Event::SyncTasksChanged { current_sync_tasks } = event {
714                reporter::RUNNING_INDEXER_TASKS_COUNT
715                    .with_label_values(&[&api_host_names, service_id, &network])
716                    .set(From::from(*current_sync_tasks));
717            }
718            if let Event::LiveTipSlotChanged { slot } = event {
719                reporter::CURRENT_LIVE_TIP_SLOT
720                    .with_label_values(&[&api_host_names, service_id, &network])
721                    .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
722            }
723            if let Event::ImmutableTipSlotChanged { slot } = event {
724                reporter::CURRENT_IMMUTABLE_TIP_SLOT
725                    .with_label_values(&[&api_host_names, service_id, &network])
726                    .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
727            }
728            if let Event::IndexedSlotProgressed { slot } = event {
729                reporter::HIGHEST_COMPLETE_INDEXED_SLOT
730                    .with_label_values(&[&api_host_names, service_id, &network])
731                    .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
732            }
733            if let Event::BackwardDataPurged = event {
734                reporter::TRIGGERED_BACKWARD_PURGES_COUNT
735                    .with_label_values(&[&api_host_names, service_id, &network])
736                    .inc();
737            }
738            if let Event::ForwardDataPurged { purge_slots } = event {
739                reporter::TRIGGERED_FORWARD_PURGES_COUNT
740                    .with_label_values(&[&api_host_names, service_id, &network])
741                    .inc();
742                reporter::PURGED_SLOTS
743                    .with_label_values(&[&api_host_names, service_id, &network])
744                    .set(i64::try_from(*purge_slots).unwrap_or(-1));
745            }
746        }));
747
748        sync_task.run().await;
749    });
750
751    Ok(())
752}