cat_gateway/db/index/queries/purge/
unstaked_txo_ada.rs1use std::{fmt::Debug, sync::Arc};
4
5use scylla::{
6 prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
7 Session,
8};
9use tracing::error;
10
11use crate::{
12 db::{
13 index::{
14 queries::{
15 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16 FallibleQueryResults, SizedBatch,
17 },
18 session::CassandraSession,
19 },
20 types::{DbTransactionId, DbTxnOutputOffset},
21 },
22 settings::cassandra_db,
23};
24
25pub(crate) mod result {
26 use crate::db::types::{DbTransactionId, DbTxnOutputOffset};
29
30 pub(crate) type PrimaryKey = (DbTransactionId, DbTxnOutputOffset, num_bigint::BigInt);
32}
33
34const SELECT_QUERY: &str = include_str!("./cql/get_unstaked_txo_by_txn_hash.cql");
36
37#[derive(SerializeRow)]
39pub(crate) struct Params {
40 pub(crate) txn_id: DbTransactionId,
42 pub(crate) txo: DbTxnOutputOffset,
44}
45
46impl Debug for Params {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("Params")
49 .field("txn_id", &self.txn_id)
50 .field("txo", &self.txo)
51 .finish()
52 }
53}
54
55impl From<result::PrimaryKey> for Params {
56 fn from(value: result::PrimaryKey) -> Self {
57 Self {
58 txn_id: value.0,
59 txo: value.1,
60 }
61 }
62}
63pub(crate) struct PrimaryKeyQuery;
65
66impl PrimaryKeyQuery {
67 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
69 PreparedQueries::prepare(
70 session.clone(),
71 SELECT_QUERY,
72 scylla::statement::Consistency::All,
73 true,
74 )
75 .await
76 .inspect_err(
77 |error| error!(error=%error, "Failed to prepare get Unstaked TXO ADA primary key query."),
78 )
79 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
80 }
81
82 pub(crate) async fn execute(
84 session: &CassandraSession,
85 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
86 let iter = session
87 .purge_execute_iter(PreparedSelectQuery::UnstakedTxoAda)
88 .await?
89 .rows_stream::<result::PrimaryKey>()?;
90
91 Ok(iter)
92 }
93}
94
95const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_by_txn_hash.cql");
97
98pub(crate) struct DeleteQuery;
100
101impl DeleteQuery {
102 pub(crate) async fn prepare_batch(
104 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
105 ) -> anyhow::Result<SizedBatch> {
106 PreparedQueries::prepare_batch(
107 session.clone(),
108 DELETE_QUERY,
109 cfg,
110 scylla::statement::Consistency::Any,
111 true,
112 false,
113 )
114 .await
115 .inspect_err(
116 |error| error!(error=%error, "Failed to prepare delete Unstaked TXO ADA primary key query."),
117 )
118 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
119 }
120
121 pub(crate) async fn execute(
123 session: &CassandraSession, params: Vec<Params>,
124 ) -> FallibleQueryResults {
125 let results = session
126 .purge_execute_batch(PreparedDeleteQuery::UnstakedTxoAda, params)
127 .await?;
128
129 Ok(results)
130 }
131}