cat_gateway/db/index/queries/purge/
rbac509_invalid_registration.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow,
6    Session,
7};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{
14                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
15                FallibleQueryResults, SizedBatch,
16            },
17            session::CassandraSession,
18        },
19        types::{DbCatalystId, DbTransactionId},
20    },
21    settings::cassandra_db,
22};
23
24pub(crate) mod result {
25    use crate::db::types::{DbCatalystId, DbSlot, DbTransactionId};
28
29    pub(crate) type PrimaryKey = (DbCatalystId, DbTransactionId, DbSlot);
31}
32
/// CQL text embedded at compile time from `cql/get_rbac_invalid_registration.cql`.
///
/// Prepared by `PrimaryKeyQuery::prepare`, which also appends this text to the
/// error it returns when preparation fails.
33const SELECT_QUERY: &str = include_str!("cql/get_rbac_invalid_registration.cql");
35
36#[derive(SerializeRow)]
38pub(crate) struct Params {
39    pub catalyst_id: DbCatalystId,
41    pub txn_id: DbTransactionId,
43}
44
45impl Debug for Params {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("Params")
48            .field("catalyst_id", &self.catalyst_id)
49            .field("txn_id", &self.txn_id)
50            .finish()
51    }
52}
53
54impl From<result::PrimaryKey> for Params {
55    fn from(value: result::PrimaryKey) -> Self {
56        Self {
57            catalyst_id: value.0,
58            txn_id: value.1,
59        }
60    }
61}
62pub(crate) struct PrimaryKeyQuery;
64
65impl PrimaryKeyQuery {
66    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
68        PreparedQueries::prepare(
69            session.clone(),
70            SELECT_QUERY,
71            scylla::statement::Consistency::All,
72            true,
73        )
74            .await
75            .inspect_err(
76                |error| error!(error=%error, "Failed to prepare get RBAC 509 invalid registration primary key query."),
77            )
78            .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
79    }
80
81    pub(crate) async fn execute(
83        session: &CassandraSession,
84    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
85        let iter = session
86            .purge_execute_iter(PreparedSelectQuery::Rbac509Invalid)
87            .await?
88            .rows_stream::<result::PrimaryKey>()?;
89
90        Ok(iter)
91    }
92}
93
/// CQL text embedded at compile time from `cql/delete_rbac_invalid_registration.cql`.
///
/// Prepared as a batch by `DeleteQuery::prepare_batch`, which also appends this
/// text to the error it returns when preparation fails.
94const DELETE_QUERY: &str = include_str!("cql/delete_rbac_invalid_registration.cql");
96
97pub(crate) struct DeleteQuery;
99
100impl DeleteQuery {
101    pub(crate) async fn prepare_batch(
103        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
104    ) -> anyhow::Result<SizedBatch> {
105        PreparedQueries::prepare_batch(
106            session.clone(),
107            DELETE_QUERY,
108            cfg,
109            scylla::statement::Consistency::Any,
110            true,
111            false,
112        )
113        .await
114        .inspect_err(
115            |error| error!(error=%error, "Failed to prepare delete RBAC 509 invalid registration primary key query."),
116        )
117        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
118    }
119
120    pub(crate) async fn execute(
122        session: &CassandraSession, params: Vec<Params>,
123    ) -> FallibleQueryResults {
124        let results = session
125            .purge_execute_batch(PreparedDeleteQuery::Rbac509Invalid, params)
126            .await?;
127        Ok(results)
128    }
129}