cat_gateway/db/index/queries/
mod.rs

1//! Pre-prepare queries for a given session.
2//!
3//! This improves query execution time.
4
5pub(crate) mod purge;
6pub(crate) mod rbac;
7pub(crate) mod registrations;
8pub(crate) mod staked_ada;
9pub(crate) mod sync_status;
10
11use std::{fmt::Debug, sync::Arc};
12
13use anyhow::bail;
14use crossbeam_skiplist::SkipMap;
15use registrations::{
16    get_all_invalids::GetAllInvalidRegistrationsQuery,
17    get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery,
18    get_from_stake_address::GetStakeAddrQuery, get_from_vote_key::GetStakeAddrFromVoteKeyQuery,
19    get_invalid::GetInvalidRegistrationQuery,
20};
21use scylla::{
22    batch::Batch,
23    prepared_statement::PreparedStatement,
24    serialize::row::SerializeRow,
25    transport::{errors::QueryError, iterator::QueryPager},
26    QueryResult, Session,
27};
28use staked_ada::{
29    get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
30    get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
31    get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
32};
33use sync_status::update::SyncStatusInsertQuery;
34use tracing::error;
35
36use super::block::{
37    certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
38    txi::TxiInsertQuery, txo::TxoInsertQuery,
39};
40use crate::{
41    db::index::{
42        queries::rbac::{
43            get_catalyst_id_from_stake_address, get_catalyst_id_from_transaction_id,
44            get_rbac_invalid_registrations, get_rbac_registrations,
45        },
46        session::CassandraSessionError,
47    },
48    service::utilities::health::set_index_db_liveness,
49    settings::cassandra_db,
50};
51
/// Batches of different sizes, prepared and ready for use.
///
/// The key is the batch size. `PreparedQueries::prepare_batch` inserts one [`Batch`]
/// per size, and `session_execute_batch` looks a batch up by the length of the chunk
/// of values it is about to send.
pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
54
/// All prepared batch queries (inserts and updates) that we know about.
///
/// A variant selects which set of pre-sized batches `PreparedQueries::execute_batch`
/// runs. The derived `Display` text is what `session_execute_batch` puts in its error
/// logs and error message when a batch fails.
#[derive(strum_macros::Display)]
// Every variant name ends in `Query`, which this lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub(crate) enum PreparedQuery {
    /// TXO Insert query.
    TxoAdaInsertQuery,
    /// TXO Asset Insert query.
    TxoAssetInsertQuery,
    /// Unstaked TXO Insert query.
    UnstakedTxoAdaInsertQuery,
    /// Unstaked TXO Asset Insert query.
    UnstakedTxoAssetInsertQuery,
    /// TXI Insert query.
    TxiInsertQuery,
    /// Stake Registration Insert query.
    StakeRegistrationInsertQuery,
    /// CIP 36 Registration Insert query.
    Cip36RegistrationInsertQuery,
    /// CIP 36 Registration Error Insert query.
    Cip36RegistrationInsertErrorQuery,
    /// CIP 36 Registration for voting key Insert query.
    Cip36RegistrationForVoteKeyInsertQuery,
    /// TXO spent Update query.
    TxoSpentUpdateQuery,
    /// RBAC 509 Registration Insert query.
    Rbac509InsertQuery,
    /// An invalid RBAC 509 registration Insert query.
    Rbac509InvalidInsertQuery,
    /// A Catalyst ID for transaction ID insert query.
    CatalystIdForTxnIdInsertQuery,
    /// A Catalyst ID for stake address insert query.
    CatalystIdForStakeAddressInsertQuery,
}
88
/// All prepared SELECT query statements (return data).
///
/// A variant selects which prepared statement `PreparedQueries::execute_iter` runs.
pub(crate) enum PreparedSelectQuery {
    /// Get TXO by stake address query.
    TxoByStakeAddress,
    /// Get TXI by transaction hash query.
    TxiByTransactionHash,
    /// Get native assets by stake address query.
    AssetsByStakeAddress,
    /// Get registrations from a stake address.
    RegistrationFromStakeAddr,
    /// Get invalid registrations from a stake address.
    InvalidRegistrationsFromStakeAddr,
    /// Get stake address from stake hash.
    StakeAddrFromStakeHash,
    /// Get stake address from vote key.
    StakeAddrFromVoteKey,
    /// Get Catalyst ID by transaction ID.
    CatalystIdByTransactionId,
    /// Get Catalyst ID by stake address.
    CatalystIdByStakeAddress,
    /// Get RBAC registrations by Catalyst ID.
    RbacRegistrationsByCatalystId,
    /// Get invalid RBAC registrations by Catalyst ID.
    RbacInvalidRegistrationsByCatalystId,
    /// Get all registrations for snapshot.
    GetAllRegistrations,
    /// Get all invalid registrations for snapshot.
    GetAllInvalidRegistrations,
}
118
/// All prepared UPSERT query statements (inserts/updates a single value of data).
///
/// A variant selects which prepared statement `PreparedQueries::execute_upsert` runs.
pub(crate) enum PreparedUpsertQuery {
    /// Sync Status Insert.
    SyncStatusInsert,
}
124
/// All prepared queries for a session.
///
/// Fields of type [`SizedBatch`] are run through `execute_batch`; fields of type
/// [`PreparedStatement`] are run through `execute_iter` or `execute_upsert`.
// Most field names share a `_query`/`_queries` suffix, which this lint would flag.
#[allow(clippy::struct_field_names)]
pub(crate) struct PreparedQueries {
    /// TXO Insert query.
    txo_insert_queries: SizedBatch,
    /// TXO Asset Insert query.
    txo_asset_insert_queries: SizedBatch,
    /// Unstaked TXO Insert query.
    unstaked_txo_insert_queries: SizedBatch,
    /// Unstaked TXO Asset Insert query.
    unstaked_txo_asset_insert_queries: SizedBatch,
    /// TXI Insert query.
    txi_insert_queries: SizedBatch,
    /// Stake Registration Insert query.
    stake_registration_insert_queries: SizedBatch,
    /// CIP36 Registrations.
    cip36_registration_insert_queries: SizedBatch,
    /// CIP36 Registration errors.
    cip36_registration_error_insert_queries: SizedBatch,
    /// CIP36 Registration for voting key Insert query.
    cip36_registration_for_vote_key_insert_queries: SizedBatch,
    /// Update TXO spent query.
    txo_spent_update_queries: SizedBatch,
    /// Get TXO by stake address query.
    txo_by_stake_address_query: PreparedStatement,
    /// Get TXI by transaction hash.
    txi_by_txn_hash_query: PreparedStatement,
    /// RBAC 509 Registrations.
    rbac509_registration_insert_queries: SizedBatch,
    /// Invalid RBAC 509 registrations.
    rbac509_invalid_registration_insert_queries: SizedBatch,
    /// Catalyst ID for transaction ID insert query.
    catalyst_id_for_txn_id_insert_queries: SizedBatch,
    /// Catalyst ID for stake address insert query.
    catalyst_id_for_stake_address_insert_queries: SizedBatch,
    /// Get native assets by stake address query.
    native_assets_by_stake_address_query: PreparedStatement,
    /// Get registrations from a stake address.
    registration_from_stake_addr_query: PreparedStatement,
    /// Get stake address from stake hash (selected by `StakeAddrFromStakeHash`).
    stake_addr_from_stake_address_query: PreparedStatement,
    /// Get stake address from vote key.
    stake_addr_from_vote_key_query: PreparedStatement,
    /// Get invalid registrations from a stake address.
    invalid_registrations_from_stake_addr_query: PreparedStatement,
    /// Insert Sync Status update.
    sync_status_insert: PreparedStatement,
    /// Get Catalyst ID by stake address.
    catalyst_id_by_stake_address_query: PreparedStatement,
    /// Get Catalyst ID by transaction ID.
    catalyst_id_by_transaction_id_query: PreparedStatement,
    /// Get RBAC registrations by Catalyst ID.
    rbac_registrations_by_catalyst_id_query: PreparedStatement,
    /// Get invalid RBAC registrations by Catalyst ID.
    rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement,
    /// Get all registrations for snapshot.
    get_all_registrations_query: PreparedStatement,
    /// Get all invalid registrations for snapshot.
    get_all_invalid_registrations_query: PreparedStatement,
}
185
/// A set of query responses that can fail.
///
/// `Ok` carries one [`QueryResult`] per executed query; `Err` carries the failure.
pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
/// A set of query responses from tasks that can fail.
///
/// Each handle resolves to the [`FallibleQueryResults`] of one spawned tokio task.
pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
190
191impl PreparedQueries {
192    /// Create new prepared queries for a given session.
193    #[allow(clippy::too_many_lines)]
194    pub(crate) async fn new(
195        session: Arc<Session>, cfg: &cassandra_db::EnvVars,
196    ) -> anyhow::Result<Self> {
197        // We initialize like this, so that all errors preparing querys get shown before aborting.
198        let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?;
199        let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await;
200        let stake_registration_insert_queries =
201            CertInsertQuery::prepare_batch(&session, cfg).await?;
202        let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await;
203        let txo_spent_update_queries =
204            UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?;
205        let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await;
206        let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await;
207        let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
208        let native_assets_by_stake_address_query =
209            GetAssetsByStakeAddressQuery::prepare(session.clone()).await;
210        let registration_from_stake_addr_query =
211            GetRegistrationQuery::prepare(session.clone()).await;
212        let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await;
213        let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await;
214        let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await;
215        let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await;
216        let get_all_invalid_registrations_query =
217            GetAllInvalidRegistrationsQuery::prepare(session.clone()).await;
218        let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?;
219        let catalyst_id_by_stake_address_query =
220            get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?;
221        let catalyst_id_by_transaction_id_query =
222            get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?;
223        let rbac_registrations_by_catalyst_id_query =
224            get_rbac_registrations::Query::prepare(session.clone()).await?;
225        let rbac_invalid_registrations_by_catalyst_id_query =
226            get_rbac_invalid_registrations::Query::prepare(session.clone()).await?;
227
228        let (
229            txo_insert_queries,
230            unstaked_txo_insert_queries,
231            txo_asset_insert_queries,
232            unstaked_txo_asset_insert_queries,
233        ) = all_txo_queries?;
234
235        let (
236            cip36_registration_insert_queries,
237            cip36_registration_error_insert_queries,
238            cip36_registration_for_vote_key_insert_queries,
239        ) = all_cip36_queries?;
240
241        let (
242            rbac509_registration_insert_queries,
243            rbac509_invalid_registration_insert_queries,
244            catalyst_id_for_txn_id_insert_queries,
245            catalyst_id_for_stake_address_insert_queries,
246        ) = all_rbac_queries?;
247
248        Ok(Self {
249            txo_insert_queries,
250            txo_asset_insert_queries,
251            unstaked_txo_insert_queries,
252            unstaked_txo_asset_insert_queries,
253            txi_insert_queries,
254            stake_registration_insert_queries,
255            cip36_registration_insert_queries,
256            cip36_registration_error_insert_queries,
257            cip36_registration_for_vote_key_insert_queries,
258            txo_spent_update_queries,
259            txo_by_stake_address_query: txo_by_stake_address_query?,
260            txi_by_txn_hash_query: txi_by_txn_hash_query?,
261            rbac509_registration_insert_queries,
262            rbac509_invalid_registration_insert_queries,
263            catalyst_id_for_txn_id_insert_queries,
264            catalyst_id_for_stake_address_insert_queries,
265            native_assets_by_stake_address_query: native_assets_by_stake_address_query?,
266            registration_from_stake_addr_query: registration_from_stake_addr_query?,
267            stake_addr_from_stake_address_query: stake_addr_from_stake_address?,
268            stake_addr_from_vote_key_query: stake_addr_from_vote_key?,
269            invalid_registrations_from_stake_addr_query: invalid_registrations?,
270            sync_status_insert,
271            rbac_registrations_by_catalyst_id_query,
272            rbac_invalid_registrations_by_catalyst_id_query,
273            catalyst_id_by_stake_address_query,
274            catalyst_id_by_transaction_id_query,
275            get_all_registrations_query: get_all_registrations_query?,
276            get_all_invalid_registrations_query: get_all_invalid_registrations_query?,
277        })
278    }
279
280    /// Prepares a statement.
281    pub(crate) async fn prepare(
282        session: Arc<Session>, query: &str, consistency: scylla::statement::Consistency,
283        idempotent: bool,
284    ) -> anyhow::Result<PreparedStatement> {
285        let mut prepared = session.prepare(query).await?;
286        prepared.set_consistency(consistency);
287        prepared.set_is_idempotent(idempotent);
288
289        Ok(prepared)
290    }
291
292    /// Prepares all permutations of the batch from 1 to max.
293    /// It is necessary to do this because batches are pre-sized, they can not be dynamic.
294    /// Preparing the batches in advance is a very larger performance increase.
295    pub(crate) async fn prepare_batch(
296        session: Arc<Session>, query: &str, cfg: &cassandra_db::EnvVars,
297        consistency: scylla::statement::Consistency, idempotent: bool, logged: bool,
298    ) -> anyhow::Result<SizedBatch> {
299        let sized_batches: SizedBatch = SkipMap::new();
300
301        // First prepare the query. Only needs to be done once, all queries on a batch are the
302        // same.
303        let prepared = Self::prepare(session, query, consistency, idempotent).await?;
304
305        for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
306            let mut batch: Batch = Batch::new(if logged {
307                scylla::batch::BatchType::Logged
308            } else {
309                scylla::batch::BatchType::Unlogged
310            });
311            batch.set_consistency(consistency);
312            batch.set_is_idempotent(idempotent);
313            for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size {
314                batch.append_statement(prepared.clone());
315            }
316
317            sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
318        }
319
320        Ok(sized_batches)
321    }
322
323    /// Executes a single query with the given parameters.
324    ///
325    /// Returns no data, and an error if the query fails.
326    pub(crate) async fn execute_upsert<P>(
327        &self, session: Arc<Session>, upsert_query: PreparedUpsertQuery, params: P,
328    ) -> anyhow::Result<()>
329    where P: SerializeRow {
330        let prepared_stmt = match upsert_query {
331            PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
332        };
333
334        session
335            .execute_unpaged(prepared_stmt, params)
336            .await
337            .map_err(|e| {
338                match e {
339                    QueryError::ConnectionPoolError(err) => {
340                        set_index_db_liveness(false);
341                        error!(error = %err, "Index DB connection failed. Liveness set to false.");
342                        CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
343                    },
344                    _ => anyhow::anyhow!(e),
345                }
346            })?;
347
348        Ok(())
349    }
350
351    /// Executes a select query with the given parameters.
352    ///
353    /// Returns an iterator that iterates over all the result pages that the query
354    /// returns.
355    pub(crate) async fn execute_iter<P>(
356        &self, session: Arc<Session>, select_query: PreparedSelectQuery, params: P,
357    ) -> anyhow::Result<QueryPager>
358    where P: SerializeRow {
359        let prepared_stmt = match select_query {
360            PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
361            PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
362            PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
363            PreparedSelectQuery::RegistrationFromStakeAddr => {
364                &self.registration_from_stake_addr_query
365            },
366            PreparedSelectQuery::StakeAddrFromStakeHash => {
367                &self.stake_addr_from_stake_address_query
368            },
369            PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
370            PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
371                &self.invalid_registrations_from_stake_addr_query
372            },
373            PreparedSelectQuery::RbacRegistrationsByCatalystId => {
374                &self.rbac_registrations_by_catalyst_id_query
375            },
376            PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => {
377                &self.rbac_invalid_registrations_by_catalyst_id_query
378            },
379            PreparedSelectQuery::CatalystIdByTransactionId => {
380                &self.catalyst_id_by_transaction_id_query
381            },
382            PreparedSelectQuery::CatalystIdByStakeAddress => {
383                &self.catalyst_id_by_stake_address_query
384            },
385            PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query,
386            PreparedSelectQuery::GetAllInvalidRegistrations => {
387                &self.get_all_invalid_registrations_query
388            },
389        };
390        session_execute_iter(session, prepared_stmt, params).await
391    }
392
393    /// Execute a Batch query with the given parameters.
394    ///
395    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
396    /// the same, and must match the query being executed.
397    ///
398    /// This will divide the batch into optimal sized chunks and execute them until all
399    /// values have been executed or the first error is encountered.
400    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
401        &self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedQuery,
402        values: Vec<T>,
403    ) -> FallibleQueryResults {
404        let query_map = match query {
405            PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
406            PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
407            PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
408            PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
409            PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
410            PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
411            PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
412            PreparedQuery::Cip36RegistrationInsertErrorQuery => {
413                &self.cip36_registration_error_insert_queries
414            },
415            PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery => {
416                &self.cip36_registration_for_vote_key_insert_queries
417            },
418            PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
419            PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
420            PreparedQuery::Rbac509InvalidInsertQuery => {
421                &self.rbac509_invalid_registration_insert_queries
422            },
423            PreparedQuery::CatalystIdForTxnIdInsertQuery => {
424                &self.catalyst_id_for_txn_id_insert_queries
425            },
426            PreparedQuery::CatalystIdForStakeAddressInsertQuery => {
427                &self.catalyst_id_for_stake_address_insert_queries
428            },
429        };
430        session_execute_batch(session, query_map, cfg, query, values).await
431    }
432}
433
434/// Execute a Batch query with the given parameters.
435///
436/// Values should be a Vec of values which implement `SerializeRow` and they MUST be
437/// the same, and must match the query being executed.
438///
439/// This will divide the batch into optimal sized chunks and execute them until all
440/// values have been executed or the first error is encountered.
441async fn session_execute_batch<T: SerializeRow + Debug, Q: std::fmt::Display>(
442    session: Arc<Session>, query_map: &SizedBatch, cfg: Arc<cassandra_db::EnvVars>, query: Q,
443    values: Vec<T>,
444) -> FallibleQueryResults {
445    let mut results: Vec<QueryResult> = Vec::new();
446    let mut errors = Vec::new();
447
448    let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1));
449    let query_str = format!("{query}");
450
451    for chunk in chunks {
452        let chunk_size: u16 = chunk.len().try_into()?;
453        let Some(batch_query) = query_map.get(&chunk_size) else {
454            // This should not actually occur.
455            bail!("No batch query found for size {}", chunk_size);
456        };
457        let batch_query_statements = batch_query.value().clone();
458        match session.batch(&batch_query_statements, chunk).await {
459            Ok(result) => results.push(result),
460            Err(err) => {
461                let chunk_str = format!("{chunk:?}");
462                if let QueryError::ConnectionPoolError(_) = err {
463                    set_index_db_liveness(false);
464                    error!(error=%err, query=query_str, chunk=chunk_str, "Index DB connection failed. Liveness set to false.");
465                    bail!(CassandraSessionError::ConnectionUnavailable { source: err.into() })
466                };
467                error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed");
468                errors.push(err);
469                // Defer failure until all batches have been processed.
470            },
471        }
472    }
473
474    if !errors.is_empty() {
475        bail!("Query Failed: {query_str}! {errors:?}");
476    }
477
478    Ok(results)
479}
480
481/// Executes a select query with the given parameters.
482///
483/// Returns an iterator that iterates over all the result pages that the query
484/// returns.
485pub(crate) async fn session_execute_iter<P>(
486    session: Arc<Session>, prepared_stmt: &PreparedStatement, params: P,
487) -> anyhow::Result<QueryPager>
488where P: SerializeRow {
489    session
490        .execute_iter(prepared_stmt.clone(), params)
491        .await
492        .map_err(|e| {
493            if let QueryError::ConnectionPoolError(err) = e {
494                set_index_db_liveness(false);
495                error!(error = %err, "Index DB connection failed. Liveness set to false.");
496                CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
497            } else {
498                e.into()
499            }
500        })
501}