cat_gateway/db/index/queries/purge/
unstaked_txo_ada.rs

1//! Unstaked Transaction Outputs (ADA), by their transaction hash, queries used in purging
2//! data.
3use std::{fmt::Debug, sync::Arc};
4
5use scylla::{
6    prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
7    Session,
8};
9use tracing::error;
10
11use crate::{
12    db::{
13        index::{
14            queries::{
15                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16                FallibleQueryResults, SizedBatch,
17            },
18            session::CassandraSession,
19        },
20        types::{DbTransactionId, DbTxnOutputOffset},
21    },
22    settings::cassandra_db,
23};
24
25pub(crate) mod result {
26    //! Return values for Unstaked TXO ADA purge queries.
27
28    use crate::db::types::{DbTransactionId, DbTxnOutputOffset};
29
30    /// Primary Key Row
31    pub(crate) type PrimaryKey = (DbTransactionId, DbTxnOutputOffset, num_bigint::BigInt);
32}
33
34/// Select primary keys for Unstaked TXO ADA.
35const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_by_txn_hash.cql");
36
37/// Primary Key Value.
38#[derive(SerializeRow)]
39pub(crate) struct Params {
40    /// 32 byte hash of this transaction.
41    pub(crate) txn_id: DbTransactionId,
42    /// Transaction Output Offset inside the transaction.
43    pub(crate) txo: DbTxnOutputOffset,
44}
45
46impl Debug for Params {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("Params")
49            .field("txn_id", &self.txn_id)
50            .field("txo", &self.txo)
51            .finish()
52    }
53}
54
55impl From<result::PrimaryKey> for Params {
56    fn from(value: result::PrimaryKey) -> Self {
57        Self {
58            txn_id: value.0,
59            txo: value.1,
60        }
61    }
62}
63/// Get primary key for Unstaked TXO ADA query.
64pub(crate) struct PrimaryKeyQuery;
65
66impl PrimaryKeyQuery {
67    /// Prepares a query to get all Unstaked TXO ADA primary keys.
68    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
69        PreparedQueries::prepare(
70            session.clone(),
71            SELECT_QUERY,
72            scylla::statement::Consistency::All,
73            true,
74        )
75        .await
76        .inspect_err(
77            |error| error!(error=%error, "Failed to prepare get Unstaked TXO ADA primary key query."),
78        )
79        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
80    }
81
82    /// Executes a query to get all Unstaked TXO ADA primary keys.
83    pub(crate) async fn execute(
84        session: &CassandraSession,
85    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
86        let iter = session
87            .purge_execute_iter(PreparedSelectQuery::UnstakedTxoAda)
88            .await?
89            .rows_stream::<result::PrimaryKey>()?;
90
91        Ok(iter)
92    }
93}
94
95/// Delete Unstaked TXO by Stake Address
96const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_by_txn_hash.cql");
97
98/// Delete TXO by Stake Address Query
99pub(crate) struct DeleteQuery;
100
101impl DeleteQuery {
102    /// Prepare Batch of Delete Queries
103    pub(crate) async fn prepare_batch(
104        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
105    ) -> anyhow::Result<SizedBatch> {
106        PreparedQueries::prepare_batch(
107            session.clone(),
108            DELETE_QUERY,
109            cfg,
110            scylla::statement::Consistency::Any,
111            true,
112            false,
113        )
114        .await
115        .inspect_err(
116            |error| error!(error=%error, "Failed to prepare delete Unstaked TXO ADA primary key query."),
117        )
118        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
119    }
120
121    /// Executes a DELETE Query
122    pub(crate) async fn execute(
123        session: &CassandraSession, params: Vec<Params>,
124    ) -> FallibleQueryResults {
125        let results = session
126            .purge_execute_batch(PreparedDeleteQuery::UnstakedTxoAda, params)
127            .await?;
128
129        Ok(results)
130    }
131}