cat_gateway/db/index/queries/sync_status/get.rs

//! Get Sync Status query

use cardano_blockchain_types::Slot;
use futures::stream::StreamExt;
use tracing::{debug, warn};

use super::update::row::SyncStatusQueryParams;
use crate::{db::index::session::CassandraSession, service::utilities::convert::from_saturating};

/// Get Sync Status query string.
const GET_SYNC_STATUS: &str = include_str!("../cql/get_sync_status.cql");

/// Clean Sync Status Response.
#[derive(PartialEq, Debug)]
pub(crate) struct SyncStatus {
    /// End Slot.
    pub(crate) end_slot: Slot,
    /// Start Slot.
    pub(crate) start_slot: Slot,
    /// Sync Time.
    pub(crate) sync_time: u64,
    /// Node ID.
    pub(crate) node_id: String,
}

/// Merge consecutive sync records to make them easier to process.
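///
/// Records that overlap or directly abut are combined into one record spanning both;
/// for example, the ranges `0..=100` and `101..=200` merge into `0..=200`, while
/// `0..=100` and `102..=200` are left as two separate records.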
fn merge_consecutive_sync_records(mut synced_chunks: Vec<SyncStatus>) -> Vec<SyncStatus> {
    // Sort the chunks by the starting key; if the ending keys overlap, we will deal
    // with that during the merge.
    synced_chunks.sort_by_key(|rec| rec.start_slot);

    let mut best_sync: Vec<SyncStatus> = vec![];
    let mut current_status: Option<SyncStatus> = None;
    for rec in synced_chunks {
        if let Some(current) = current_status.take() {
            if rec.start_slot >= current.start_slot && rec.end_slot <= current.end_slot {
                // The new record is contained fully within the previous one.
                // We will ignore the new record and use the previous one instead.
                current_status = Some(current);
            } else if rec.start_slot <= u64::from(current.end_slot).saturating_add(1).into() {
                // Either overlaps, or is directly consecutive.
                // But not fully contained within the previous one.
                current_status = Some(SyncStatus {
                    end_slot: rec.end_slot,
                    start_slot: current.start_slot,
                    sync_time: rec.sync_time.max(current.sync_time),
                    node_id: rec.node_id,
                });
            } else {
                // Not consecutive, so store it.
                // And set a new current one.
                best_sync.push(current);
                current_status = Some(rec);
            }
        } else {
            current_status = Some(rec);
        }
    }
    // The final record could still be in `current_status`, so store it too.
    if let Some(current) = current_status.take() {
        best_sync.push(current);
    }

    best_sync
}

/// Get the sync status.
///
/// Note: This only happens once when a node starts, so there is no need to prepare
/// the query. It is also only ever run on the persistent database.
///
/// Regarding failures:
/// Failures of this function will simply cause the node to re-sync, which is non-fatal.
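///
/// # Example
///
/// An illustrative sketch of how a caller might use the result; the `resume_at`
/// handling here is hypothetical:
///
/// ```rust,ignore
/// // Resume indexing from the end of the first contiguous synced range, if any.
/// let chunks = get_sync_status().await;
/// let resume_at = chunks.first().map(|chunk| chunk.end_slot);
/// ```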
pub(crate) async fn get_sync_status() -> Vec<SyncStatus> {
    let mut synced_chunks: Vec<SyncStatus> = vec![];

    let Some(session) = CassandraSession::get(true) else {
        warn!("Failed to get Cassandra Session while trying to get current indexing status");
        return synced_chunks;
    };

    // Get the raw underlying session, so we can do an unprepared simple query.
    let session = session.get_raw_session();

    let mut results = match session.query_iter(GET_SYNC_STATUS, ()).await {
        Ok(result) => {
            match result.rows_stream::<SyncStatusQueryParams>() {
                Ok(result) => result,
                Err(err) => {
                    warn!(error=%err, "Failed to get a row stream from the sync status query.");
                    return synced_chunks;
                },
            }
        },
        Err(err) => {
            warn!(error=%err, "Failed to execute the sync status query.");
            return synced_chunks;
        },
    };

    // Get all the sync records, and de-cassandra-ize the values.
    while let Some(next_row) = results.next().await {
        match next_row {
            Err(err) => warn!(error=%err, "Failed to deserialize sync status results from query."),
            Ok(row) => {
                debug!("Sync Status: {:?}", row);
                synced_chunks.push(SyncStatus {
                    end_slot: row.end_slot.into(),
                    start_slot: row.start_slot.into(),
                    sync_time: from_saturating(row.sync_time.0),
                    node_id: row.node_id,
                });
            },
        }
    }

    merge_consecutive_sync_records(synced_chunks)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// This test checks we can properly merge sync status chunks.
    #[test]
    fn test_sync_merge() {
        // Add some test records, out of order.
        // Two mergeable groups.
        let synced_chunks: Vec<SyncStatus> = vec![
            SyncStatus {
                end_slot: 200_000.into(),
                start_slot: 112_001.into(),
                sync_time: 1_200_000,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 12_000.into(),
                start_slot: 0.into(),
                sync_time: 100_100,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 99_000.into(),
                start_slot: 56_789.into(),
                sync_time: 200_000,
                node_id: "test-node-2".to_string(),
            },
            SyncStatus {
                end_slot: 112_000.into(),
                start_slot: 100_000.into(),
                sync_time: 1_100_100,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 56_789.into(),
                start_slot: 12_300.into(),
                sync_time: 200_000,
                node_id: "test-node-2".to_string(),
            },
            SyncStatus {
                end_slot: 12_345.into(),
                start_slot: 0.into(),
                sync_time: 100_000,
                node_id: "test-node-1".to_string(),
            },
        ];

        let merged_sync_status = merge_consecutive_sync_records(synced_chunks);

        // Expected result.
        let expected: &[SyncStatus] = &[
            SyncStatus {
                end_slot: 99_000.into(),
                start_slot: 0.into(),
                sync_time: 200_000,
                node_id: "test-node-2".to_string(),
            },
            SyncStatus {
                end_slot: 200_000.into(),
                start_slot: 100_000.into(),
                sync_time: 1_200_000,
                node_id: "test-node-1".to_string(),
            },
        ];

        assert_eq!(merged_sync_status.as_slice(), expected);
    }
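
    /// An illustrative sketch of two edge cases implied by the merge logic
    /// (these inputs are hypothetical, chosen to exercise the branches): a record
    /// fully contained within another is dropped, and records separated by a gap
    /// stay separate.
    #[test]
    fn test_sync_merge_contained_and_gap() {
        let synced_chunks: Vec<SyncStatus> = vec![
            SyncStatus {
                end_slot: 1_000.into(),
                start_slot: 0.into(),
                sync_time: 50_000,
                node_id: "test-node-1".to_string(),
            },
            // Fully contained within the record above, so it should be ignored.
            SyncStatus {
                end_slot: 500.into(),
                start_slot: 100.into(),
                sync_time: 60_000,
                node_id: "test-node-2".to_string(),
            },
            // Starts after a gap (1_002 > 1_000 + 1), so it must remain separate.
            SyncStatus {
                end_slot: 2_000.into(),
                start_slot: 1_002.into(),
                sync_time: 70_000,
                node_id: "test-node-1".to_string(),
            },
        ];

        let merged = merge_consecutive_sync_records(synced_chunks);

        let expected: &[SyncStatus] = &[
            SyncStatus {
                end_slot: 1_000.into(),
                start_slot: 0.into(),
                sync_time: 50_000,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 2_000.into(),
                start_slot: 1_002.into(),
                sync_time: 70_000,
                node_id: "test-node-1".to_string(),
            },
        ];

        assert_eq!(merged.as_slice(), expected);
    }

    /// A minimal sketch: merging an empty list yields an empty list.
    #[test]
    fn test_sync_merge_empty() {
        assert!(merge_consecutive_sync_records(vec![]).is_empty());
    }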
}