cat_gateway/db/index/queries/purge/
txi_by_hash.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6 Session,
7};
8use tracing::error;
9
10use crate::{
11 db::{
12 index::{
13 queries::{
14 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15 FallibleQueryResults, SizedBatch,
16 },
17 session::CassandraSession,
18 },
19 types::{DbTransactionId, DbTxnOutputOffset},
20 },
21 settings::cassandra_db,
22};
23
24pub(crate) mod result {
25 use crate::db::types::{DbSlot, DbTransactionId, DbTxnOutputOffset};
28
29 pub(crate) type PrimaryKey = (DbTransactionId, DbTxnOutputOffset, DbSlot);
31}
32
33const SELECT_QUERY: &str = include_str!("./cql/get_txi_by_txn_hashes.cql");
35
36#[derive(SerializeRow)]
38pub(crate) struct Params {
39 pub(crate) txn_id: DbTransactionId,
41 pub(crate) txo: DbTxnOutputOffset,
43}
44
45impl Debug for Params {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("Params")
48 .field("txn_id", &self.txn_id.to_string())
49 .field("txo", &self.txo)
50 .finish()
51 }
52}
53
54impl From<result::PrimaryKey> for Params {
55 fn from(value: result::PrimaryKey) -> Self {
56 Self {
57 txn_id: value.0,
58 txo: value.1,
59 }
60 }
61}
62pub(crate) struct PrimaryKeyQuery;
64
65impl PrimaryKeyQuery {
66 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
68 PreparedQueries::prepare(
69 session.clone(),
70 SELECT_QUERY,
71 scylla::statement::Consistency::All,
72 true,
73 )
74 .await
75 .inspect_err(
76 |error| error!(error=%error, "Failed to prepare get TXI by hash primary key query."),
77 )
78 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
79 }
80
81 pub(crate) async fn execute(
83 session: &CassandraSession,
84 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
85 let iter = session
86 .purge_execute_iter(PreparedSelectQuery::Txi)
87 .await?
88 .rows_stream::<result::PrimaryKey>()?;
89
90 Ok(iter)
91 }
92}
93
94const DELETE_QUERY: &str = include_str!("./cql/delete_txi_by_txn_hashes.cql");
96
97pub(crate) struct DeleteQuery;
99
100impl DeleteQuery {
101 pub(crate) async fn prepare_batch(
103 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
104 ) -> anyhow::Result<SizedBatch> {
105 PreparedQueries::prepare_batch(
106 session.clone(),
107 DELETE_QUERY,
108 cfg,
109 scylla::statement::Consistency::Any,
110 true,
111 false,
112 )
113 .await
114 .inspect_err(
115 |error| error!(error=%error, "Failed to prepare delete TXI by hash primary key query."),
116 )
117 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
118 }
119
120 pub(crate) async fn execute(
122 session: &CassandraSession, params: Vec<Params>,
123 ) -> FallibleQueryResults {
124 let results = session
125 .purge_execute_batch(PreparedDeleteQuery::Txi, params)
126 .await?;
127
128 Ok(results)
129 }
130}