cat_gateway/db/index/queries/sync_status/
update.rs

1//! Read and write the synchronisation status.
2
3use std::{sync::Arc, time::SystemTime};
4
5use cardano_blockchain_types::Slot;
6use row::SyncStatusQueryParams;
7use scylla::{frame::value::CqlTimestamp, prepared_statement::PreparedStatement, Session};
8use tokio::task;
9use tracing::{error, warn};
10
11use crate::{
12    db::index::{
13        queries::{PreparedQueries, PreparedUpsertQuery},
14        session::CassandraSession,
15    },
16    service::utilities::convert::from_saturating,
17    settings::Settings,
18};
19
/// Insert Sync Status query string.
///
/// The CQL text is embedded at compile time from `../cql/insert_sync_status.cql`.
const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql");
22
23/// Sync Status Row Record Module
24pub(crate) mod row {
25    use scylla::{frame::value::CqlTimestamp, DeserializeRow, SerializeRow};
26
27    use crate::db::types::DbSlot;
28
29    /// Sync Status Record Row (used for both Insert and Query response)
30    #[derive(SerializeRow, DeserializeRow, Debug)]
31    pub(crate) struct SyncStatusQueryParams {
32        /// End Slot.
33        pub(crate) end_slot: DbSlot,
34        /// Start Slot.
35        pub(crate) start_slot: DbSlot,
36        /// Sync Time.
37        pub(crate) sync_time: CqlTimestamp,
38        /// Node ID
39        pub(crate) node_id: String,
40    }
41}
42
43impl SyncStatusQueryParams {
44    /// Create a new instance of [`SyncStatusQueryParams`]
45    pub(crate) fn new(end_slot: Slot, start_slot: Slot) -> Self {
46        let sync_time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
47            Ok(now) => now.as_millis(),
48            Err(_) => 0, // Shouldn't actually happen.
49        };
50
51        Self {
52            end_slot: end_slot.into(),
53            start_slot: start_slot.into(),
54            sync_time: CqlTimestamp(from_saturating(sync_time)),
55            node_id: Settings::service_id().to_owned(),
56        }
57    }
58}
59
60/// Sync Status Insert query.
61pub(crate) struct SyncStatusInsertQuery;
62
63impl SyncStatusInsertQuery {
64    /// Prepares a Sync Status Insert query.
65    pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
66        PreparedQueries::prepare(
67            session,
68            INSERT_SYNC_STATUS_QUERY,
69            scylla::statement::Consistency::All,
70            true,
71        )
72        .await
73        .inspect_err(
74            |error| error!(error=%error, "Failed to prepare get Sync Status Insert query."),
75        )
76        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_SYNC_STATUS_QUERY}"))
77    }
78
79    /// Executes a sync status insert query.
80    pub(crate) async fn execute(
81        session: &CassandraSession, params: SyncStatusQueryParams,
82    ) -> anyhow::Result<()> {
83        session
84            .execute_upsert(PreparedUpsertQuery::SyncStatusInsert, params)
85            .await
86    }
87}
88
89/// Update the sync status of the immutable database.
90///
91/// Note: There is no need to update the sync status of the volatile database.
92///
93/// Regarding failures:
94/// Failures of this function to record status, fail safely.
95/// This data is only used to recover sync
96/// There fore this function is both fire and forget, and returns no status.
97pub(crate) fn update_sync_status(end_slot: Slot, start_slot: Slot) {
98    task::spawn(async move {
99        let Some(session) = CassandraSession::get(true) else {
100            warn!(
101                start_slot = ?start_slot,
102                end_slot = ?end_slot,
103                "Failed to get Cassandra Session, trying to record indexing status"
104            );
105            return;
106        };
107
108        if let Err(err) = SyncStatusInsertQuery::execute(
109            &session,
110            SyncStatusQueryParams::new(end_slot, start_slot),
111        )
112        .await
113        {
114            warn!(
115                error=%err,
116                start_slot = ?start_slot,
117                end_slot = ?end_slot,
118                "Failed to store Sync Status"
119            );
120        };
121    });
122}