cat_gateway/db/index/queries/purge/
unstaked_txo_assets.rs

1//! TXO Assets by TXN Hash Queries used in purging data.
2use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6    Session,
7};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{
14                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15                FallibleQueryResults, SizedBatch,
16            },
17            session::CassandraSession,
18        },
19        types::{DbTransactionId, DbTxnOutputOffset},
20    },
21    settings::cassandra_db,
22};
23
24pub(crate) mod result {
25    //! Return values for TXO Assets by TXN Hash purge queries.
26
27    use crate::db::types::{DbTransactionId, DbTxnOutputOffset};
28
29    /// Primary Key Row
30    pub(crate) type PrimaryKey = (
31        DbTransactionId,
32        DbTxnOutputOffset,
33        Vec<u8>,
34        Vec<u8>,
35        num_bigint::BigInt,
36    );
37}
38
39/// Select primary keys for TXO Assets by TXN Hash.
40const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_assets_by_txn_hash.cql");
41
42/// Primary Key Value.
43#[derive(SerializeRow)]
44pub(crate) struct Params {
45    /// 32 byte hash of this transaction.
46    pub(crate) txn_id: DbTransactionId,
47    /// Offset in the txo list of the transaction the txo is in.
48    pub(crate) txo: DbTxnOutputOffset,
49    /// Asset Policy Hash - Binary 28 bytes.
50    pub(crate) policy_id: Vec<u8>,
51    /// Name of the asset, within the Policy.
52    pub(crate) asset_name: Vec<u8>,
53}
54
55impl Debug for Params {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("Params")
58            .field("txn_id", &self.txn_id)
59            .field("txo", &self.txo)
60            .field("policy_id", &self.policy_id)
61            .field("asset_name", &self.asset_name)
62            .finish()
63    }
64}
65
66impl From<result::PrimaryKey> for Params {
67    fn from(value: result::PrimaryKey) -> Self {
68        Self {
69            txn_id: value.0,
70            txo: value.1,
71            policy_id: value.2,
72            asset_name: value.3,
73        }
74    }
75}
76/// Get primary key for TXO Assets by TXN Hash query.
77pub(crate) struct PrimaryKeyQuery;
78
79impl PrimaryKeyQuery {
80    /// Prepares a query to get all TXO Assets by TXN Hash primary keys.
81    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
82        PreparedQueries::prepare(
83            session.clone(),
84            SELECT_QUERY,
85            scylla::statement::Consistency::All,
86            true,
87        )
88        .await
89        .inspect_err(
90            |error| error!(error=%error, "Failed to prepare get TXO Assets by TXN Hash primary key query."),
91        )
92        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
93    }
94
95    /// Executes a query to get all TXO Assets by TXN Hash primary keys.
96    pub(crate) async fn execute(
97        session: &CassandraSession,
98    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
99        let iter = session
100            .purge_execute_iter(PreparedSelectQuery::UnstakedTxoAsset)
101            .await?
102            .rows_stream::<result::PrimaryKey>()?;
103
104        Ok(iter)
105    }
106}
107
108/// Delete TXO Assets by TXN Hash
109const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_assets_by_txn_hash.cql");
110
111/// Delete TXO Assets by TXN Hash Query
112pub(crate) struct DeleteQuery;
113
114impl DeleteQuery {
115    /// Prepare Batch of Delete Queries
116    pub(crate) async fn prepare_batch(
117        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
118    ) -> anyhow::Result<SizedBatch> {
119        PreparedQueries::prepare_batch(
120            session.clone(),
121            DELETE_QUERY,
122            cfg,
123            scylla::statement::Consistency::Any,
124            true,
125            false,
126        )
127        .await
128        .inspect_err(
129            |error| error!(error=%error, "Failed to prepare delete TXO Assets by TXN Hash primary key query."),
130        )
131        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
132    }
133
134    /// Executes a DELETE Query
135    pub(crate) async fn execute(
136        session: &CassandraSession, params: Vec<Params>,
137    ) -> FallibleQueryResults {
138        let results = session
139            .purge_execute_batch(PreparedDeleteQuery::UnstakedTxoAsset, params)
140            .await?;
141
142        Ok(results)
143    }
144}