cat_gateway/db/index/queries/sync_status/
update.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Read and write the synchronisation status.

use std::{sync::Arc, time::SystemTime};

use row::SyncStatusQueryParams;
use scylla::{frame::value::CqlTimestamp, prepared_statement::PreparedStatement, Session};
use tokio::task;
use tracing::{error, warn};

use crate::{
    db::index::{
        queries::{PreparedQueries, PreparedUpsertQuery},
        session::CassandraSession,
    },
    service::utilities::convert::from_saturating,
    settings::Settings,
};

/// Insert Sync Status query string.
const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql");

/// Sync Status Row Record Module
pub(super) mod row {
    use scylla::{frame::value::CqlTimestamp, DeserializeRow, SerializeRow};

    /// Sync Status Record Row (used for both Insert and Query response)
    #[derive(SerializeRow, DeserializeRow, Debug)]
    pub(crate) struct SyncStatusQueryParams {
        /// End Slot.
        pub(crate) end_slot: num_bigint::BigInt,
        /// Start Slot.
        pub(crate) start_slot: num_bigint::BigInt,
        /// Sync Time.
        pub(crate) sync_time: CqlTimestamp,
        /// Node ID
        pub(crate) node_id: String,
    }
}

impl SyncStatusQueryParams {
    /// Create a new instance of [`SyncStatusQueryParams`]
    pub(crate) fn new(end_slot: u64, start_slot: u64) -> Self {
        let sync_time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            Ok(now) => now.as_millis(),
            Err(_) => 0, // Shouldn't actually happen.
        };

        Self {
            end_slot: end_slot.into(),
            start_slot: start_slot.into(),
            sync_time: CqlTimestamp(from_saturating(sync_time)),
            node_id: Settings::service_id().to_owned(),
        }
    }
}

/// Sync Status Insert query.
pub(crate) struct SyncStatusInsertQuery;

impl SyncStatusInsertQuery {
    /// Prepares a Sync Status Insert query.
    pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
        let sync_status_insert_query = PreparedQueries::prepare(
            session,
            INSERT_SYNC_STATUS_QUERY,
            scylla::statement::Consistency::All,
            true,
        )
        .await;

        if let Err(ref error) = sync_status_insert_query {
            error!(error=%error, "Failed to prepare get Sync Status Insert query.");
        };

        sync_status_insert_query
    }

    /// Executes a sync status insert query.
    pub(crate) async fn execute(
        session: &CassandraSession, params: SyncStatusQueryParams,
    ) -> anyhow::Result<()> {
        session
            .execute_upsert(PreparedUpsertQuery::SyncStatusInsert, params)
            .await
    }
}

/// Update the sync status of the immutable database.
///
/// Note: There is no need to update the sync status of the volatile database.
///
/// Regarding failures:
/// Failures of this function to record status, fail safely.
/// This data is only used to recover sync
/// There fore this function is both fire and forget, and returns no status.
pub(crate) fn update_sync_status(end_slot: u64, start_slot: u64) {
    task::spawn(async move {
        let Some(session) = CassandraSession::get(true) else {
            warn!(
                start_slot = start_slot,
                end_slot = end_slot,
                "Failed to get Cassandra Session, trying to record indexing status"
            );
            return;
        };

        if let Err(err) = SyncStatusInsertQuery::execute(
            &session,
            SyncStatusQueryParams::new(end_slot, start_slot),
        )
        .await
        {
            warn!(
                error=%err,
                start_slot = start_slot,
                end_slot = end_slot,
                "Failed to store Sync Status"
            );
        };
    });
}