cat_gateway/db/index/queries/purge/
txo_assets.rs

1//! TXO Assets by Stake Address Queries used in purging data.
2use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6    Session,
7};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{
14                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15                FallibleQueryResults, SizedBatch,
16            },
17            session::CassandraSession,
18        },
19        types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
20    },
21    settings::cassandra_db,
22};
23
24pub(crate) mod result {
25    //! Return values for TXO Assets by Stake Address purge queries.
26
27    use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
28
29    /// Primary Key Row
30    pub(crate) type PrimaryKey = (
31        DbStakeAddress,
32        DbSlot,
33        DbTxnIndex,
34        DbTxnOutputOffset,
35        Vec<u8>,
36        Vec<u8>,
37    );
38}
39
40/// Select primary keys for TXO Assets by Stake Address.
41const SELECT_QUERY: &str = include_str!("./cql/get_txo_assets_by_stake_addr.cql");
42
43/// Primary Key Value.
44#[derive(SerializeRow)]
45pub(crate) struct Params {
46    /// Stake Address - Binary 29 bytes.
47    pub(crate) stake_address: DbStakeAddress,
48    /// Block Slot Number
49    pub(crate) slot_no: DbSlot,
50    /// Transaction Offset inside the block.
51    pub(crate) txn_index: DbTxnIndex,
52    /// Transaction Output Offset inside the transaction.
53    pub(crate) txo: DbTxnOutputOffset,
54    /// Asset Policy Hash - Binary 28 bytes.
55    policy_id: Vec<u8>,
56    /// Name of the asset, within the Policy.
57    asset_name: Vec<u8>,
58}
59
60impl Debug for Params {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("Params")
63            .field("stake_address", &self.stake_address)
64            .field("slot_no", &self.slot_no)
65            .field("txn_index", &self.txn_index)
66            .field("txo", &self.txo)
67            .field("policy_id", &self.policy_id)
68            .field("asset_name", &self.asset_name)
69            .finish()
70    }
71}
72
73impl From<result::PrimaryKey> for Params {
74    fn from(value: result::PrimaryKey) -> Self {
75        Self {
76            stake_address: value.0,
77            slot_no: value.1,
78            txn_index: value.2,
79            txo: value.3,
80            policy_id: value.4,
81            asset_name: value.5,
82        }
83    }
84}
85/// Get primary key for TXO Assets by Stake Address query.
86pub(crate) struct PrimaryKeyQuery;
87
88impl PrimaryKeyQuery {
89    /// Prepares a query to get all TXO Assets by stake address primary keys.
90    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
91        PreparedQueries::prepare(
92            session.clone(),
93            SELECT_QUERY,
94            scylla::statement::Consistency::All,
95            true,
96        )
97        .await
98        .inspect_err(
99            |error| error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query."),
100        )
101        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
102    }
103
104    /// Executes a query to get all TXO Assets by stake address primary keys.
105    pub(crate) async fn execute(
106        session: &CassandraSession,
107    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
108        let iter = session
109            .purge_execute_iter(PreparedSelectQuery::TxoAssets)
110            .await?
111            .rows_stream::<result::PrimaryKey>()?;
112
113        Ok(iter)
114    }
115}
116
117/// Delete TXO Assets by Stake Address
118const DELETE_QUERY: &str = include_str!("cql/delete_txo_assets_by_stake_address.cql");
119
120/// Delete TXO Assets by Stake Address Query
121pub(crate) struct DeleteQuery;
122
123impl DeleteQuery {
124    /// Prepare Batch of Delete Queries
125    pub(crate) async fn prepare_batch(
126        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
127    ) -> anyhow::Result<SizedBatch> {
128        PreparedQueries::prepare_batch(
129            session.clone(),
130            DELETE_QUERY,
131            cfg,
132            scylla::statement::Consistency::Any,
133            true,
134            false,
135        )
136        .await
137        .inspect_err(
138            |error| error!(error=%error, "Failed to prepare delete TXO Assets by stake address primary key query."),
139        )
140        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
141    }
142
143    /// Executes a DELETE Query
144    pub(crate) async fn execute(
145        session: &CassandraSession, params: Vec<Params>,
146    ) -> FallibleQueryResults {
147        let results = session
148            .purge_execute_batch(PreparedDeleteQuery::TxoAssets, params)
149            .await?;
150
151        Ok(results)
152    }
153}