1pub(crate) mod purge;
6pub(crate) mod rbac;
7pub(crate) mod registrations;
8pub(crate) mod staked_ada;
9pub(crate) mod sync_status;
10
11use std::{fmt::Debug, sync::Arc};
12
13use anyhow::bail;
14use crossbeam_skiplist::SkipMap;
15use registrations::{
16 get_all_invalids::GetAllInvalidRegistrationsQuery,
17 get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery,
18 get_from_stake_address::GetStakeAddrQuery, get_from_vote_key::GetStakeAddrFromVoteKeyQuery,
19 get_invalid::GetInvalidRegistrationQuery,
20};
21use scylla::{
22 batch::Batch,
23 prepared_statement::PreparedStatement,
24 serialize::row::SerializeRow,
25 transport::{errors::QueryError, iterator::QueryPager},
26 QueryResult, Session,
27};
28use staked_ada::{
29 get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
30 get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
31 get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
32};
33use sync_status::update::SyncStatusInsertQuery;
34use tracing::error;
35
36use super::block::{
37 certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
38 txi::TxiInsertQuery, txo::TxoInsertQuery,
39};
40use crate::{
41 db::index::{
42 queries::rbac::{
43 get_catalyst_id_from_stake_address, get_catalyst_id_from_transaction_id,
44 get_rbac_invalid_registrations, get_rbac_registrations,
45 },
46 session::CassandraSessionError,
47 },
48 service::utilities::health::set_index_db_liveness,
49 settings::cassandra_db,
50};
51
52pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
54
55#[derive(strum_macros::Display)]
57#[allow(clippy::enum_variant_names)]
58pub(crate) enum PreparedQuery {
59 TxoAdaInsertQuery,
61 TxoAssetInsertQuery,
63 UnstakedTxoAdaInsertQuery,
65 UnstakedTxoAssetInsertQuery,
67 TxiInsertQuery,
69 StakeRegistrationInsertQuery,
71 Cip36RegistrationInsertQuery,
73 Cip36RegistrationInsertErrorQuery,
75 Cip36RegistrationForVoteKeyInsertQuery,
77 TxoSpentUpdateQuery,
79 Rbac509InsertQuery,
81 Rbac509InvalidInsertQuery,
83 CatalystIdForTxnIdInsertQuery,
85 CatalystIdForStakeAddressInsertQuery,
87}
88
89pub(crate) enum PreparedSelectQuery {
91 TxoByStakeAddress,
93 TxiByTransactionHash,
95 AssetsByStakeAddress,
97 RegistrationFromStakeAddr,
99 InvalidRegistrationsFromStakeAddr,
101 StakeAddrFromStakeHash,
103 StakeAddrFromVoteKey,
105 CatalystIdByTransactionId,
107 CatalystIdByStakeAddress,
109 RbacRegistrationsByCatalystId,
111 RbacInvalidRegistrationsByCatalystId,
113 GetAllRegistrations,
115 GetAllInvalidRegistrations,
117}
118
119pub(crate) enum PreparedUpsertQuery {
121 SyncStatusInsert,
123}
124
125#[allow(clippy::struct_field_names)]
127pub(crate) struct PreparedQueries {
128 txo_insert_queries: SizedBatch,
130 txo_asset_insert_queries: SizedBatch,
132 unstaked_txo_insert_queries: SizedBatch,
134 unstaked_txo_asset_insert_queries: SizedBatch,
136 txi_insert_queries: SizedBatch,
138 stake_registration_insert_queries: SizedBatch,
140 cip36_registration_insert_queries: SizedBatch,
142 cip36_registration_error_insert_queries: SizedBatch,
144 cip36_registration_for_vote_key_insert_queries: SizedBatch,
146 txo_spent_update_queries: SizedBatch,
148 txo_by_stake_address_query: PreparedStatement,
150 txi_by_txn_hash_query: PreparedStatement,
152 rbac509_registration_insert_queries: SizedBatch,
154 rbac509_invalid_registration_insert_queries: SizedBatch,
156 catalyst_id_for_txn_id_insert_queries: SizedBatch,
158 catalyst_id_for_stake_address_insert_queries: SizedBatch,
160 native_assets_by_stake_address_query: PreparedStatement,
162 registration_from_stake_addr_query: PreparedStatement,
164 stake_addr_from_stake_address_query: PreparedStatement,
166 stake_addr_from_vote_key_query: PreparedStatement,
168 invalid_registrations_from_stake_addr_query: PreparedStatement,
170 sync_status_insert: PreparedStatement,
172 catalyst_id_by_stake_address_query: PreparedStatement,
174 catalyst_id_by_transaction_id_query: PreparedStatement,
176 rbac_registrations_by_catalyst_id_query: PreparedStatement,
178 rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement,
180 get_all_registrations_query: PreparedStatement,
182 get_all_invalid_registrations_query: PreparedStatement,
184}
185
186pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
188pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
190
191impl PreparedQueries {
192 #[allow(clippy::too_many_lines)]
194 pub(crate) async fn new(
195 session: Arc<Session>, cfg: &cassandra_db::EnvVars,
196 ) -> anyhow::Result<Self> {
197 let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?;
199 let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await;
200 let stake_registration_insert_queries =
201 CertInsertQuery::prepare_batch(&session, cfg).await?;
202 let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await;
203 let txo_spent_update_queries =
204 UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?;
205 let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await;
206 let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await;
207 let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
208 let native_assets_by_stake_address_query =
209 GetAssetsByStakeAddressQuery::prepare(session.clone()).await;
210 let registration_from_stake_addr_query =
211 GetRegistrationQuery::prepare(session.clone()).await;
212 let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await;
213 let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await;
214 let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await;
215 let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await;
216 let get_all_invalid_registrations_query =
217 GetAllInvalidRegistrationsQuery::prepare(session.clone()).await;
218 let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?;
219 let catalyst_id_by_stake_address_query =
220 get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?;
221 let catalyst_id_by_transaction_id_query =
222 get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?;
223 let rbac_registrations_by_catalyst_id_query =
224 get_rbac_registrations::Query::prepare(session.clone()).await?;
225 let rbac_invalid_registrations_by_catalyst_id_query =
226 get_rbac_invalid_registrations::Query::prepare(session.clone()).await?;
227
228 let (
229 txo_insert_queries,
230 unstaked_txo_insert_queries,
231 txo_asset_insert_queries,
232 unstaked_txo_asset_insert_queries,
233 ) = all_txo_queries?;
234
235 let (
236 cip36_registration_insert_queries,
237 cip36_registration_error_insert_queries,
238 cip36_registration_for_vote_key_insert_queries,
239 ) = all_cip36_queries?;
240
241 let (
242 rbac509_registration_insert_queries,
243 rbac509_invalid_registration_insert_queries,
244 catalyst_id_for_txn_id_insert_queries,
245 catalyst_id_for_stake_address_insert_queries,
246 ) = all_rbac_queries?;
247
248 Ok(Self {
249 txo_insert_queries,
250 txo_asset_insert_queries,
251 unstaked_txo_insert_queries,
252 unstaked_txo_asset_insert_queries,
253 txi_insert_queries,
254 stake_registration_insert_queries,
255 cip36_registration_insert_queries,
256 cip36_registration_error_insert_queries,
257 cip36_registration_for_vote_key_insert_queries,
258 txo_spent_update_queries,
259 txo_by_stake_address_query: txo_by_stake_address_query?,
260 txi_by_txn_hash_query: txi_by_txn_hash_query?,
261 rbac509_registration_insert_queries,
262 rbac509_invalid_registration_insert_queries,
263 catalyst_id_for_txn_id_insert_queries,
264 catalyst_id_for_stake_address_insert_queries,
265 native_assets_by_stake_address_query: native_assets_by_stake_address_query?,
266 registration_from_stake_addr_query: registration_from_stake_addr_query?,
267 stake_addr_from_stake_address_query: stake_addr_from_stake_address?,
268 stake_addr_from_vote_key_query: stake_addr_from_vote_key?,
269 invalid_registrations_from_stake_addr_query: invalid_registrations?,
270 sync_status_insert,
271 rbac_registrations_by_catalyst_id_query,
272 rbac_invalid_registrations_by_catalyst_id_query,
273 catalyst_id_by_stake_address_query,
274 catalyst_id_by_transaction_id_query,
275 get_all_registrations_query: get_all_registrations_query?,
276 get_all_invalid_registrations_query: get_all_invalid_registrations_query?,
277 })
278 }
279
280 pub(crate) async fn prepare(
282 session: Arc<Session>, query: &str, consistency: scylla::statement::Consistency,
283 idempotent: bool,
284 ) -> anyhow::Result<PreparedStatement> {
285 let mut prepared = session.prepare(query).await?;
286 prepared.set_consistency(consistency);
287 prepared.set_is_idempotent(idempotent);
288
289 Ok(prepared)
290 }
291
292 pub(crate) async fn prepare_batch(
296 session: Arc<Session>, query: &str, cfg: &cassandra_db::EnvVars,
297 consistency: scylla::statement::Consistency, idempotent: bool, logged: bool,
298 ) -> anyhow::Result<SizedBatch> {
299 let sized_batches: SizedBatch = SkipMap::new();
300
301 let prepared = Self::prepare(session, query, consistency, idempotent).await?;
304
305 for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
306 let mut batch: Batch = Batch::new(if logged {
307 scylla::batch::BatchType::Logged
308 } else {
309 scylla::batch::BatchType::Unlogged
310 });
311 batch.set_consistency(consistency);
312 batch.set_is_idempotent(idempotent);
313 for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size {
314 batch.append_statement(prepared.clone());
315 }
316
317 sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
318 }
319
320 Ok(sized_batches)
321 }
322
323 pub(crate) async fn execute_upsert<P>(
327 &self, session: Arc<Session>, upsert_query: PreparedUpsertQuery, params: P,
328 ) -> anyhow::Result<()>
329 where P: SerializeRow {
330 let prepared_stmt = match upsert_query {
331 PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
332 };
333
334 session
335 .execute_unpaged(prepared_stmt, params)
336 .await
337 .map_err(|e| {
338 match e {
339 QueryError::ConnectionPoolError(err) => {
340 set_index_db_liveness(false);
341 error!(error = %err, "Index DB connection failed. Liveness set to false.");
342 CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
343 },
344 _ => anyhow::anyhow!(e),
345 }
346 })?;
347
348 Ok(())
349 }
350
351 pub(crate) async fn execute_iter<P>(
356 &self, session: Arc<Session>, select_query: PreparedSelectQuery, params: P,
357 ) -> anyhow::Result<QueryPager>
358 where P: SerializeRow {
359 let prepared_stmt = match select_query {
360 PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
361 PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
362 PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
363 PreparedSelectQuery::RegistrationFromStakeAddr => {
364 &self.registration_from_stake_addr_query
365 },
366 PreparedSelectQuery::StakeAddrFromStakeHash => {
367 &self.stake_addr_from_stake_address_query
368 },
369 PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
370 PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
371 &self.invalid_registrations_from_stake_addr_query
372 },
373 PreparedSelectQuery::RbacRegistrationsByCatalystId => {
374 &self.rbac_registrations_by_catalyst_id_query
375 },
376 PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => {
377 &self.rbac_invalid_registrations_by_catalyst_id_query
378 },
379 PreparedSelectQuery::CatalystIdByTransactionId => {
380 &self.catalyst_id_by_transaction_id_query
381 },
382 PreparedSelectQuery::CatalystIdByStakeAddress => {
383 &self.catalyst_id_by_stake_address_query
384 },
385 PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query,
386 PreparedSelectQuery::GetAllInvalidRegistrations => {
387 &self.get_all_invalid_registrations_query
388 },
389 };
390 session_execute_iter(session, prepared_stmt, params).await
391 }
392
393 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
401 &self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedQuery,
402 values: Vec<T>,
403 ) -> FallibleQueryResults {
404 let query_map = match query {
405 PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
406 PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
407 PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
408 PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
409 PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
410 PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
411 PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
412 PreparedQuery::Cip36RegistrationInsertErrorQuery => {
413 &self.cip36_registration_error_insert_queries
414 },
415 PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery => {
416 &self.cip36_registration_for_vote_key_insert_queries
417 },
418 PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
419 PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
420 PreparedQuery::Rbac509InvalidInsertQuery => {
421 &self.rbac509_invalid_registration_insert_queries
422 },
423 PreparedQuery::CatalystIdForTxnIdInsertQuery => {
424 &self.catalyst_id_for_txn_id_insert_queries
425 },
426 PreparedQuery::CatalystIdForStakeAddressInsertQuery => {
427 &self.catalyst_id_for_stake_address_insert_queries
428 },
429 };
430 session_execute_batch(session, query_map, cfg, query, values).await
431 }
432}
433
434async fn session_execute_batch<T: SerializeRow + Debug, Q: std::fmt::Display>(
442 session: Arc<Session>, query_map: &SizedBatch, cfg: Arc<cassandra_db::EnvVars>, query: Q,
443 values: Vec<T>,
444) -> FallibleQueryResults {
445 let mut results: Vec<QueryResult> = Vec::new();
446 let mut errors = Vec::new();
447
448 let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1));
449 let query_str = format!("{query}");
450
451 for chunk in chunks {
452 let chunk_size: u16 = chunk.len().try_into()?;
453 let Some(batch_query) = query_map.get(&chunk_size) else {
454 bail!("No batch query found for size {}", chunk_size);
456 };
457 let batch_query_statements = batch_query.value().clone();
458 match session.batch(&batch_query_statements, chunk).await {
459 Ok(result) => results.push(result),
460 Err(err) => {
461 let chunk_str = format!("{chunk:?}");
462 if let QueryError::ConnectionPoolError(_) = err {
463 set_index_db_liveness(false);
464 error!(error=%err, query=query_str, chunk=chunk_str, "Index DB connection failed. Liveness set to false.");
465 bail!(CassandraSessionError::ConnectionUnavailable { source: err.into() })
466 };
467 error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed");
468 errors.push(err);
469 },
471 }
472 }
473
474 if !errors.is_empty() {
475 bail!("Query Failed: {query_str}! {errors:?}");
476 }
477
478 Ok(results)
479}
480
481pub(crate) async fn session_execute_iter<P>(
486 session: Arc<Session>, prepared_stmt: &PreparedStatement, params: P,
487) -> anyhow::Result<QueryPager>
488where P: SerializeRow {
489 session
490 .execute_iter(prepared_stmt.clone(), params)
491 .await
492 .map_err(|e| {
493 if let QueryError::ConnectionPoolError(err) = e {
494 set_index_db_liveness(false);
495 error!(error = %err, "Index DB connection failed. Liveness set to false.");
496 CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
497 } else {
498 e.into()
499 }
500 })
501}