cat_gateway/db/index/queries/purge/
mod.rs

1//! Queries for purging volatile data.
2
3pub(crate) mod catalyst_id_for_stake_address;
4pub(crate) mod catalyst_id_for_txn_id;
5pub(crate) mod cip36_registration;
6pub(crate) mod cip36_registration_for_vote_key;
7pub(crate) mod cip36_registration_invalid;
8pub(crate) mod rbac509_invalid_registration;
9pub(crate) mod rbac509_registration;
10pub(crate) mod stake_registration;
11pub(crate) mod txi_by_hash;
12pub(crate) mod txo_ada;
13pub(crate) mod txo_assets;
14pub(crate) mod unstaked_txo_ada;
15pub(crate) mod unstaked_txo_assets;
16
17use std::{fmt::Debug, sync::Arc};
18
19use scylla::{
20    prepared_statement::PreparedStatement, serialize::row::SerializeRow,
21    transport::iterator::QueryPager, Session,
22};
23
24use super::{FallibleQueryResults, SizedBatch};
25use crate::settings::cassandra_db;
26
27/// No parameters
28const NO_PARAMS: () = ();
29
30/// All prepared DELETE query statements (purge DB table rows).
31#[derive(strum_macros::Display)]
32pub(crate) enum PreparedDeleteQuery {
33    /// TXO Delete query.
34    TxoAda,
35    /// TXO Assets Delete query.
36    TxoAssets,
37    /// Unstaked TXO Delete query.
38    UnstakedTxoAda,
39    /// Unstaked TXO Asset Delete query.
40    UnstakedTxoAsset,
41    /// TXI by TXN Hash Delete query.
42    Txi,
43    /// Stake Registration Delete query.
44    StakeRegistration,
45    /// CIP 36 Registration Delete Query.
46    Cip36Registration,
47    /// CIP 36 Registration Invalid Delete query.
48    Cip36RegistrationInvalid,
49    /// CIP 36 Registration for vote key Delete query.
50    Cip36RegistrationForVoteKey,
51    /// RBAC 509 Registration Delete query.
52    Rbac509,
53    /// Invalid RBAC 509 Registration Delete query.
54    Rbac509Invalid,
55    /// Catalyst ID For Transaction ID Delete query.
56    CatalystIdForTxnId,
57    /// Catalyst ID For Stake Address Delete query.
58    CatalystIdForStakeAddress,
59}
60
61/// All prepared SELECT query statements (primary keys from table).
62#[derive(strum_macros::Display)]
63pub(crate) enum PreparedSelectQuery {
64    /// TXO Select query.
65    TxoAda,
66    /// TXO Asset Select query.
67    TxoAssets,
68    /// Unstaked TXO Select query.
69    UnstakedTxoAda,
70    /// Unstaked TXO Asset Select query.
71    UnstakedTxoAsset,
72    /// TXI by TXN Hash Select query.
73    Txi,
74    /// Stake Registration Select query.
75    StakeRegistration,
76    /// CIP 36 Registration Select Query.
77    Cip36Registration,
78    /// CIP 36 Registration Invalid Select query.
79    Cip36RegistrationInvalid,
80    /// CIP 36 Registration for vote key Select query.
81    Cip36RegistrationForVoteKey,
82    /// RBAC 509 Registration Select query.
83    Rbac509,
84    /// Invalid RBAC 509 Registration Select query.
85    Rbac509Invalid,
86    /// Catalyst ID For Transaction ID Select query.
87    CatalystIdForTxnId,
88    /// Catalyst ID For Stake Address Select query.
89    CatalystIdForStakeAddress,
90}
91
92/// All prepared purge queries for a session.
93pub(crate) struct PreparedQueries {
94    /// TXO ADA Primary Key Query.
95    select_txo_ada: PreparedStatement,
96    /// TXO Delete Query.
97    delete_txo_ada: SizedBatch,
98    /// TXO Asset Primary Key Query.
99    select_txo_assets: PreparedStatement,
100    /// TXO Assets Delete Query.
101    delete_txo_assets: SizedBatch,
102    /// Unstaked TXO ADA Primary Key Query.
103    select_unstaked_txo_ada: PreparedStatement,
104    /// Unstaked TXO ADA Delete Query.
105    delete_unstaked_txo_ada: SizedBatch,
106    /// Unstaked TXO Assets Primary Key Query.
107    select_unstaked_txo_assets: PreparedStatement,
108    /// Unstaked TXO Asset Delete Query.
109    delete_unstaked_txo_assets: SizedBatch,
110    /// TXI by TXN Hash by TXN Hash Primary Key Query.
111    select_txi_by_hash: PreparedStatement,
112    /// TXI by TXN Hash Delete Query.
113    delete_txi_by_hash: SizedBatch,
114    /// Stake Registration Primary Key Query.
115    select_stake_registration: PreparedStatement,
116    /// Stake Registration Delete Query.
117    delete_stake_registration: SizedBatch,
118    /// CIP36 Registrations Primary Key Query.
119    select_cip36_registration: PreparedStatement,
120    /// CIP36 Registrations Delete Query.
121    delete_cip36_registration: SizedBatch,
122    /// CIP36 Registration Invalid Primary Key Query.
123    select_cip36_registration_invalid: PreparedStatement,
124    /// CIP36 Registration Invalid Delete Query.
125    delete_cip36_registration_invalid: SizedBatch,
126    /// CIP36 Registration for Vote Key Primary Key Query.
127    select_cip36_registration_for_vote_key: PreparedStatement,
128    /// CIP36 Registration for Vote Key Delete Query.
129    delete_cip36_registration_for_vote_key: SizedBatch,
130    /// RBAC 509 Registrations Primary Key Query.
131    select_rbac509_registration: PreparedStatement,
132    /// RBAC 509 Registrations Delete Query.
133    delete_rbac509_registration: SizedBatch,
134    /// RBAC 509 invalid registrations Primary Key Query.
135    select_rbac509_invalid_registration: PreparedStatement,
136    /// RBAC 509 invalid registrations Delete Query.
137    delete_rbac509_invalid_registration: SizedBatch,
138    /// Catalyst ID for TX ID Primary Key Query..
139    select_catalyst_id_for_txn_id: PreparedStatement,
140    /// Catalyst ID for TX ID Delete Query..
141    delete_catalyst_id_for_txn_id: SizedBatch,
142    /// Catalyst ID for Stake Address Primary Key Query..
143    select_catalyst_id_for_stake_address: PreparedStatement,
144    /// Catalyst ID for Stake Address Delete Query..
145    delete_catalyst_id_for_stake_address: SizedBatch,
146}
147
148impl PreparedQueries {
149    /// Create new prepared queries for a given session.
150    pub(crate) async fn new(
151        session: Arc<Session>, cfg: &cassandra_db::EnvVars,
152    ) -> anyhow::Result<Self> {
153        // We initialize like this, so that all errors preparing querys get shown before aborting.
154        Ok(Self {
155            select_txo_ada: txo_ada::PrimaryKeyQuery::prepare(&session).await?,
156            delete_txo_ada: txo_ada::DeleteQuery::prepare_batch(&session, cfg).await?,
157            select_txo_assets: txo_assets::PrimaryKeyQuery::prepare(&session).await?,
158            delete_txo_assets: txo_assets::DeleteQuery::prepare_batch(&session, cfg).await?,
159            select_unstaked_txo_ada: unstaked_txo_ada::PrimaryKeyQuery::prepare(&session).await?,
160            delete_unstaked_txo_ada: unstaked_txo_ada::DeleteQuery::prepare_batch(&session, cfg)
161                .await?,
162            select_unstaked_txo_assets: unstaked_txo_assets::PrimaryKeyQuery::prepare(&session)
163                .await?,
164            delete_unstaked_txo_assets: unstaked_txo_assets::DeleteQuery::prepare_batch(
165                &session, cfg,
166            )
167            .await?,
168            select_txi_by_hash: txi_by_hash::PrimaryKeyQuery::prepare(&session).await?,
169            delete_txi_by_hash: txi_by_hash::DeleteQuery::prepare_batch(&session, cfg).await?,
170            select_stake_registration: stake_registration::PrimaryKeyQuery::prepare(&session)
171                .await?,
172            delete_stake_registration: stake_registration::DeleteQuery::prepare_batch(
173                &session, cfg,
174            )
175            .await?,
176            select_cip36_registration: cip36_registration::PrimaryKeyQuery::prepare(&session)
177                .await?,
178            delete_cip36_registration: cip36_registration::DeleteQuery::prepare_batch(
179                &session, cfg,
180            )
181            .await?,
182            select_cip36_registration_invalid:
183                cip36_registration_invalid::PrimaryKeyQuery::prepare(&session).await?,
184            delete_cip36_registration_invalid:
185                cip36_registration_invalid::DeleteQuery::prepare_batch(&session, cfg).await?,
186            select_cip36_registration_for_vote_key:
187                cip36_registration_for_vote_key::PrimaryKeyQuery::prepare(&session).await?,
188            delete_cip36_registration_for_vote_key:
189                cip36_registration_for_vote_key::DeleteQuery::prepare_batch(&session, cfg).await?,
190            select_rbac509_registration: rbac509_registration::PrimaryKeyQuery::prepare(&session)
191                .await?,
192            delete_rbac509_registration: rbac509_registration::DeleteQuery::prepare_batch(
193                &session, cfg,
194            )
195            .await?,
196            select_rbac509_invalid_registration:
197                rbac509_invalid_registration::PrimaryKeyQuery::prepare(&session).await?,
198            delete_rbac509_invalid_registration:
199                rbac509_invalid_registration::DeleteQuery::prepare_batch(&session, cfg).await?,
200            select_catalyst_id_for_txn_id: catalyst_id_for_txn_id::PrimaryKeyQuery::prepare(
201                &session,
202            )
203            .await?,
204            delete_catalyst_id_for_txn_id: catalyst_id_for_txn_id::DeleteQuery::prepare_batch(
205                &session, cfg,
206            )
207            .await?,
208            select_catalyst_id_for_stake_address:
209                catalyst_id_for_stake_address::PrimaryKeyQuery::prepare(&session).await?,
210            delete_catalyst_id_for_stake_address:
211                catalyst_id_for_stake_address::DeleteQuery::prepare_batch(&session, cfg).await?,
212        })
213    }
214
215    /// Prepares a statement.
216    pub(crate) async fn prepare(
217        session: Arc<Session>, query: &str, consistency: scylla::statement::Consistency,
218        idempotent: bool,
219    ) -> anyhow::Result<PreparedStatement> {
220        super::PreparedQueries::prepare(session, query, consistency, idempotent).await
221    }
222
223    /// Prepares all permutations of the batch from 1 to max.
224    /// It is necessary to do this because batches are pre-sized, they can not be dynamic.
225    /// Preparing the batches in advance is a very larger performance increase.
226    pub(crate) async fn prepare_batch(
227        session: Arc<Session>, query: &str, cfg: &cassandra_db::EnvVars,
228        consistency: scylla::statement::Consistency, idempotent: bool, logged: bool,
229    ) -> anyhow::Result<SizedBatch> {
230        super::PreparedQueries::prepare_batch(session, query, cfg, consistency, idempotent, logged)
231            .await
232    }
233
234    /// Executes a select query with the given parameters.
235    ///
236    /// Returns an iterator that iterates over all the result pages that the query
237    /// returns.
238    pub(crate) async fn execute_iter(
239        &self, session: Arc<Session>, select_query: PreparedSelectQuery,
240    ) -> anyhow::Result<QueryPager> {
241        let prepared_stmt = match select_query {
242            PreparedSelectQuery::TxoAda => &self.select_txo_ada,
243            PreparedSelectQuery::TxoAssets => &self.select_txo_assets,
244            PreparedSelectQuery::UnstakedTxoAda => &self.select_unstaked_txo_ada,
245            PreparedSelectQuery::UnstakedTxoAsset => &self.select_unstaked_txo_assets,
246            PreparedSelectQuery::Txi => &self.select_txi_by_hash,
247            PreparedSelectQuery::StakeRegistration => &self.select_stake_registration,
248            PreparedSelectQuery::Cip36Registration => &self.select_cip36_registration,
249            PreparedSelectQuery::Cip36RegistrationInvalid => {
250                &self.select_cip36_registration_invalid
251            },
252            PreparedSelectQuery::Cip36RegistrationForVoteKey => {
253                &self.select_cip36_registration_for_vote_key
254            },
255            PreparedSelectQuery::Rbac509 => &self.select_rbac509_registration,
256            PreparedSelectQuery::Rbac509Invalid => &self.select_rbac509_invalid_registration,
257            PreparedSelectQuery::CatalystIdForTxnId => &self.select_catalyst_id_for_txn_id,
258            PreparedSelectQuery::CatalystIdForStakeAddress => {
259                &self.select_catalyst_id_for_stake_address
260            },
261        };
262
263        super::session_execute_iter(session, prepared_stmt, NO_PARAMS).await
264    }
265
266    /// Execute a purge query with the given parameters.
267    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
268        &self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedDeleteQuery,
269        values: Vec<T>,
270    ) -> FallibleQueryResults {
271        let query_map = match query {
272            PreparedDeleteQuery::TxoAda => &self.delete_txo_ada,
273            PreparedDeleteQuery::TxoAssets => &self.delete_txo_assets,
274            PreparedDeleteQuery::UnstakedTxoAda => &self.delete_unstaked_txo_ada,
275            PreparedDeleteQuery::UnstakedTxoAsset => &self.delete_unstaked_txo_assets,
276            PreparedDeleteQuery::Txi => &self.delete_txi_by_hash,
277            PreparedDeleteQuery::StakeRegistration => &self.delete_stake_registration,
278            PreparedDeleteQuery::Cip36Registration => &self.delete_cip36_registration,
279            PreparedDeleteQuery::Cip36RegistrationInvalid => {
280                &self.delete_cip36_registration_invalid
281            },
282            PreparedDeleteQuery::Cip36RegistrationForVoteKey => {
283                &self.delete_cip36_registration_for_vote_key
284            },
285            PreparedDeleteQuery::Rbac509 => &self.delete_rbac509_registration,
286            PreparedDeleteQuery::Rbac509Invalid => &self.delete_rbac509_invalid_registration,
287            PreparedDeleteQuery::CatalystIdForTxnId => &self.delete_catalyst_id_for_txn_id,
288            PreparedDeleteQuery::CatalystIdForStakeAddress => {
289                &self.delete_catalyst_id_for_stake_address
290            },
291        };
292
293        super::session_execute_batch(session, query_map, cfg, query, values).await
294    }
295}