cat_gateway/db/index/queries/
mod.rs

1//! Pre-prepare queries for a given session.
2//!
3//! This improves query execution time.
4
5pub(crate) mod purge;
6pub(crate) mod rbac;
7pub(crate) mod registrations;
8pub(crate) mod staked_ada;
9pub(crate) mod sync_status;
10
11use std::{fmt::Debug, sync::Arc};
12
13use anyhow::bail;
14use crossbeam_skiplist::SkipMap;
15use registrations::{
16    get_all_invalids::GetAllInvalidRegistrationsQuery,
17    get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery,
18    get_from_stake_address::GetStakeAddrQuery, get_from_vote_key::GetStakeAddrFromVoteKeyQuery,
19    get_invalid::GetInvalidRegistrationQuery,
20};
21use scylla::{
22    batch::Batch, prepared_statement::PreparedStatement, serialize::row::SerializeRow,
23    transport::iterator::QueryPager, QueryResult, Session,
24};
25use staked_ada::{
26    get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
27    get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
28    get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
29};
30use sync_status::update::SyncStatusInsertQuery;
31use tracing::error;
32
33use super::block::{
34    certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
35    txi::TxiInsertQuery, txo::TxoInsertQuery,
36};
37use crate::{
38    db::index::queries::rbac::{
39        get_catalyst_id_from_stake_address, get_catalyst_id_from_transaction_id,
40        get_rbac_invalid_registrations, get_rbac_registrations,
41    },
42    settings::cassandra_db,
43};
44
/// Batches of different sizes, prepared and ready for use.
///
/// Keyed by batch size. Every batch in a map holds clones of one prepared statement (see
/// `PreparedQueries::prepare_batch`). `session_execute_batch` picks the entry whose key equals
/// the number of value rows in the chunk it is about to execute.
pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
47
/// All prepared batch insert/update queries that we know about.
///
/// Each variant selects one [`SizedBatch`] stored in [`PreparedQueries`] and is passed to
/// [`PreparedQueries::execute_batch`]. The derived `Display` output is used to identify the
/// query in batch-execution error logs.
#[derive(strum_macros::Display)]
// Every variant name intentionally ends in `Query`, which this clippy lint would flag.
#[allow(clippy::enum_variant_names)]
pub(crate) enum PreparedQuery {
    /// TXO (ADA) insert query.
    TxoAdaInsertQuery,
    /// TXO native asset insert query.
    TxoAssetInsertQuery,
    /// Unstaked TXO (ADA) insert query.
    UnstakedTxoAdaInsertQuery,
    /// Unstaked TXO native asset insert query.
    UnstakedTxoAssetInsertQuery,
    /// TXI insert query.
    TxiInsertQuery,
    /// Stake registration insert query.
    StakeRegistrationInsertQuery,
    /// CIP-36 registration insert query.
    Cip36RegistrationInsertQuery,
    /// CIP-36 registration error insert query.
    Cip36RegistrationInsertErrorQuery,
    /// CIP-36 registration for stake address insert query.
    Cip36RegistrationForStakeAddrInsertQuery,
    /// TXO spent update query.
    TxoSpentUpdateQuery,
    /// RBAC 509 registration insert query.
    Rbac509InsertQuery,
    /// Invalid RBAC 509 registration insert query.
    Rbac509InvalidInsertQuery,
    /// Catalyst ID for transaction ID insert query.
    CatalystIdForTxnIdInsertQuery,
    /// Catalyst ID for stake address insert query.
    CatalystIdForStakeAddressInsertQuery,
}
81
/// All prepared SELECT query statements (return data).
///
/// Each variant selects one prepared statement in [`PreparedQueries`] and is passed to
/// [`PreparedQueries::execute_iter`], which returns a pager over the result rows.
pub(crate) enum PreparedSelectQuery {
    /// Get TXO by stake address query.
    TxoByStakeAddress,
    /// Get TXI by transaction hash query.
    TxiByTransactionHash,
    /// Get native assets by stake address query.
    AssetsByStakeAddress,
    /// Get registrations for a stake address.
    RegistrationFromStakeAddr,
    /// Get invalid registrations for a stake address.
    InvalidRegistrationsFromStakeAddr,
    /// Get stake address from stake hash.
    StakeAddrFromStakeHash,
    /// Get stake address from vote key.
    StakeAddrFromVoteKey,
    /// Get Catalyst ID by transaction ID.
    CatalystIdByTransactionId,
    /// Get Catalyst ID by stake address.
    CatalystIdByStakeAddress,
    /// Get RBAC registrations by Catalyst ID.
    RbacRegistrationsByCatalystId,
    /// Get invalid RBAC registrations by Catalyst ID.
    RbacInvalidRegistrationsByCatalystId,
    /// Get all registrations (used for snapshots).
    GetAllRegistrations,
    /// Get all invalid registrations (used for snapshots).
    GetAllInvalidRegistrations,
}
111
/// All prepared UPSERT query statements (inserts/updates a single value of data).
///
/// Each variant selects one prepared statement in [`PreparedQueries`] and is passed to
/// [`PreparedQueries::execute_upsert`], which executes it once and returns no data.
pub(crate) enum PreparedUpsertQuery {
    /// Sync status insert.
    SyncStatusInsert,
}
117
118/// All prepared queries for a session.
119#[allow(clippy::struct_field_names)]
120pub(crate) struct PreparedQueries {
121    /// TXO Insert query.
122    txo_insert_queries: SizedBatch,
123    /// TXO Asset Insert query.
124    txo_asset_insert_queries: SizedBatch,
125    /// Unstaked TXO Insert query.
126    unstaked_txo_insert_queries: SizedBatch,
127    /// Unstaked TXO Asset Insert query.
128    unstaked_txo_asset_insert_queries: SizedBatch,
129    /// TXI Insert query.
130    txi_insert_queries: SizedBatch,
131    /// TXI Insert query.
132    stake_registration_insert_queries: SizedBatch,
133    /// CIP36 Registrations.
134    cip36_registration_insert_queries: SizedBatch,
135    /// CIP36 Registration errors.
136    cip36_registration_error_insert_queries: SizedBatch,
137    /// CIP36 Registration for Stake Address Insert query.
138    cip36_registration_for_stake_address_insert_queries: SizedBatch,
139    /// Update TXO spent query.
140    txo_spent_update_queries: SizedBatch,
141    /// Get TXO by stake address query.
142    txo_by_stake_address_query: PreparedStatement,
143    /// Get TXI by transaction hash.
144    txi_by_txn_hash_query: PreparedStatement,
145    /// RBAC 509 Registrations.
146    rbac509_registration_insert_queries: SizedBatch,
147    /// Invalid RBAC 509 registrations.
148    rbac509_invalid_registration_insert_queries: SizedBatch,
149    /// Catalyst ID for transaction ID insert query.
150    catalyst_id_for_txn_id_insert_queries: SizedBatch,
151    /// Catalyst ID for stake address insert query.
152    catalyst_id_for_stake_address_insert_queries: SizedBatch,
153    /// Get native assets by stake address query.
154    native_assets_by_stake_address_query: PreparedStatement,
155    /// Get registrations
156    registration_from_stake_addr_query: PreparedStatement,
157    /// stake addr from stake hash
158    stake_addr_from_stake_address_query: PreparedStatement,
159    /// stake addr from vote key
160    stake_addr_from_vote_key_query: PreparedStatement,
161    /// Get invalid registrations
162    invalid_registrations_from_stake_addr_query: PreparedStatement,
163    /// Insert Sync Status update.
164    sync_status_insert: PreparedStatement,
165    /// Get Catalyst ID by stake address.
166    catalyst_id_by_stake_address_query: PreparedStatement,
167    /// Get Catalyst ID by transaction ID.
168    catalyst_id_by_transaction_id_query: PreparedStatement,
169    /// Get RBAC registrations by Catalyst ID.
170    rbac_registrations_by_catalyst_id_query: PreparedStatement,
171    /// Get invalid RBAC registrations by Catalyst ID.
172    rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement,
173    /// Get all registrations for snapshot
174    get_all_registrations_query: PreparedStatement,
175    /// Get all invalid registrations for snapshot
176    get_all_invalid_registrations_query: PreparedStatement,
177}
178
/// A set of query responses that can fail: every query result on success, or an error if
/// the set of queries failed.
pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
/// Handles to spawned tasks, each of which yields a [`FallibleQueryResults`] when joined.
pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
183
184impl PreparedQueries {
185    /// Create new prepared queries for a given session.
186    #[allow(clippy::too_many_lines)]
187    pub(crate) async fn new(
188        session: Arc<Session>, cfg: &cassandra_db::EnvVars,
189    ) -> anyhow::Result<Self> {
190        // We initialize like this, so that all errors preparing querys get shown before aborting.
191        let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?;
192        let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await;
193        let stake_registration_insert_queries =
194            CertInsertQuery::prepare_batch(&session, cfg).await?;
195        let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await;
196        let txo_spent_update_queries =
197            UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?;
198        let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await;
199        let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await;
200        let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
201        let native_assets_by_stake_address_query =
202            GetAssetsByStakeAddressQuery::prepare(session.clone()).await;
203        let registration_from_stake_addr_query =
204            GetRegistrationQuery::prepare(session.clone()).await;
205        let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await;
206        let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await;
207        let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await;
208        let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await;
209        let get_all_invalid_registrations_query =
210            GetAllInvalidRegistrationsQuery::prepare(session.clone()).await;
211        let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?;
212        let catalyst_id_by_stake_address_query =
213            get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?;
214        let catalyst_id_by_transaction_id_query =
215            get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?;
216        let rbac_registrations_by_catalyst_id_query =
217            get_rbac_registrations::Query::prepare(session.clone()).await?;
218        let rbac_invalid_registrations_by_catalyst_id_query =
219            get_rbac_invalid_registrations::Query::prepare(session.clone()).await?;
220
221        let (
222            txo_insert_queries,
223            unstaked_txo_insert_queries,
224            txo_asset_insert_queries,
225            unstaked_txo_asset_insert_queries,
226        ) = all_txo_queries?;
227
228        let (
229            cip36_registration_insert_queries,
230            cip36_registration_error_insert_queries,
231            cip36_registration_for_stake_address_insert_queries,
232        ) = all_cip36_queries?;
233
234        let (
235            rbac509_registration_insert_queries,
236            rbac509_invalid_registration_insert_queries,
237            catalyst_id_for_txn_id_insert_queries,
238            catalyst_id_for_stake_address_insert_queries,
239        ) = all_rbac_queries?;
240
241        Ok(Self {
242            txo_insert_queries,
243            txo_asset_insert_queries,
244            unstaked_txo_insert_queries,
245            unstaked_txo_asset_insert_queries,
246            txi_insert_queries,
247            stake_registration_insert_queries,
248            cip36_registration_insert_queries,
249            cip36_registration_error_insert_queries,
250            cip36_registration_for_stake_address_insert_queries,
251            txo_spent_update_queries,
252            txo_by_stake_address_query: txo_by_stake_address_query?,
253            txi_by_txn_hash_query: txi_by_txn_hash_query?,
254            rbac509_registration_insert_queries,
255            rbac509_invalid_registration_insert_queries,
256            catalyst_id_for_txn_id_insert_queries,
257            catalyst_id_for_stake_address_insert_queries,
258            native_assets_by_stake_address_query: native_assets_by_stake_address_query?,
259            registration_from_stake_addr_query: registration_from_stake_addr_query?,
260            stake_addr_from_stake_address_query: stake_addr_from_stake_address?,
261            stake_addr_from_vote_key_query: stake_addr_from_vote_key?,
262            invalid_registrations_from_stake_addr_query: invalid_registrations?,
263            sync_status_insert,
264            rbac_registrations_by_catalyst_id_query,
265            rbac_invalid_registrations_by_catalyst_id_query,
266            catalyst_id_by_stake_address_query,
267            catalyst_id_by_transaction_id_query,
268            get_all_registrations_query: get_all_registrations_query?,
269            get_all_invalid_registrations_query: get_all_invalid_registrations_query?,
270        })
271    }
272
273    /// Prepares a statement.
274    pub(crate) async fn prepare(
275        session: Arc<Session>, query: &str, consistency: scylla::statement::Consistency,
276        idempotent: bool,
277    ) -> anyhow::Result<PreparedStatement> {
278        let mut prepared = session.prepare(query).await?;
279        prepared.set_consistency(consistency);
280        prepared.set_is_idempotent(idempotent);
281
282        Ok(prepared)
283    }
284
285    /// Prepares all permutations of the batch from 1 to max.
286    /// It is necessary to do this because batches are pre-sized, they can not be dynamic.
287    /// Preparing the batches in advance is a very larger performance increase.
288    pub(crate) async fn prepare_batch(
289        session: Arc<Session>, query: &str, cfg: &cassandra_db::EnvVars,
290        consistency: scylla::statement::Consistency, idempotent: bool, logged: bool,
291    ) -> anyhow::Result<SizedBatch> {
292        let sized_batches: SizedBatch = SkipMap::new();
293
294        // First prepare the query. Only needs to be done once, all queries on a batch are the
295        // same.
296        let prepared = Self::prepare(session, query, consistency, idempotent).await?;
297
298        for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
299            let mut batch: Batch = Batch::new(if logged {
300                scylla::batch::BatchType::Logged
301            } else {
302                scylla::batch::BatchType::Unlogged
303            });
304            batch.set_consistency(consistency);
305            batch.set_is_idempotent(idempotent);
306            for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size {
307                batch.append_statement(prepared.clone());
308            }
309
310            sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
311        }
312
313        Ok(sized_batches)
314    }
315
316    /// Executes a single query with the given parameters.
317    ///
318    /// Returns no data, and an error if the query fails.
319    pub(crate) async fn execute_upsert<P>(
320        &self, session: Arc<Session>, upsert_query: PreparedUpsertQuery, params: P,
321    ) -> anyhow::Result<()>
322    where P: SerializeRow {
323        let prepared_stmt = match upsert_query {
324            PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
325        };
326
327        session
328            .execute_unpaged(prepared_stmt, params)
329            .await
330            .map_err(|e| anyhow::anyhow!(e))?;
331
332        Ok(())
333    }
334
335    /// Executes a select query with the given parameters.
336    ///
337    /// Returns an iterator that iterates over all the result pages that the query
338    /// returns.
339    pub(crate) async fn execute_iter<P>(
340        &self, session: Arc<Session>, select_query: PreparedSelectQuery, params: P,
341    ) -> anyhow::Result<QueryPager>
342    where P: SerializeRow {
343        let prepared_stmt = match select_query {
344            PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
345            PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
346            PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
347            PreparedSelectQuery::RegistrationFromStakeAddr => {
348                &self.registration_from_stake_addr_query
349            },
350            PreparedSelectQuery::StakeAddrFromStakeHash => {
351                &self.stake_addr_from_stake_address_query
352            },
353            PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
354            PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
355                &self.invalid_registrations_from_stake_addr_query
356            },
357            PreparedSelectQuery::RbacRegistrationsByCatalystId => {
358                &self.rbac_registrations_by_catalyst_id_query
359            },
360            PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => {
361                &self.rbac_invalid_registrations_by_catalyst_id_query
362            },
363            PreparedSelectQuery::CatalystIdByTransactionId => {
364                &self.catalyst_id_by_transaction_id_query
365            },
366            PreparedSelectQuery::CatalystIdByStakeAddress => {
367                &self.catalyst_id_by_stake_address_query
368            },
369            PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query,
370            PreparedSelectQuery::GetAllInvalidRegistrations => {
371                &self.get_all_invalid_registrations_query
372            },
373        };
374        session_execute_iter(session, prepared_stmt, params).await
375    }
376
377    /// Execute a Batch query with the given parameters.
378    ///
379    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
380    /// the same, and must match the query being executed.
381    ///
382    /// This will divide the batch into optimal sized chunks and execute them until all
383    /// values have been executed or the first error is encountered.
384    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
385        &self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedQuery,
386        values: Vec<T>,
387    ) -> FallibleQueryResults {
388        let query_map = match query {
389            PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
390            PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
391            PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
392            PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
393            PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
394            PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
395            PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
396            PreparedQuery::Cip36RegistrationInsertErrorQuery => {
397                &self.cip36_registration_error_insert_queries
398            },
399            PreparedQuery::Cip36RegistrationForStakeAddrInsertQuery => {
400                &self.cip36_registration_for_stake_address_insert_queries
401            },
402            PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
403            PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
404            PreparedQuery::Rbac509InvalidInsertQuery => {
405                &self.rbac509_invalid_registration_insert_queries
406            },
407            PreparedQuery::CatalystIdForTxnIdInsertQuery => {
408                &self.catalyst_id_for_txn_id_insert_queries
409            },
410            PreparedQuery::CatalystIdForStakeAddressInsertQuery => {
411                &self.catalyst_id_for_stake_address_insert_queries
412            },
413        };
414        session_execute_batch(session, query_map, cfg, query, values).await
415    }
416}
417
418/// Execute a Batch query with the given parameters.
419///
420/// Values should be a Vec of values which implement `SerializeRow` and they MUST be
421/// the same, and must match the query being executed.
422///
423/// This will divide the batch into optimal sized chunks and execute them until all
424/// values have been executed or the first error is encountered.
425async fn session_execute_batch<T: SerializeRow + Debug, Q: std::fmt::Display>(
426    session: Arc<Session>, query_map: &SizedBatch, cfg: Arc<cassandra_db::EnvVars>, query: Q,
427    values: Vec<T>,
428) -> FallibleQueryResults {
429    let mut results: Vec<QueryResult> = Vec::new();
430    let mut errors = Vec::new();
431
432    let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1));
433    let query_str = format!("{query}");
434
435    for chunk in chunks {
436        let chunk_size: u16 = chunk.len().try_into()?;
437        let Some(batch_query) = query_map.get(&chunk_size) else {
438            // This should not actually occur.
439            bail!("No batch query found for size {}", chunk_size);
440        };
441        let batch_query_statements = batch_query.value().clone();
442        match session.batch(&batch_query_statements, chunk).await {
443            Ok(result) => results.push(result),
444            Err(err) => {
445                let chunk_str = format!("{chunk:?}");
446                error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed");
447                errors.push(err);
448                // Defer failure until all batches have been processed.
449            },
450        }
451    }
452
453    if !errors.is_empty() {
454        bail!("Query Failed: {query_str}! {errors:?}");
455    }
456
457    Ok(results)
458}
459
460/// Executes a select query with the given parameters.
461///
462/// Returns an iterator that iterates over all the result pages that the query
463/// returns.
464pub(crate) async fn session_execute_iter<P>(
465    session: Arc<Session>, prepared_stmt: &PreparedStatement, params: P,
466) -> anyhow::Result<QueryPager>
467where P: SerializeRow {
468    session
469        .execute_iter(prepared_stmt.clone(), params)
470        .await
471        .map_err(|e| anyhow::anyhow!(e))
472}