cat_gateway/db/index/queries/purge/
txo_assets.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6 Session,
7};
8use tracing::error;
9
10use crate::{
11 db::{
12 index::{
13 queries::{
14 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15 FallibleQueryResults, SizedBatch,
16 },
17 session::CassandraSession,
18 },
19 types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
20 },
21 settings::cassandra_db,
22};
23
24pub(crate) mod result {
25 use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
28
29 pub(crate) type PrimaryKey = (
31 DbStakeAddress,
32 DbSlot,
33 DbTxnIndex,
34 DbTxnOutputOffset,
35 Vec<u8>,
36 Vec<u8>,
37 );
38}
39
40const SELECT_QUERY: &str = include_str!("./cql/get_txo_assets_by_stake_addr.cql");
42
43#[derive(SerializeRow)]
45pub(crate) struct Params {
46 pub(crate) stake_address: DbStakeAddress,
48 pub(crate) slot_no: DbSlot,
50 pub(crate) txn_index: DbTxnIndex,
52 pub(crate) txo: DbTxnOutputOffset,
54 policy_id: Vec<u8>,
56 asset_name: Vec<u8>,
58}
59
60impl Debug for Params {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("Params")
63 .field("stake_address", &self.stake_address)
64 .field("slot_no", &self.slot_no)
65 .field("txn_index", &self.txn_index)
66 .field("txo", &self.txo)
67 .field("policy_id", &self.policy_id)
68 .field("asset_name", &self.asset_name)
69 .finish()
70 }
71}
72
73impl From<result::PrimaryKey> for Params {
74 fn from(value: result::PrimaryKey) -> Self {
75 Self {
76 stake_address: value.0,
77 slot_no: value.1,
78 txn_index: value.2,
79 txo: value.3,
80 policy_id: value.4,
81 asset_name: value.5,
82 }
83 }
84}
85pub(crate) struct PrimaryKeyQuery;
87
88impl PrimaryKeyQuery {
89 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
91 PreparedQueries::prepare(
92 session.clone(),
93 SELECT_QUERY,
94 scylla::statement::Consistency::All,
95 true,
96 )
97 .await
98 .inspect_err(
99 |error| error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query."),
100 )
101 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
102 }
103
104 pub(crate) async fn execute(
106 session: &CassandraSession,
107 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
108 let iter = session
109 .purge_execute_iter(PreparedSelectQuery::TxoAssets)
110 .await?
111 .rows_stream::<result::PrimaryKey>()?;
112
113 Ok(iter)
114 }
115}
116
117const DELETE_QUERY: &str = include_str!("cql/delete_txo_assets_by_stake_address.cql");
119
120pub(crate) struct DeleteQuery;
122
123impl DeleteQuery {
124 pub(crate) async fn prepare_batch(
126 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
127 ) -> anyhow::Result<SizedBatch> {
128 PreparedQueries::prepare_batch(
129 session.clone(),
130 DELETE_QUERY,
131 cfg,
132 scylla::statement::Consistency::Any,
133 true,
134 false,
135 )
136 .await
137 .inspect_err(
138 |error| error!(error=%error, "Failed to prepare delete TXO Assets by stake address primary key query."),
139 )
140 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
141 }
142
143 pub(crate) async fn execute(
145 session: &CassandraSession, params: Vec<Params>,
146 ) -> FallibleQueryResults {
147 let results = session
148 .purge_execute_batch(PreparedDeleteQuery::TxoAssets, params)
149 .await?;
150
151 Ok(results)
152 }
153}