cat_gateway/db/index/queries/purge/
txo_ada.rs

1//! TXO by Stake Address Queries used in purging data.
2use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6    Session,
7};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{
14                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15                FallibleQueryResults, SizedBatch,
16            },
17            session::CassandraSession,
18        },
19        types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
20    },
21    settings::cassandra_db,
22};
23
24pub(crate) mod result {
25    //! Return values for TXO by Stake Address purge queries.
26
27    use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
28
29    /// Primary Key Row
30    pub(crate) type PrimaryKey = (DbStakeAddress, DbSlot, DbTxnIndex, DbTxnOutputOffset);
31}
32
33/// Select primary keys for TXO by Stake Address.
34const SELECT_QUERY: &str = include_str!("./cql/get_txo_by_stake_address.cql");
35
36/// Primary Key Value.
37#[derive(SerializeRow)]
38pub(crate) struct Params {
39    /// Stake Address - Binary 29 bytes.
40    pub(crate) stake_address: DbStakeAddress,
41    /// Block Slot Number
42    pub(crate) slot_no: DbSlot,
43    /// Transaction Offset inside the block.
44    pub(crate) txn_index: DbTxnIndex,
45    /// Transaction Output Offset inside the transaction.
46    pub(crate) txo: DbTxnOutputOffset,
47}
48
49impl Debug for Params {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("Params")
52            .field("stake_address", &self.stake_address)
53            .field("slot_no", &self.slot_no)
54            .field("txn_index", &self.txn_index)
55            .field("txo", &self.txo)
56            .finish()
57    }
58}
59
60impl From<result::PrimaryKey> for Params {
61    fn from(value: result::PrimaryKey) -> Self {
62        Self {
63            stake_address: value.0,
64            slot_no: value.1,
65            txn_index: value.2,
66            txo: value.3,
67        }
68    }
69}
70/// Get primary key for TXO by Stake Address query.
71pub(crate) struct PrimaryKeyQuery;
72
73impl PrimaryKeyQuery {
74    /// Prepares a query to get all TXO by stake address primary keys.
75    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
76        PreparedQueries::prepare(
77            session.clone(),
78            SELECT_QUERY,
79            scylla::statement::Consistency::All,
80            true,
81        )
82        .await
83        .inspect_err(
84            |error| error!(error=%error, "Failed to prepare get TXO by stake address primary key query."),
85        )
86        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
87    }
88
89    /// Executes a query to get all TXO by stake address primary keys.
90    pub(crate) async fn execute(
91        session: &CassandraSession,
92    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
93        let iter = session
94            .purge_execute_iter(PreparedSelectQuery::TxoAda)
95            .await?
96            .rows_stream::<result::PrimaryKey>()?;
97
98        Ok(iter)
99    }
100}
101
102/// Delete TXO by Stake Address
103const DELETE_QUERY: &str = include_str!("cql/delete_txo_by_stake_address.cql");
104
105/// Delete TXO by Stake Address Query
106pub(crate) struct DeleteQuery;
107
108impl DeleteQuery {
109    /// Prepare Batch of Delete Queries
110    pub(crate) async fn prepare_batch(
111        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
112    ) -> anyhow::Result<SizedBatch> {
113        PreparedQueries::prepare_batch(
114            session.clone(),
115            DELETE_QUERY,
116            cfg,
117            scylla::statement::Consistency::Any,
118            true,
119            false,
120        )
121        .await
122        .inspect_err(
123            |error| error!(error=%error, "Failed to prepare delete TXO by stake address primary key query."),
124        )
125        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
126    }
127
128    /// Executes a DELETE Query
129    pub(crate) async fn execute(
130        session: &CassandraSession, params: Vec<Params>,
131    ) -> FallibleQueryResults {
132        let results = session
133            .purge_execute_batch(PreparedDeleteQuery::TxoAda, params)
134            .await?;
135
136        Ok(results)
137    }
138}