cat_gateway/db/index/queries/purge/
txi_by_hash.rs

//! Transaction Input (ADA or native asset) queries used when purging data.
2use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6    Session,
7};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{
14                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15                FallibleQueryResults, SizedBatch,
16            },
17            session::CassandraSession,
18        },
19        types::{DbTransactionId, DbTxnOutputOffset},
20    },
21    settings::cassandra_db,
22};
23
24pub(crate) mod result {
25    //! Return values for TXI by hash purge queries.
26
27    use crate::db::types::{DbSlot, DbTransactionId, DbTxnOutputOffset};
28
29    /// Primary Key Row
30    pub(crate) type PrimaryKey = (DbTransactionId, DbTxnOutputOffset, DbSlot);
31}
32
33/// Select primary keys for TXI by hash.
34const SELECT_QUERY: &str = include_str!("./cql/get_txi_by_txn_hashes.cql");
35
36/// Primary Key Value.
37#[derive(SerializeRow)]
38pub(crate) struct Params {
39    /// 32 byte hash of this transaction.
40    pub(crate) txn_id: DbTransactionId,
41    /// Transaction Output Offset inside the transaction.
42    pub(crate) txo: DbTxnOutputOffset,
43}
44
45impl Debug for Params {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("Params")
48            .field("txn_id", &self.txn_id.to_string())
49            .field("txo", &self.txo)
50            .finish()
51    }
52}
53
54impl From<result::PrimaryKey> for Params {
55    fn from(value: result::PrimaryKey) -> Self {
56        Self {
57            txn_id: value.0,
58            txo: value.1,
59        }
60    }
61}
/// Select query that reads the primary keys of all TXI by hash rows.
pub(crate) struct PrimaryKeyQuery;
64
65impl PrimaryKeyQuery {
66    /// Prepares a query to get all TXI by hash primary keys.
67    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
68        PreparedQueries::prepare(
69            session.clone(),
70            SELECT_QUERY,
71            scylla::statement::Consistency::All,
72            true,
73        )
74        .await
75        .inspect_err(
76            |error| error!(error=%error, "Failed to prepare get TXI by hash primary key query."),
77        )
78        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
79    }
80
81    /// Executes a query to get all TXI by hash primary keys.
82    pub(crate) async fn execute(
83        session: &CassandraSession,
84    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
85        let iter = session
86            .purge_execute_iter(PreparedSelectQuery::Txi)
87            .await?
88            .rows_stream::<result::PrimaryKey>()?;
89
90        Ok(iter)
91    }
92}
93
94/// Delete TXI by hash Query
95const DELETE_QUERY: &str = include_str!("./cql/delete_txi_by_txn_hashes.cql");
96
/// Batched delete of TXI by hash rows.
pub(crate) struct DeleteQuery;
99
100impl DeleteQuery {
101    /// Prepare Batch of Delete Queries
102    pub(crate) async fn prepare_batch(
103        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
104    ) -> anyhow::Result<SizedBatch> {
105        PreparedQueries::prepare_batch(
106            session.clone(),
107            DELETE_QUERY,
108            cfg,
109            scylla::statement::Consistency::Any,
110            true,
111            false,
112        )
113        .await
114        .inspect_err(
115            |error| error!(error=%error, "Failed to prepare delete TXI by hash primary key query."),
116        )
117        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
118    }
119
120    /// Executes a DELETE Query
121    pub(crate) async fn execute(
122        session: &CassandraSession, params: Vec<Params>,
123    ) -> FallibleQueryResults {
124        let results = session
125            .purge_execute_batch(PreparedDeleteQuery::Txi, params)
126            .await?;
127
128        Ok(results)
129    }
130}