cat_gateway/cardano/
mod.rs

1//! Logic for orchestrating followers
2
3use std::{fmt::Display, ops::Sub, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{MultiEraBlock, Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use event::EventTarget;
9use futures::{stream::FuturesUnordered, StreamExt};
10use rand::{Rng, SeedableRng};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info};
13
14use crate::{
15    db::index::{
16        block::{
17            index_block,
18            roll_forward::{self, PurgeCondition},
19        },
20        queries::sync_status::{
21            get::{get_sync_status, SyncStatus},
22            update::update_sync_status,
23        },
24        session::CassandraSession,
25    },
26    service::utilities::health::{
27        set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
28    },
29    settings::{chain_follower, Settings},
30};
31
/// Chain indexer events (`ChainIndexerEvent`) and the `EventTarget` trait used to publish them.
pub(crate) mod event;
33
/// How long we wait between checks for connection to the indexing DB to be ready.
///
/// Passed to `CassandraSession::wait_until_ready` wherever this module waits for the DB.
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
36
37/// Start syncing a particular network
38async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
39    let chain = cfg.chain;
40    let dl_config = cfg.dl_config.clone();
41
42    let mut cfg = ChainSyncConfig::default_for(chain);
43    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
44    info!(chain = %chain, "Starting Chain Sync Task");
45
46    if let Err(error) = cfg.run().await {
47        error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
48        Err(error)?;
49    }
50
51    Ok(())
52}
53
/// Data we return from a sync task.
///
/// The same structure serves as the input parameters of a sync task and as its result:
/// `result` holds `None` while it describes parameters, and `Some(..)` once the task has
/// finished.
#[derive(Clone)]
struct SyncParams {
    /// What blockchain are we syncing.
    chain: Network,
    /// The starting point of this sync.
    start: Point,
    /// The ending point of this sync.
    end: Point,
    /// The first block we successfully synced.
    first_indexed_block: Option<Point>,
    /// Is the starting point immutable? (True = immutable, false = don't know.)
    first_is_immutable: bool,
    /// The last block we successfully synced.
    last_indexed_block: Option<Point>,
    /// Is the ending point immutable? (True = immutable, false = don't know.)
    last_is_immutable: bool,
    /// The number of blocks we successfully synced overall.
    total_blocks_synced: u64,
    /// The number of blocks we successfully synced, in the last attempt.
    last_blocks_synced: u64,
    /// The number of retries so far on this sync task.
    retries: u64,
    /// The base backoff delay to wait before this sync task starts, if any.
    /// The delay actually slept is randomised upwards from this base value.
    backoff_delay: Option<Duration>,
    /// If the sync completed without error or not.
    /// `None` until the task has finished.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Chain follower roll forward.
    /// Set to the point the immutable chain rolled forward to, when the follower tracking
    /// the TIP stops because of an immutable roll forward.
    follower_roll_forward: Option<Point>,
}
84
85impl Display for SyncParams {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        if self.result.is_none() {
88            write!(f, "Sync_Params {{ ")?;
89        } else {
90            write!(f, "Sync_Result {{ ")?;
91        }
92
93        write!(f, "start: {}, end: {}", self.start, self.end)?;
94
95        if let Some(first) = self.first_indexed_block.as_ref() {
96            write!(
97                f,
98                ", first_indexed_block: {first}{}",
99                if self.first_is_immutable { ":I" } else { "" }
100            )?;
101        }
102
103        if let Some(last) = self.last_indexed_block.as_ref() {
104            write!(
105                f,
106                ", last_indexed_block: {last}{}",
107                if self.last_is_immutable { ":I" } else { "" }
108            )?;
109        }
110
111        if self.retries > 0 {
112            write!(f, ", retries: {}", self.retries)?;
113        }
114
115        if self.retries > 0 || self.result.is_some() {
116            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
117        }
118
119        if self.result.is_some() {
120            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
121        }
122
123        if let Some(backoff) = self.backoff_delay.as_ref() {
124            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
125        }
126
127        if let Some(result) = self.result.as_ref() {
128            match result {
129                Ok(()) => write!(f, ", Success")?,
130                Err(error) => write!(f, ", {error}")?,
131            };
132        }
133
134        f.write_str(" }")
135    }
136}
137
/// The range we generate random backoffs within given a base backoff value.
///
/// A base delay `d` yields an actual delay drawn from `d..d * BACKOFF_RANGE_MULTIPLIER`
/// (upper bound exclusive).
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
140
141impl SyncParams {
142    /// Create a new `SyncParams`.
143    fn new(chain: Network, start: Point, end: Point) -> Self {
144        Self {
145            chain,
146            start,
147            end,
148            first_indexed_block: None,
149            first_is_immutable: false,
150            last_indexed_block: None,
151            last_is_immutable: false,
152            total_blocks_synced: 0,
153            last_blocks_synced: 0,
154            retries: 0,
155            backoff_delay: None,
156            result: Arc::new(None),
157            follower_roll_forward: None,
158        }
159    }
160
161    /// Convert a result back into parameters for a retry.
162    fn retry(&self) -> Self {
163        let retry_count = self.retries.saturating_add(1);
164
165        let mut backoff = None;
166
167        // If we did sync any blocks last time, first retry is immediate.
168        // Otherwise we backoff progressively more as we do more retries.
169        if self.last_blocks_synced == 0 {
170            // Calculate backoff based on number of retries so far.
171            backoff = match retry_count {
172                1 => Some(Duration::from_secs(1)),     // 1-3 seconds
173                2..5 => Some(Duration::from_secs(10)), // 10-30 seconds
174                _ => Some(Duration::from_secs(30)),    // 30-90 seconds.
175            };
176        }
177
178        let mut retry = self.clone();
179        retry.last_blocks_synced = 0;
180        retry.retries = retry_count;
181        retry.backoff_delay = backoff;
182        retry.result = Arc::new(None);
183        retry.follower_roll_forward = None;
184
185        retry
186    }
187
188    /// Convert Params into the result of the sync.
189    pub(crate) fn done(
190        &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
191        last_immutable: bool, synced: u64, result: anyhow::Result<()>,
192    ) -> Self {
193        if result.is_ok() && self.end != Point::TIP {
194            // Update sync status in the Immutable DB.
195            // Can fire and forget, because failure to update DB will simply cause the chunk to be
196            // re-indexed, on recovery.
197            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
198        }
199        let mut done = self.clone();
200        done.first_indexed_block = first;
201        done.first_is_immutable = first_immutable;
202        done.last_indexed_block = last;
203        done.last_is_immutable = last_immutable;
204        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205        done.last_blocks_synced = synced;
206        done.result = Arc::new(Some(result));
207
208        done
209    }
210
211    /// Get where this sync run actually needs to start from.
212    fn actual_start(&self) -> Point {
213        self.last_indexed_block
214            .as_ref()
215            .unwrap_or(&self.start)
216            .clone()
217    }
218
219    /// Do the backoff delay processing.
220    ///
221    /// The actual delay is a random time from the Delay itself to
222    /// `BACKOFF_RANGE_MULTIPLIER` times the delay. This is to prevent hammering the
223    /// service at regular intervals.
224    async fn backoff(&self) {
225        if let Some(backoff) = self.backoff_delay {
226            let mut rng = rand::rngs::StdRng::from_entropy();
227            let actual_backoff =
228                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
229
230            tokio::time::sleep(actual_backoff).await;
231        }
232    }
233}
234
235/// Sync a portion of the blockchain.
236/// Set end to `Point::TIP` to sync the tip continuously.
237#[allow(clippy::too_many_lines)]
238fn sync_subchain(
239    params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
240) -> tokio::task::JoinHandle<SyncParams> {
241    tokio::spawn(async move {
242        // Backoff hitting the database if we need to.
243        params.backoff().await;
244
245        // Wait for indexing DB to be ready before continuing.
246        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
247        info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
248
249        let mut first_indexed_block = params.first_indexed_block.clone();
250        let mut first_immutable = params.first_is_immutable;
251        let mut last_indexed_block = params.last_indexed_block.clone();
252        let mut last_immutable = params.last_is_immutable;
253        let mut blocks_synced = 0u64;
254
255        let mut follower =
256            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
257        while let Some(chain_update) = follower.next().await {
258            let tips = ChainFollower::get_tips(params.chain).await;
259            let immutable_slot = tips.0.slot_or_default();
260            let live_slot = tips.1.slot_or_default();
261            let event = event::ChainIndexerEvent::LiveTipSlotChanged {
262                immutable_slot,
263                live_slot,
264            };
265            if let Err(err) = event_sender.send(event) {
266                error!(error=%err, "Unable to send event.");
267            } else {
268                debug!(live_tip_slot=?live_slot, "Chain Indexer update");
269            }
270            match chain_update.kind {
271                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
272                    // We only process these on the follower tracking the TIP.
273                    if params.end == Point::TIP {
274                        // What we need to do here is tell the primary follower to start a new sync
275                        // for the new immutable data, and then purge the volatile database of the
276                        // old data (after the immutable data has synced).
277                        info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
278                        let mut result = params.done(
279                            first_indexed_block,
280                            first_immutable,
281                            last_indexed_block,
282                            last_immutable,
283                            blocks_synced,
284                            Ok(()),
285                        );
286                        // Signal the point the immutable chain rolled forward to.
287                        // If this is live chain immediately stops to later run immutable sync tasks
288                        result.follower_roll_forward = Some(chain_update.block_data().point());
289                        return result;
290                    }
291                },
292                cardano_chain_follower::Kind::Block => {
293                    let block = chain_update.block_data();
294
295                    if let Err(error) = index_block(block).await {
296                        let error_msg = format!("Failed to index block {}", block.point());
297                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
298                        return params.done(
299                            first_indexed_block,
300                            first_immutable,
301                            last_indexed_block,
302                            last_immutable,
303                            blocks_synced,
304                            Err(error.context(error_msg)),
305                        );
306                    }
307
308                    if chain_update.tip && !set_follower_live_first_reached_tip() {
309                        let _ = event_sender.send(event::ChainIndexerEvent::SyncLiveChainCompleted);
310                    }
311
312                    update_block_state(
313                        block,
314                        &mut first_indexed_block,
315                        &mut first_immutable,
316                        &mut last_indexed_block,
317                        &mut last_immutable,
318                        &mut blocks_synced,
319                    );
320                },
321
322                cardano_chain_follower::Kind::Rollback => {
323                    // Rollback occurs, need to purge forward
324                    let rollback_slot = chain_update.block_data().slot();
325
326                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
327                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
328                        error!(chain=%params.chain, error=%error,
329                            "Chain follower rollback, purging volatile data task failed."
330                        );
331                    } else {
332                        // How many slots are purged
333                        #[allow(clippy::arithmetic_side_effects)]
334                        let purge_slots = params
335                            .last_indexed_block
336                            .as_ref()
337                            // Slots arithmetic has saturating semantic, so this is ok
338                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
339
340                        let _ = event_sender.send(event::ChainIndexerEvent::ForwardDataPurged {
341                            purge_slots: purge_slots.into(),
342                        });
343
344                        // Purge success, now index the current block
345                        let block = chain_update.block_data();
346                        if let Err(error) = index_block(block).await {
347                            let error_msg =
348                                format!("Failed to index block after rollback {}", block.point());
349                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
350                            return params.done(
351                                first_indexed_block,
352                                first_immutable,
353                                last_indexed_block,
354                                last_immutable,
355                                blocks_synced,
356                                Err(error.context(error_msg)),
357                            );
358                        }
359
360                        update_block_state(
361                            block,
362                            &mut first_indexed_block,
363                            &mut first_immutable,
364                            &mut last_indexed_block,
365                            &mut last_immutable,
366                            &mut blocks_synced,
367                        );
368                    }
369                },
370            }
371        }
372
373        let result = params.done(
374            first_indexed_block,
375            first_immutable,
376            last_indexed_block,
377            last_immutable,
378            blocks_synced,
379            Ok(()),
380        );
381
382        info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
383
384        result
385    })
386}
387
388/// Update block related state.
389fn update_block_state(
390    block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
391    last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
392) {
393    *last_immutable = block.is_immutable();
394    *last_indexed_block = Some(block.point());
395
396    if first_indexed_block.is_none() {
397        *first_immutable = *last_immutable;
398        *first_indexed_block = Some(block.point());
399    }
400
401    *blocks_synced = blocks_synced.saturating_add(1);
402}
403
/// The synchronisation task, and its state.
/// There should ONLY ever be one of these at any time.
struct SyncTask {
    /// Chain follower configuration.
    cfg: chain_follower::EnvVars,

    /// The current running sync tasks.
    /// Each handle resolves to the `SyncParams` describing how that task ended.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// How many chain follower sync tasks we are running.
    /// Incremented for every task added and decremented for every task that finishes.
    current_sync_tasks: u16,

    /// Start for the next block we would sync.
    start_slot: Slot,

    /// The immutable tip slot.
    immutable_tip_slot: Slot,

    /// The live tip slot.
    live_tip_slot: Slot,

    /// Current Sync Status.
    /// Loaded from the index DB and used to skip ranges which were already synced.
    sync_status: Vec<SyncStatus>,

    /// Event channel used during the process of sync tasks: `(sender, receiver)`.
    /// The sender is cloned into every sync task and used to subscribe event listeners.
    event_channel: (
        broadcast::Sender<event::ChainIndexerEvent>,
        broadcast::Receiver<event::ChainIndexerEvent>,
    ),
}
434
435impl SyncTask {
436    /// Create a new `SyncTask`.
437    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
438        Self {
439            cfg,
440            sync_tasks: FuturesUnordered::new(),
441            start_slot: 0.into(),
442            current_sync_tasks: 0,
443            immutable_tip_slot: 0.into(),
444            live_tip_slot: 0.into(),
445            sync_status: Vec::new(),
446            event_channel: broadcast::channel(10),
447        }
448    }
449
450    /// Add a new `SyncTask` to the queue.
451    fn add_sync_task(
452        &mut self, params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
453    ) {
454        self.sync_tasks.push(sync_subchain(params, event_sender));
455        self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
456        debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
457        self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
458            current_sync_tasks: self.current_sync_tasks,
459        });
460    }
461
462    /// Update `SyncTask` count.
463    fn sync_task_finished(&mut self) {
464        self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
465            error!("current_sync_tasks -= 1 overflow");
466            0
467        });
468        debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
469        self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
470            current_sync_tasks: self.current_sync_tasks,
471        });
472    }
473
    /// Primary Chain Follower task.
    ///
    /// This continuously runs in the background, and never terminates.
    ///
    /// Sets the Index DB liveness flag to true if it is not already set.
    ///
    /// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
    /// set.
    ///
    /// Overview of what the body does:
    /// 1. Reads the current tips and publishes them as events.
    /// 2. Waits for the indexing DB and loads the recorded sync status.
    /// 3. Starts the live (TIP) follower and as many immutable followers as allowed.
    /// 4. Loops over finished sync tasks, restarting or replacing them as needed, and
    ///    purges the live index whenever only one sync task is left.
    #[allow(clippy::too_many_lines)]
    async fn run(&mut self) {
        // We can't sync until the local chain data is synced.
        // This call will wait until we sync.
        let tips = ChainFollower::get_tips(self.cfg.chain).await;
        self.immutable_tip_slot = tips.0.slot_or_default();
        self.live_tip_slot = tips.1.slot_or_default();
        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");

        self.dispatch_event(event::ChainIndexerEvent::ImmutableTipSlotChanged {
            immutable_slot: self.immutable_tip_slot,
            live_slot: self.live_tip_slot,
        });
        self.dispatch_event(event::ChainIndexerEvent::LiveTipSlotChanged {
            immutable_slot: self.immutable_tip_slot,
            live_slot: self.live_tip_slot,
        });

        // Wait for indexing DB to be ready before continuing.
        // We do this after the above, because other nodes may have finished already, and we don't
        // want to redo any work they already completed while we were fetching the blockchain.
        //
        // After waiting, we set the liveness flag to true if it is not already set.
        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);

        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
        self.sync_status = get_sync_status().await;
        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);

        // Start the Live Chain sync task - This can never end because it is syncing to TIP.
        // So, if it fails, it will automatically be restarted.
        self.add_sync_task(
            SyncParams::new(
                self.cfg.chain,
                Point::fuzzy(self.immutable_tip_slot),
                Point::TIP,
            ),
            self.event_channel.0.clone(),
        );
        self.dispatch_event(event::ChainIndexerEvent::SyncLiveChainStarted);

        self.start_immutable_followers();
        // If only 1 chain follower was spawned (the live one), then the immutable state is
        // already indexed and present in the DB.
        if self.sync_tasks.len() == 1 {
            set_follower_immutable_first_reached_tip();
            self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainCompleted);
        } else {
            self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainStarted);
        }

        // Wait for Sync tasks to complete.  If they fail and have not completed, reschedule them.
        // If an immutable sync task ends OK, and we still have immutable data to sync then
        // start a new task.
        // They will return from this iterator in the order they complete.
        // This iterator actually never ends, because the live sync task is always restarted.
        while let Some(completed) = self.sync_tasks.next().await {
            // update sync task count
            self.sync_task_finished();

            match completed {
                Ok(finished) => {
                    // NOTE(review): these freshly read tips are only logged; `self.live_tip_slot`
                    // keeps the value read at startup, so the `ImmutableTipSlotChanged` event
                    // below reports that older live slot - confirm this is intended.
                    let tips = ChainFollower::get_tips(self.cfg.chain).await;
                    let immutable_tip_slot = tips.0.slot_or_default();
                    let live_tip_slot = tips.1.slot_or_default();
                    info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
                    // Sync task finished.  Check if it completed OK or had an error.
                    // If it failed, we need to reschedule it.

                    // The TIP follower should NEVER end, unless there is an immutable roll forward,
                    // or there is an error.  If this is not a roll forward, log an error.
                    // It can fail if the index DB goes down in some way.
                    // Restart it always.
                    if finished.end == Point::TIP {
                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
                            // Advance the known immutable tip, and try and start followers to reach
                            // it.
                            self.immutable_tip_slot = roll_forward_point.slot_or_default();

                            self.dispatch_event(
                                event::ChainIndexerEvent::ImmutableTipSlotChanged {
                                    immutable_slot: self.immutable_tip_slot,
                                    live_slot: self.live_tip_slot,
                                },
                            );
                            info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");

                            self.start_immutable_followers();
                            self.dispatch_event(
                                event::ChainIndexerEvent::SyncImmutableChainStarted,
                            );
                        } else {
                            error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
                        }

                        // Start the Live Chain sync task again from where it left off.
                        self.add_sync_task(finished.retry(), self.event_channel.0.clone());
                    } else if let Some(result) = finished.result.as_ref() {
                        match result {
                            Ok(()) => {
                                info!(chain=%self.cfg.chain, report=%finished,
                                    "The Immutable follower completed successfully.");

                                // Report how far this immutable follower got, if it indexed
                                // anything.
                                finished.last_indexed_block.as_ref().inspect(|block| {
                                    self.dispatch_event(
                                        event::ChainIndexerEvent::IndexedSlotProgressed {
                                            slot: block.slot_or_default(),
                                        },
                                    );
                                });

                                // If we need more immutable chain followers to sync the block
                                // chain, we can now start them.
                                self.start_immutable_followers();
                            },
                            Err(error) => {
                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
                                        "An Immutable follower failed, restarting it.");
                                // Restart the Immutable Chain sync task again from where it left
                                // off.
                                self.add_sync_task(finished.retry(), self.event_channel.0.clone());
                            },
                        }
                    } else {
                        error!(chain=%self.cfg.chain, report=%finished,
                        "BUG: The Immutable follower completed, but without a proper result.");
                    }
                },
                Err(error) => {
                    // The spawned task itself failed to join, so no `SyncParams` were returned
                    // and there is nothing to build a retry from.
                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
                },
            }

            // IF there is only 1 chain follower left in sync_tasks, then all
            // immutable followers have finished.
            // When this happens we need to purge the live index of any records that exist
            // before the current immutable tip.
            // Note: to prevent a data race when multiple nodes are syncing, we probably
            // want to put a gap in this, so that there are X slots of overlap
            // between the live chain and immutable chain.  This gap should be
            // a parameter.
            if self.sync_tasks.len() == 1 {
                set_follower_immutable_first_reached_tip();
                self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainCompleted);

                // Purge data up to this slot
                // Slots arithmetic has saturating semantic, so this is ok.
                #[allow(clippy::arithmetic_side_effects)]
                let purge_to_slot =
                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
                info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
                } else {
                    self.dispatch_event(event::ChainIndexerEvent::BackwardDataPurged);
                }
            }
        }

        // Only reachable if `sync_tasks` became empty, which should not happen because the
        // live sync task is always restarted.
        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
    }
644
645    /// Start immutable followers, if we can
646    fn start_immutable_followers(&mut self) {
647        // Start the Immutable Chain sync tasks, as required.
648        // We will start at most the number of configured sync tasks.
649        // The live chain sync task is not counted as a sync task for this config value.
650
651        // Nothing to do if the start_slot is not less than the end of the immutable chain.
652        if self.start_slot < self.immutable_tip_slot {
653            // Will also break if there are no more slots left to sync.
654            while self.current_sync_tasks < self.cfg.sync_tasks {
655                let end_slot = self.immutable_tip_slot.min(
656                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
657                        .into(),
658                );
659
660                if let Some((first_point, last_point)) =
661                    self.get_syncable_range(self.start_slot, end_slot)
662                {
663                    self.add_sync_task(
664                        SyncParams::new(self.cfg.chain, first_point, last_point.clone()),
665                        self.event_channel.0.clone(),
666                    );
667                }
668
669                // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
670                // by one problems that may occur otherwise.
671                self.start_slot = end_slot;
672
673                if end_slot == self.immutable_tip_slot {
674                    break;
675                }
676            }
677        }
678    }
679
680    /// Check if the requested range has already been indexed.
681    /// If it hasn't just return the slots as points.
682    /// If it has, return a subset that hasn't been indexed if any, or None if its been
683    /// completely indexed already.
684    fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
685        for sync_block in &self.sync_status {
686            // Check if we start within a previously synchronized block.
687            if start >= sync_block.start_slot && start <= sync_block.end_slot {
688                // Check if we are fully contained by the sync block, if so, nothing to sync.
689                if end <= sync_block.end_slot {
690                    return None;
691                }
692
693                // In theory, we could extend into another sync block, but because we could extend
694                // into an unbounded number of sync blocks, we would need to bust
695                // this range into an unbounded number of sub chunks.
696                // It is not a problem to sync the same data mutiple times, so for simplicity we do
697                // not account for this, if the requested range goes beyond the sync
698                // block it starts within we assume that the rest is not synced.
699                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
700            }
701        }
702
703        let start_slot = if start == 0.into() {
704            Point::ORIGIN
705        } else {
706            Point::fuzzy(start)
707        };
708
709        Some((start_slot, Point::fuzzy(end)))
710    }
711}
712
713impl event::EventTarget<event::ChainIndexerEvent> for SyncTask {
714    fn add_event_listener(&mut self, listener: event::EventListenerFn<event::ChainIndexerEvent>) {
715        let mut rx = self.event_channel.0.subscribe();
716        tokio::spawn(async move {
717            loop {
718                match rx.recv().await {
719                    Ok(event) => (listener)(&event),
720                    Err(broadcast::error::RecvError::Lagged(lag)) => {
721                        debug!(lag = lag, "Sync tasks event listener lagged");
722                    },
723                    Err(broadcast::error::RecvError::Closed) => break,
724                }
725            }
726        });
727    }
728
729    fn dispatch_event(&self, message: event::ChainIndexerEvent) {
730        debug!(event = ?message, "Chain Indexer Event");
731        let _ = self.event_channel.0.send(message);
732    }
733}
734
735/// Start followers as per defined in the config
736pub(crate) async fn start_followers() -> anyhow::Result<()> {
737    let cfg = Settings::follower_cfg();
738
739    // Log the chain follower configuration.
740    cfg.log();
741
742    // Start Syncing the blockchain, so we can consume its data as required.
743    start_sync_for(&cfg).await?;
744
745    tokio::spawn(async move {
746        use self::event::ChainIndexerEvent as Event;
747        use crate::metrics::chain_indexer::reporter;
748
749        let api_host_names = Settings::api_host_names().join(",");
750        let service_id = Settings::service_id();
751        let network = cfg.chain.to_string();
752
753        let mut sync_task = SyncTask::new(cfg);
754
755        // add an event listener to watch for certain events to report as metrics
756        sync_task.add_event_listener(Box::new(move |event: &Event| {
757            if let Event::SyncLiveChainStarted = event {
758                reporter::LIVE_REACHED_TIP
759                    .with_label_values(&[&api_host_names, service_id, &network])
760                    .set(0);
761            }
762            if let Event::SyncImmutableChainStarted = event {
763                reporter::IMMUTABLE_REACHED_TIP
764                    .with_label_values(&[&api_host_names, service_id, &network])
765                    .set(0);
766            }
767            if let Event::SyncLiveChainCompleted = event {
768                reporter::LIVE_REACHED_TIP
769                    .with_label_values(&[&api_host_names, service_id, &network])
770                    .set(1);
771            }
772            if let Event::SyncImmutableChainCompleted = event {
773                reporter::IMMUTABLE_REACHED_TIP
774                    .with_label_values(&[&api_host_names, service_id, &network])
775                    .set(1);
776            }
777            if let Event::SyncTasksChanged { current_sync_tasks } = event {
778                reporter::RUNNING_INDEXER_TASKS_COUNT
779                    .with_label_values(&[&api_host_names, service_id, &network])
780                    .set(From::from(*current_sync_tasks));
781            }
782            if let Event::LiveTipSlotChanged {
783                live_slot,
784                immutable_slot,
785            } = event
786            {
787                reporter::CURRENT_LIVE_TIP_SLOT
788                    .with_label_values(&[&api_host_names, service_id, &network])
789                    .set(i64::try_from(u64::from(*live_slot)).unwrap_or(-1));
790                reporter::SLOT_TIP_DIFF
791                    .with_label_values(&[&api_host_names, service_id, &network])
792                    .set(
793                        u64::from(live_slot.sub(*immutable_slot))
794                            .try_into()
795                            .unwrap_or(-1),
796                    );
797            }
798            if let Event::ImmutableTipSlotChanged {
799                live_slot,
800                immutable_slot,
801            } = event
802            {
803                reporter::CURRENT_IMMUTABLE_TIP_SLOT
804                    .with_label_values(&[&api_host_names, service_id, &network])
805                    .set(i64::try_from(u64::from(*immutable_slot)).unwrap_or(-1));
806
807                reporter::SLOT_TIP_DIFF
808                    .with_label_values(&[&api_host_names, service_id, &network])
809                    .set(
810                        u64::from(live_slot.sub(*immutable_slot))
811                            .try_into()
812                            .unwrap_or(-1),
813                    );
814            }
815            if let Event::IndexedSlotProgressed { slot } = event {
816                reporter::HIGHEST_COMPLETE_INDEXED_SLOT
817                    .with_label_values(&[&api_host_names, service_id, &network])
818                    .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
819            }
820            if let Event::BackwardDataPurged = event {
821                reporter::TRIGGERED_BACKWARD_PURGES_COUNT
822                    .with_label_values(&[&api_host_names, service_id, &network])
823                    .inc();
824            }
825            if let Event::ForwardDataPurged { purge_slots } = event {
826                reporter::TRIGGERED_FORWARD_PURGES_COUNT
827                    .with_label_values(&[&api_host_names, service_id, &network])
828                    .inc();
829                reporter::PURGED_SLOTS
830                    .with_label_values(&[&api_host_names, service_id, &network])
831                    .set(i64::try_from(*purge_slots).unwrap_or(-1));
832            }
833        }));
834
835        sync_task.run().await;
836    });
837
838    Ok(())
839}