cat_gateway/db/index/queries/sync_status/
update.rs1use std::{sync::Arc, time::SystemTime};
4
5use cardano_blockchain_types::Slot;
6use row::SyncStatusQueryParams;
7use scylla::{frame::value::CqlTimestamp, prepared_statement::PreparedStatement, Session};
8use tokio::task;
9use tracing::{error, warn};
10
11use crate::{
12 db::index::{
13 queries::{PreparedQueries, PreparedUpsertQuery},
14 session::CassandraSession,
15 },
16 service::utilities::convert::from_saturating,
17 settings::Settings,
18};
19
/// CQL text of the sync status insert statement, embedded at compile time from
/// `../cql/insert_sync_status.cql`.
const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql");
22
/// Row definition for the sync status record.
pub(crate) mod row {
    use scylla::{frame::value::CqlTimestamp, DeserializeRow, SerializeRow};

    use crate::db::types::DbSlot;

    /// One sync status record.
    ///
    /// Derives both `SerializeRow` and `DeserializeRow`, so the same shape can be
    /// bound as statement parameters and decoded from a result row.
    #[derive(SerializeRow, DeserializeRow, Debug)]
    pub(crate) struct SyncStatusQueryParams {
        /// Last slot of the range this record describes.
        pub(crate) end_slot: DbSlot,
        /// First slot of the range this record describes.
        pub(crate) start_slot: DbSlot,
        /// When the record was created; the constructor in this file fills it
        /// with milliseconds since the Unix epoch.
        pub(crate) sync_time: CqlTimestamp,
        /// Identifier of the node that wrote the record; the constructor in this
        /// file fills it from `Settings::service_id()`.
        pub(crate) node_id: String,
    }
}
42
43impl SyncStatusQueryParams {
44 pub(crate) fn new(end_slot: Slot, start_slot: Slot) -> Self {
46 let sync_time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
47 Ok(now) => now.as_millis(),
48 Err(_) => 0, };
50
51 Self {
52 end_slot: end_slot.into(),
53 start_slot: start_slot.into(),
54 sync_time: CqlTimestamp(from_saturating(sync_time)),
55 node_id: Settings::service_id().to_owned(),
56 }
57 }
58}
59
/// Namespace type for preparing and executing the sync status insert statement.
pub(crate) struct SyncStatusInsertQuery;
62
63impl SyncStatusInsertQuery {
64 pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
66 PreparedQueries::prepare(
67 session,
68 INSERT_SYNC_STATUS_QUERY,
69 scylla::statement::Consistency::All,
70 true,
71 )
72 .await
73 .inspect_err(
74 |error| error!(error=%error, "Failed to prepare get Sync Status Insert query."),
75 )
76 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_SYNC_STATUS_QUERY}"))
77 }
78
79 pub(crate) async fn execute(
81 session: &CassandraSession, params: SyncStatusQueryParams,
82 ) -> anyhow::Result<()> {
83 session
84 .execute_upsert(PreparedUpsertQuery::SyncStatusInsert, params)
85 .await
86 }
87}
88
89pub(crate) fn update_sync_status(end_slot: Slot, start_slot: Slot) {
98 task::spawn(async move {
99 let Some(session) = CassandraSession::get(true) else {
100 warn!(
101 start_slot = ?start_slot,
102 end_slot = ?end_slot,
103 "Failed to get Cassandra Session, trying to record indexing status"
104 );
105 return;
106 };
107
108 if let Err(err) = SyncStatusInsertQuery::execute(
109 &session,
110 SyncStatusQueryParams::new(end_slot, start_slot),
111 )
112 .await
113 {
114 warn!(
115 error=%err,
116 start_slot = ?start_slot,
117 end_slot = ?end_slot,
118 "Failed to store Sync Status"
119 );
120 };
121 });
122}