cat_gateway/db/index/queries/purge/
cip36_registration_invalid.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6 Session,
7};
8use tracing::error;
9
10use crate::{
11 db::{
12 index::{
13 queries::{
14 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15 FallibleQueryResults, SizedBatch,
16 },
17 session::CassandraSession,
18 },
19 types::{DbSlot, DbTxnIndex},
20 },
21 settings::cassandra_db,
22};
23
24pub(crate) mod result {
25 use crate::db::types::{DbSlot, DbTxnIndex};
28
29 pub(crate) type PrimaryKey = (Vec<u8>, DbSlot, DbTxnIndex);
31}
32
33const SELECT_QUERY: &str = include_str!("./cql/get_cip36_registration_invalid.cql");
35
36#[derive(SerializeRow)]
38pub(crate) struct Params {
39 pub(crate) stake_public_key: Vec<u8>,
41 pub(crate) slot_no: DbSlot,
43 pub(crate) txn_index: DbTxnIndex,
45}
46
47impl Debug for Params {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("Params")
50 .field("stake_public_key", &self.stake_public_key)
51 .field("slot_no", &self.slot_no)
52 .field("txn_index", &self.txn_index)
53 .finish()
54 }
55}
56
57impl From<result::PrimaryKey> for Params {
58 fn from(value: result::PrimaryKey) -> Self {
59 Self {
60 stake_public_key: value.0,
61 slot_no: value.1,
62 txn_index: value.2,
63 }
64 }
65}
66pub(crate) struct PrimaryKeyQuery;
68
69impl PrimaryKeyQuery {
70 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
72 PreparedQueries::prepare(
73 session.clone(),
74 SELECT_QUERY,
75 scylla::statement::Consistency::All,
76 true,
77 )
78 .await
79 .inspect_err(
80 |error| error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query."),
81 )
82 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
83 }
84
85 pub(crate) async fn execute(
87 session: &CassandraSession,
88 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
89 let iter = session
90 .purge_execute_iter(PreparedSelectQuery::Cip36RegistrationInvalid)
91 .await?
92 .rows_stream::<result::PrimaryKey>()?;
93
94 Ok(iter)
95 }
96}
97
98const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration_invalid.cql");
100
101pub(crate) struct DeleteQuery;
103
104impl DeleteQuery {
105 pub(crate) async fn prepare_batch(
107 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
108 ) -> anyhow::Result<SizedBatch> {
109 PreparedQueries::prepare_batch(
110 session.clone(),
111 DELETE_QUERY,
112 cfg,
113 scylla::statement::Consistency::Any,
114 true,
115 false,
116 )
117 .await
118 .inspect_err(
119 |error| error!(error=%error, "Failed to prepare delete CIP-36 invalid registration primary key query."),
120 )
121 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
122 }
123
124 pub(crate) async fn execute(
126 session: &CassandraSession, params: Vec<Params>,
127 ) -> FallibleQueryResults {
128 let results = session
129 .purge_execute_batch(PreparedDeleteQuery::Cip36RegistrationInvalid, params)
130 .await?;
131
132 Ok(results)
133 }
134}