cat_gateway/db/index/queries/purge/
txo_ada.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6 Session,
7};
8use tracing::error;
9
10use crate::{
11 db::{
12 index::{
13 queries::{
14 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15 FallibleQueryResults, SizedBatch,
16 },
17 session::CassandraSession,
18 },
19 types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
20 },
21 settings::cassandra_db,
22};
23
24pub(crate) mod result {
25 use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
28
29 pub(crate) type PrimaryKey = (DbStakeAddress, DbSlot, DbTxnIndex, DbTxnOutputOffset);
31}
32
33const SELECT_QUERY: &str = include_str!("./cql/get_txo_by_stake_address.cql");
35
36#[derive(SerializeRow)]
38pub(crate) struct Params {
39 pub(crate) stake_address: DbStakeAddress,
41 pub(crate) slot_no: DbSlot,
43 pub(crate) txn_index: DbTxnIndex,
45 pub(crate) txo: DbTxnOutputOffset,
47}
48
49impl Debug for Params {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("Params")
52 .field("stake_address", &self.stake_address)
53 .field("slot_no", &self.slot_no)
54 .field("txn_index", &self.txn_index)
55 .field("txo", &self.txo)
56 .finish()
57 }
58}
59
60impl From<result::PrimaryKey> for Params {
61 fn from(value: result::PrimaryKey) -> Self {
62 Self {
63 stake_address: value.0,
64 slot_no: value.1,
65 txn_index: value.2,
66 txo: value.3,
67 }
68 }
69}
70pub(crate) struct PrimaryKeyQuery;
72
73impl PrimaryKeyQuery {
74 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
76 PreparedQueries::prepare(
77 session.clone(),
78 SELECT_QUERY,
79 scylla::statement::Consistency::All,
80 true,
81 )
82 .await
83 .inspect_err(
84 |error| error!(error=%error, "Failed to prepare get TXO by stake address primary key query."),
85 )
86 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
87 }
88
89 pub(crate) async fn execute(
91 session: &CassandraSession,
92 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
93 let iter = session
94 .purge_execute_iter(PreparedSelectQuery::TxoAda)
95 .await?
96 .rows_stream::<result::PrimaryKey>()?;
97
98 Ok(iter)
99 }
100}
101
102const DELETE_QUERY: &str = include_str!("cql/delete_txo_by_stake_address.cql");
104
105pub(crate) struct DeleteQuery;
107
108impl DeleteQuery {
109 pub(crate) async fn prepare_batch(
111 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
112 ) -> anyhow::Result<SizedBatch> {
113 PreparedQueries::prepare_batch(
114 session.clone(),
115 DELETE_QUERY,
116 cfg,
117 scylla::statement::Consistency::Any,
118 true,
119 false,
120 )
121 .await
122 .inspect_err(
123 |error| error!(error=%error, "Failed to prepare delete TXO by stake address primary key query."),
124 )
125 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
126 }
127
128 pub(crate) async fn execute(
130 session: &CassandraSession, params: Vec<Params>,
131 ) -> FallibleQueryResults {
132 let results = session
133 .purge_execute_batch(PreparedDeleteQuery::TxoAda, params)
134 .await?;
135
136 Ok(results)
137 }
138}