cat_gateway/cardano/mod.rs

1//! Logic for orchestrating followers
2
3use std::{fmt::Display, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use futures::{stream::FuturesUnordered, StreamExt};
9use rand::{Rng, SeedableRng};
10use tracing::{debug, error, info, warn};
11
12use crate::{
13    db::index::{
14        block::{index_block, roll_forward},
15        queries::sync_status::{
16            get::{get_sync_status, SyncStatus},
17            update::update_sync_status,
18        },
19        session::CassandraSession,
20    },
21    settings::{chain_follower, Settings},
22};
23
24// pub(crate) mod cip36_registration_obsolete;
25pub(crate) mod util;
26
/// Polling interval used while waiting for the connection to the indexing DB to become ready.
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_millis(1_000);
29
30/// Start syncing a particular network
31async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
32    let chain = cfg.chain;
33    let dl_config = cfg.dl_config.clone();
34
35    let mut cfg = ChainSyncConfig::default_for(chain);
36    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
37    info!(chain = %chain, "Starting Blockchain Sync");
38
39    if let Err(error) = cfg.run().await {
40        error!(chain=%chain, error=%error, "Failed to start chain sync task");
41        Err(error)?;
42    }
43
44    Ok(())
45}
46
47/// Data we return from a sync task.
48#[derive(Clone)]
49struct SyncParams {
50    /// What blockchain are we syncing.
51    chain: Network,
52    /// The starting point of this sync.
53    start: Point,
54    /// The ending point of this sync.
55    end: Point,
56    /// The first block we successfully synced.
57    first_indexed_block: Option<Point>,
58    /// Is the starting point immutable? (True = immutable, false = don't know.)
59    first_is_immutable: bool,
60    /// The last block we successfully synced.
61    last_indexed_block: Option<Point>,
62    /// Is the ending point immutable? (True = immutable, false = don't know.)
63    last_is_immutable: bool,
64    /// The number of blocks we successfully synced overall.
65    total_blocks_synced: u64,
66    /// The number of blocks we successfully synced, in the last attempt.
67    last_blocks_synced: u64,
68    /// The number of retries so far on this sync task.
69    retries: u64,
70    /// The number of retries so far on this sync task.
71    backoff_delay: Option<Duration>,
72    /// If the sync completed without error or not.
73    result: Arc<Option<anyhow::Result<()>>>,
74    /// Chain follower roll forward.
75    follower_roll_forward: Option<Point>,
76}
77
78impl Display for SyncParams {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        if self.result.is_none() {
81            write!(f, "Sync_Params {{ ")?;
82        } else {
83            write!(f, "Sync_Result {{ ")?;
84        }
85
86        write!(f, "start: {}, end: {}", self.start, self.end)?;
87
88        if let Some(first) = self.first_indexed_block.as_ref() {
89            write!(
90                f,
91                ", first_indexed_block: {first}{}",
92                if self.first_is_immutable { ":I" } else { "" }
93            )?;
94        }
95
96        if let Some(last) = self.last_indexed_block.as_ref() {
97            write!(
98                f,
99                ", last_indexed_block: {last}{}",
100                if self.last_is_immutable { ":I" } else { "" }
101            )?;
102        }
103
104        if self.retries > 0 {
105            write!(f, ", retries: {}", self.retries)?;
106        }
107
108        if self.retries > 0 || self.result.is_some() {
109            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
110        }
111
112        if self.result.is_some() {
113            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
114        }
115
116        if let Some(backoff) = self.backoff_delay.as_ref() {
117            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
118        }
119
120        if let Some(result) = self.result.as_ref() {
121            match result {
122                Ok(()) => write!(f, ", Success")?,
123                Err(error) => write!(f, ", {error}")?,
124            };
125        }
126
127        f.write_str(" }")
128    }
129}
130
131/// The range we generate random backoffs within given a base backoff value.
132const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
133
134impl SyncParams {
135    /// Create a new `SyncParams`.
136    fn new(chain: Network, start: Point, end: Point) -> Self {
137        Self {
138            chain,
139            start,
140            end,
141            first_indexed_block: None,
142            first_is_immutable: false,
143            last_indexed_block: None,
144            last_is_immutable: false,
145            total_blocks_synced: 0,
146            last_blocks_synced: 0,
147            retries: 0,
148            backoff_delay: None,
149            result: Arc::new(None),
150            follower_roll_forward: None,
151        }
152    }
153
154    /// Convert a result back into parameters for a retry.
155    fn retry(&self) -> Self {
156        let retry_count = self.retries.saturating_add(1);
157
158        let mut backoff = None;
159
160        // If we did sync any blocks last time, first retry is immediate.
161        // Otherwise we backoff progressively more as we do more retries.
162        if self.last_blocks_synced == 0 {
163            // Calculate backoff based on number of retries so far.
164            backoff = match retry_count {
165                1 => Some(Duration::from_secs(1)),     // 1-3 seconds
166                2..5 => Some(Duration::from_secs(10)), // 10-30 seconds
167                _ => Some(Duration::from_secs(30)),    // 30-90 seconds.
168            };
169        }
170
171        let mut retry = self.clone();
172        retry.last_blocks_synced = 0;
173        retry.retries = retry_count;
174        retry.backoff_delay = backoff;
175        retry.result = Arc::new(None);
176        retry.follower_roll_forward = None;
177
178        retry
179    }
180
181    /// Convert Params into the result of the sync.
182    fn done(
183        &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
184        last_immutable: bool, synced: u64, result: anyhow::Result<()>,
185    ) -> Self {
186        if result.is_ok() && first_immutable && last_immutable {
187            // Update sync status in the Immutable DB.
188            // Can fire and forget, because failure to update DB will simply cause the chunk to be
189            // re-indexed, on recovery.
190            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
191        }
192
193        let mut done = self.clone();
194        done.first_indexed_block = first;
195        done.first_is_immutable = first_immutable;
196        done.last_indexed_block = last;
197        done.last_is_immutable = last_immutable;
198        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
199        done.last_blocks_synced = synced;
200
201        done.result = Arc::new(Some(result));
202
203        done
204    }
205
206    /// Get where this sync run actually needs to start from.
207    fn actual_start(&self) -> Point {
208        self.last_indexed_block
209            .as_ref()
210            .unwrap_or(&self.start)
211            .clone()
212    }
213
214    /// Do the backoff delay processing.
215    ///
216    /// The actual delay is a random time from the Delay itself to
217    /// `BACKOFF_RANGE_MULTIPLIER` times the delay. This is to prevent hammering the
218    /// service at regular intervals.
219    async fn backoff(&self) {
220        if let Some(backoff) = self.backoff_delay {
221            let mut rng = rand::rngs::StdRng::from_entropy();
222            let actual_backoff =
223                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
224
225            tokio::time::sleep(actual_backoff).await;
226        }
227    }
228}
229
230/// Sync a portion of the blockchain.
231/// Set end to `Point::TIP` to sync the tip continuously.
232fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle<SyncParams> {
233    tokio::spawn(async move {
234        info!(chain = %params.chain, params=%params, "Indexing Blockchain");
235
236        // Backoff hitting the database if we need to.
237        params.backoff().await;
238
239        // Wait for indexing DB to be ready before continuing.
240        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
241        info!(chain=%params.chain, params=%params,"Indexing DB is ready");
242
243        let mut first_indexed_block = params.first_indexed_block.clone();
244        let mut first_immutable = params.first_is_immutable;
245        let mut last_indexed_block = params.last_indexed_block.clone();
246        let mut last_immutable = params.last_is_immutable;
247        let mut blocks_synced = 0u64;
248
249        let mut follower =
250            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
251        while let Some(chain_update) = follower.next().await {
252            match chain_update.kind {
253                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
254                    // We only process these on the follower tracking the TIP.
255                    if params.end == Point::TIP {
256                        // What we need to do here is tell the primary follower to start a new sync
257                        // for the new immutable data, and then purge the volatile database of the
258                        // old data (after the immutable data has synced).
259                        info!(chain=%params.chain, "Immutable chain rolled forward.");
260                        let mut result = params.done(
261                            first_indexed_block,
262                            first_immutable,
263                            last_indexed_block,
264                            last_immutable,
265                            blocks_synced,
266                            Ok(()),
267                        );
268                        // Signal the point the immutable chain rolled forward to.
269                        result.follower_roll_forward = Some(chain_update.block_data().point());
270                        return result;
271                    };
272                },
273                cardano_chain_follower::Kind::Block => {
274                    let block = chain_update.block_data();
275
276                    if let Err(error) = index_block(block).await {
277                        let error_msg = format!("Failed to index block {}", block.point());
278                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
279                        return params.done(
280                            first_indexed_block,
281                            first_immutable,
282                            last_indexed_block,
283                            last_immutable,
284                            blocks_synced,
285                            Err(error.context(error_msg)),
286                        );
287                    }
288
289                    last_immutable = block.is_immutable();
290                    last_indexed_block = Some(block.point());
291
292                    if first_indexed_block.is_none() {
293                        first_immutable = last_immutable;
294                        first_indexed_block = Some(block.point());
295                    }
296                    blocks_synced = blocks_synced.saturating_add(1);
297                },
298                cardano_chain_follower::Kind::Rollback => {
299                    warn!("TODO: Live Chain rollback");
300                    // What we need to do here, is purge the live DB of records after the
301                    // rollback point.  We need to complete this operation here
302                    // before we keep syncing the live chain.
303                },
304            }
305        }
306
307        let result = params.done(
308            first_indexed_block,
309            first_immutable,
310            last_indexed_block,
311            last_immutable,
312            blocks_synced,
313            Ok(()),
314        );
315
316        info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
317
318        result
319    })
320}
321
322/// The synchronisation task, and its state.
323/// There should ONLY ever be one of these at any time.
324struct SyncTask {
325    /// Chain follower configuration.
326    cfg: chain_follower::EnvVars,
327
328    /// The current running sync tasks.
329    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,
330
331    /// // How many immutable chain follower sync tasks we are running.
332    current_sync_tasks: u16,
333
334    /// Start for the next block we would sync.
335    start_slot: Slot,
336
337    /// The immutable tip slot.
338    immutable_tip_slot: Slot,
339
340    /// The live tip slot.
341    live_tip_slot: Slot,
342
343    /// Current Sync Status
344    sync_status: Vec<SyncStatus>,
345}
346
347impl SyncTask {
348    /// Create a new `SyncTask`.
349    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
350        SyncTask {
351            cfg,
352            sync_tasks: FuturesUnordered::new(),
353            start_slot: 0.into(),
354            current_sync_tasks: 0,
355            immutable_tip_slot: 0.into(),
356            live_tip_slot: 0.into(),
357            sync_status: Vec::new(),
358        }
359    }
360
361    /// Primary Chain Follower task.
362    ///
363    /// This continuously runs in the background, and never terminates.
364    async fn run(&mut self) {
365        // We can't sync until the local chain data is synced.
366        // This call will wait until we sync.
367        let tips = ChainFollower::get_tips(self.cfg.chain).await;
368        self.immutable_tip_slot = tips.0.slot_or_default();
369        self.live_tip_slot = tips.1.slot_or_default();
370        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Blockchain ready to sync from.");
371
372        // Wait for indexing DB to be ready before continuing.
373        // We do this after the above, because other nodes may have finished already, and we don't
374        // want to wait do any work they already completed while we were fetching the blockchain.
375        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
376        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
377        self.sync_status = get_sync_status().await;
378        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
379
380        // Start the Live Chain sync task - This can never end because it is syncing to TIP.
381        // So, if it fails, it will automatically be restarted.
382        self.sync_tasks.push(sync_subchain(SyncParams::new(
383            self.cfg.chain,
384            Point::fuzzy(self.immutable_tip_slot),
385            Point::TIP,
386        )));
387
388        self.start_immutable_followers();
389
390        // Wait Sync tasks to complete.  If they fail and have not completed, reschedule them.
391        // If an immutable sync task ends OK, and we still have immutable data to sync then
392        // start a new task.
393        // They will return from this iterator in the order they complete.
394        // This iterator actually never ends, because the live sync task is always restarted.
395        while let Some(completed) = self.sync_tasks.next().await {
396            match completed {
397                Ok(finished) => {
398                    // Sync task finished.  Check if it completed OK or had an error.
399                    // If it failed, we need to reschedule it.
400
401                    // The TIP follower should NEVER end, unless there is an immutable roll forward,
402                    // or there is an error.  If this is not a roll forward, log an error.
403                    // It can fail if the index DB goes down in some way.
404                    // Restart it always.
405                    if finished.end == Point::TIP {
406                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
407                            // Advance the known immutable tip, and try and start followers to reach
408                            // it.
409                            self.immutable_tip_slot = roll_forward_point.slot_or_default();
410                            self.start_immutable_followers();
411                        } else {
412                            error!(chain=%self.cfg.chain, report=%finished,
413                            "The TIP follower failed, restarting it.");
414                        }
415
416                        // Start the Live Chain sync task again from where it left off.
417                        self.sync_tasks.push(sync_subchain(finished.retry()));
418                    } else if let Some(result) = finished.result.as_ref() {
419                        match result {
420                            Ok(()) => {
421                                self.current_sync_tasks =
422                                    self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
423                                        error!("current_sync_tasks -= 1 overflow");
424                                        0
425                                    });
426                                info!(chain=%self.cfg.chain, report=%finished,
427                                    "The Immutable follower completed successfully.");
428
429                                // If we need more immutable chain followers to sync the block
430                                // chain, we can now start them.
431                                self.start_immutable_followers();
432                            },
433                            Err(error) => {
434                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
435                                    "An Immutable follower failed, restarting it.");
436                                // Restart the Immutable Chain sync task again from where it left
437                                // off.
438                                self.sync_tasks.push(sync_subchain(finished.retry()));
439                            },
440                        }
441                    } else {
442                        error!(chain=%self.cfg.chain, report=%finished,
443                                 "BUG: The Immutable follower completed, but without a proper result.");
444                    }
445                },
446                Err(error) => {
447                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
448                },
449            }
450
451            // IF there is only 1 chain follower left in sync_tasks, then all
452            // immutable followers have finished.
453            // When this happens we need to purge the live index of any records that exist
454            // before the current immutable tip.
455            // Note: to prevent a data race when multiple nodes are syncing, we probably
456            // want to put a gap in this, so that there are X slots of overlap
457            // between the live chain and immutable chain.  This gap should be
458            // a parameter.
459            if self.sync_tasks.len() == 1 {
460                if let Err(error) = roll_forward::purge_live_index(self.immutable_tip_slot).await {
461                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
462                }
463            }
464        }
465
466        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
467    }
468
469    /// Start immutable followers, if we can
470    fn start_immutable_followers(&mut self) {
471        // Start the Immutable Chain sync tasks, as required.
472        // We will start at most the number of configured sync tasks.
473        // The live chain sync task is not counted as a sync task for this config value.
474
475        // Nothing to do if the start_slot is not less than the end of the immutable chain.
476        if self.start_slot < self.immutable_tip_slot {
477            // Will also break if there are no more slots left to sync.
478            while self.current_sync_tasks < self.cfg.sync_tasks {
479                let end_slot = self.immutable_tip_slot.min(
480                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
481                        .into(),
482                );
483
484                if let Some((first_point, last_point)) =
485                    self.get_syncable_range(self.start_slot, end_slot)
486                {
487                    self.sync_tasks.push(sync_subchain(SyncParams::new(
488                        self.cfg.chain,
489                        first_point,
490                        last_point.clone(),
491                    )));
492                    self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
493                }
494
495                // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
496                // by one problems that may occur otherwise.
497                self.start_slot = end_slot;
498
499                if end_slot == self.immutable_tip_slot {
500                    break;
501                }
502            }
503            // `start_slot` is still used, because it is used to keep syncing chunks as required
504            // while each immutable sync task finishes.
505            info!(chain=%self.cfg.chain, tasks=self.current_sync_tasks, until=?self.start_slot, "Persistent Indexing DB tasks started");
506        }
507    }
508
509    /// Check if the requested range has already been indexed.
510    /// If it hasn't just return the slots as points.
511    /// If it has, return a subset that hasn't been indexed if any, or None if its been
512    /// completely indexed already.
513    fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
514        for sync_block in &self.sync_status {
515            // Check if we start within a previously synchronized block.
516            if start >= sync_block.start_slot && start <= sync_block.end_slot {
517                // Check if we are fully contained by the sync block, if so, nothing to sync.
518                if end <= sync_block.end_slot {
519                    return None;
520                }
521
522                // In theory, we could extend into another sync block, but because we could extend
523                // into an unbounded number of sync blocks, we would need to bust
524                // this range into an unbounded number of sub chunks.
525                // It is not a problem to sync the same data mutiple times, so for simplicity we do
526                // not account for this, if the requested range goes beyond the sync
527                // block it starts within we assume that the rest is not synced.
528                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
529            }
530        }
531
532        let start_slot = if start == 0.into() {
533            Point::ORIGIN
534        } else {
535            Point::fuzzy(start)
536        };
537
538        Some((start_slot, Point::fuzzy(end)))
539    }
540}
541
542/// Start followers as per defined in the config
543pub(crate) async fn start_followers() -> anyhow::Result<()> {
544    let cfg = Settings::follower_cfg();
545
546    // Log the chain follower configuration.
547    cfg.log();
548
549    // Start Syncing the blockchain, so we can consume its data as required.
550    start_sync_for(&cfg).await?;
551    info!(chain=%cfg.chain,"Chain Sync is started.");
552
553    tokio::spawn(async move {
554        let mut sync_task = SyncTask::new(cfg);
555        sync_task.run().await;
556    });
557
558    Ok(())
559}