cat_gateway/db/index/queries/sync_status/get.rs

use cardano_blockchain_types::Slot;
use futures::stream::StreamExt;
use tracing::{debug, warn};

use super::update::row::SyncStatusQueryParams;
use crate::{db::index::session::CassandraSession, service::utilities::convert::from_saturating};
const GET_SYNC_STATUS: &str = include_str!("../cql/get_sync_status.cql");

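/// Sync status record for a contiguous range of slots.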
#[derive(PartialEq, Debug)]
pub(crate) struct SyncStatus {
    /// End slot of the synced range.
    pub(crate) end_slot: Slot,
    /// Start slot of the synced range.
    pub(crate) start_slot: Slot,
    /// Time the sync status was recorded.
    pub(crate) sync_time: u64,
    /// ID of the node that recorded the sync status.
    pub(crate) node_id: String,
}

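/// Merge consecutive sync status records into the largest contiguous ranges possible.
///
/// Records that overlap, or are adjacent to, an earlier range are folded into it;
/// a gap between ranges starts a new record.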
fn merge_consecutive_sync_records(mut synced_chunks: Vec<SyncStatus>) -> Vec<SyncStatus> {
    // Sort by starting slot, so overlapping or adjacent ranges become consecutive.
    synced_chunks.sort_by_key(|rec| rec.start_slot);

    let mut best_sync: Vec<SyncStatus> = vec![];
    let mut current_status: Option<SyncStatus> = None;
    for rec in synced_chunks {
        if let Some(current) = current_status.take() {
            if rec.start_slot >= current.start_slot && rec.end_slot <= current.end_slot {
                // The record is fully contained in the current range, so it adds nothing.
                current_status = Some(current);
            } else if rec.start_slot <= u64::from(current.end_slot).saturating_add(1).into() {
                // The record overlaps, or is adjacent to, the current range, so extend it.
                current_status = Some(SyncStatus {
                    end_slot: rec.end_slot,
                    start_slot: current.start_slot,
                    sync_time: rec.sync_time.max(current.sync_time),
                    node_id: rec.node_id,
                });
            } else {
                // There is a gap, so finish the current range and start a new one.
                best_sync.push(current);
                current_status = Some(rec);
            }
        } else {
            current_status = Some(rec);
        }
    }
    // Don't lose the final range still being accumulated.
    if let Some(current) = current_status.take() {
        best_sync.push(current);
    }

    best_sync
}

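/// Get the current sync status from the database.
///
/// Reads every recorded sync status row and merges overlapping or adjacent ranges
/// with `merge_consecutive_sync_records`.
/// Returns an empty list if the rows can not be read.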
pub(crate) async fn get_sync_status() -> Vec<SyncStatus> {
    let mut synced_chunks: Vec<SyncStatus> = vec![];

    let Some(session) = CassandraSession::get(true) else {
        warn!("Failed to get Cassandra Session while trying to get current indexing status");
        return synced_chunks;
    };

    let session = session.get_raw_session();

    // Run the query and get a stream of rows typed as `SyncStatusQueryParams`.
    let mut results = match session.query_iter(GET_SYNC_STATUS, ()).await {
        Ok(result) => {
            match result.rows_stream::<SyncStatusQueryParams>() {
                Ok(result) => result,
                Err(err) => {
                    warn!(error=%err, "Failed to get a row stream from the sync status query.");
                    return synced_chunks;
                },
            }
        },
        Err(err) => {
            warn!(error=%err, "Failed to execute the sync status query.");
            return synced_chunks;
        },
    };

    // Collect every successfully deserialized sync status row.
    while let Some(next_row) = results.next().await {
        match next_row {
            Err(err) => warn!(error=%err, "Failed to deserialize sync status results from query."),
            Ok(row) => {
                debug!("Sync Status: {:?}", row);
                synced_chunks.push(SyncStatus {
                    end_slot: row.end_slot.into(),
                    start_slot: row.start_slot.into(),
                    sync_time: from_saturating(row.sync_time.0),
                    node_id: row.node_id,
                });
            },
        }
    }

    merge_consecutive_sync_records(synced_chunks)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sync_merge() {
        // Overlapping, adjacent and disjoint ranges, recorded out of order by two nodes.
        let synced_chunks: Vec<SyncStatus> = vec![
            SyncStatus {
                end_slot: 200_000.into(),
                start_slot: 112_001.into(),
                sync_time: 1_200_000,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 12000.into(),
                start_slot: 0.into(),
                sync_time: 100_100,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 99000.into(),
                start_slot: 56789.into(),
                sync_time: 200_000,
                node_id: "test-node-2".to_string(),
            },
            SyncStatus {
                end_slot: 112_000.into(),
                start_slot: 100_000.into(),
                sync_time: 1_100_100,
                node_id: "test-node-1".to_string(),
            },
            SyncStatus {
                end_slot: 56789.into(),
                start_slot: 12300.into(),
                sync_time: 200_000,
                node_id: "test-node-2".to_string(),
            },
            SyncStatus {
                end_slot: 12345.into(),
                start_slot: 0.into(),
                sync_time: 100_000,
                node_id: "test-node-1".to_string(),
            },
        ];

        let merged_syncs_status = merge_consecutive_sync_records(synced_chunks);

        // The records merge into two contiguous ranges with a gap between them.
        let expected: &[SyncStatus] = &[
            SyncStatus {
                end_slot: 99000.into(),
                start_slot: 0.into(),
                sync_time: 200_000,
                node_id: "test-node-2".to_string(),
            },
            SyncStatus {
                end_slot: 200_000.into(),
                start_slot: 100_000.into(),
                sync_time: 1_200_000,
                node_id: "test-node-1".to_string(),
            },
        ];

        assert_eq!(merged_syncs_status.as_slice(), expected);
    }
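
    // Supplementary illustrative check (not part of the original module): merging an
    // empty set of records is a no-op and yields an empty result.
    #[test]
    fn test_sync_merge_empty() {
        assert!(merge_consecutive_sync_records(vec![]).is_empty());
    }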
}