cat_gateway/db/index/queries/purge/
unstaked_txo_assets.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6 Session,
7};
8use tracing::error;
9
10use crate::{
11 db::{
12 index::{
13 queries::{
14 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15 FallibleQueryResults, SizedBatch,
16 },
17 session::CassandraSession,
18 },
19 types::{DbTransactionId, DbTxnOutputOffset},
20 },
21 settings::cassandra_db,
22};
23
24pub(crate) mod result {
25 use crate::db::types::{DbTransactionId, DbTxnOutputOffset};
28
29 pub(crate) type PrimaryKey = (
31 DbTransactionId,
32 DbTxnOutputOffset,
33 Vec<u8>,
34 Vec<u8>,
35 num_bigint::BigInt,
36 );
37}
38
39const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_assets_by_txn_hash.cql");
41
42#[derive(SerializeRow)]
44pub(crate) struct Params {
45 pub(crate) txn_id: DbTransactionId,
47 pub(crate) txo: DbTxnOutputOffset,
49 pub(crate) policy_id: Vec<u8>,
51 pub(crate) asset_name: Vec<u8>,
53}
54
55impl Debug for Params {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("Params")
58 .field("txn_id", &self.txn_id)
59 .field("txo", &self.txo)
60 .field("policy_id", &self.policy_id)
61 .field("asset_name", &self.asset_name)
62 .finish()
63 }
64}
65
66impl From<result::PrimaryKey> for Params {
67 fn from(value: result::PrimaryKey) -> Self {
68 Self {
69 txn_id: value.0,
70 txo: value.1,
71 policy_id: value.2,
72 asset_name: value.3,
73 }
74 }
75}
76pub(crate) struct PrimaryKeyQuery;
78
79impl PrimaryKeyQuery {
80 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
82 PreparedQueries::prepare(
83 session.clone(),
84 SELECT_QUERY,
85 scylla::statement::Consistency::All,
86 true,
87 )
88 .await
89 .inspect_err(
90 |error| error!(error=%error, "Failed to prepare get TXO Assets by TXN Hash primary key query."),
91 )
92 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
93 }
94
95 pub(crate) async fn execute(
97 session: &CassandraSession,
98 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
99 let iter = session
100 .purge_execute_iter(PreparedSelectQuery::UnstakedTxoAsset)
101 .await?
102 .rows_stream::<result::PrimaryKey>()?;
103
104 Ok(iter)
105 }
106}
107
108const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_assets_by_txn_hash.cql");
110
111pub(crate) struct DeleteQuery;
113
114impl DeleteQuery {
115 pub(crate) async fn prepare_batch(
117 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
118 ) -> anyhow::Result<SizedBatch> {
119 PreparedQueries::prepare_batch(
120 session.clone(),
121 DELETE_QUERY,
122 cfg,
123 scylla::statement::Consistency::Any,
124 true,
125 false,
126 )
127 .await
128 .inspect_err(
129 |error| error!(error=%error, "Failed to prepare delete TXO Assets by TXN Hash primary key query."),
130 )
131 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
132 }
133
134 pub(crate) async fn execute(
136 session: &CassandraSession, params: Vec<Params>,
137 ) -> FallibleQueryResults {
138 let results = session
139 .purge_execute_batch(PreparedDeleteQuery::UnstakedTxoAsset, params)
140 .await?;
141
142 Ok(results)
143 }
144}