cat_gateway/db/index/queries/rbac/
get_rbac_registrations.rs1use std::sync::Arc;
4
5use anyhow::Context;
6use cardano_blockchain_types::{Network, Point, Slot, TxnIndex};
7use cardano_chain_follower::ChainFollower;
8use catalyst_signed_doc::CatalystId;
9use futures::{TryFutureExt, TryStreamExt};
10use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
11use scylla::{
12    prepared_statement::PreparedStatement, statement::Consistency,
13    transport::iterator::TypedRowStream, DeserializeRow, SerializeRow, Session,
14};
15use tracing::{debug, error};
16
17use crate::db::{
18    index::{
19        queries::{PreparedQueries, PreparedSelectQuery},
20        session::CassandraSession,
21    },
22    types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
23};
24
25const QUERY: &str = include_str!("../cql/get_rbac_registrations_catalyst_id.cql");
27
28#[derive(SerializeRow)]
30pub(crate) struct QueryParams {
31    pub catalyst_id: DbCatalystId,
33}
34
35#[derive(DeserializeRow, Clone)]
37pub(crate) struct Query {
38    #[allow(dead_code)]
40    pub txn_id: DbTransactionId,
41    pub slot_no: DbSlot,
43    pub txn_index: DbTxnIndex,
45    pub prv_txn_id: Option<DbTransactionId>,
47    #[allow(dead_code)]
49    pub purpose: DbUuidV4,
50}
51
52impl Query {
53    pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
55        PreparedQueries::prepare(session, QUERY, Consistency::All, true)
56            .await
57            .inspect_err(
58                |e| error!(error=%e, "Failed to prepare get RBAC registrations by Catalyst ID query"),
59            )
60    }
61
62    pub(crate) async fn execute(
64        session: &CassandraSession, params: QueryParams,
65    ) -> anyhow::Result<TypedRowStream<Query>> {
66        session
67            .execute_iter(PreparedSelectQuery::RbacRegistrationsByCatalystId, params)
68            .await?
69            .rows_stream::<Query>()
70            .map_err(Into::into)
71    }
72}
73
74pub(crate) async fn indexed_registrations(
77    session: &CassandraSession, catalyst_id: &CatalystId,
78) -> anyhow::Result<Vec<Query>> {
79    let mut result: Vec<_> = Query::execute(session, QueryParams {
80        catalyst_id: catalyst_id.clone().into(),
81    })
82    .and_then(|r| r.try_collect().map_err(Into::into))
83    .await?;
84
85    result.sort_by_key(|r| r.slot_no);
86    Ok(result)
87}
88
89pub(crate) async fn build_reg_chain<OnSuccessFn: FnMut(bool, Slot, &RegistrationChain)>(
93    mut reg_queries_iter: impl Iterator<Item = (bool, Query)>, network: Network,
94    mut on_success: OnSuccessFn,
95) -> anyhow::Result<Option<RegistrationChain>> {
96    let Some((is_persistent, root)) = reg_queries_iter.next() else {
97        return Ok(None);
98    };
99
100    let slot_no = root.slot_no.into();
101    let root = load_cip509_from_chain(network, root.slot_no.into(), root.txn_index.into())
102        .await
103        .context("Failed to get root registration")?;
104    let mut chain = RegistrationChain::new(root).context("Invalid root registration")?;
105    on_success(is_persistent, slot_no, &chain);
106
107    for (is_persistent, reg) in reg_queries_iter {
108        let slot_no = reg.slot_no.into();
111        let cip509 = load_cip509_from_chain(network, reg.slot_no.into(), reg.txn_index.into())
112            .await
113            .with_context(|| {
114                format!(
115                    "Invalid or missing registration at {:?} block {:?} transaction",
116                    reg.slot_no, reg.txn_index,
117                )
118            })?;
119        match chain.update(cip509) {
120            Ok(c) => {
121                chain = c;
122                on_success(is_persistent, slot_no, &chain);
123            },
124            Err(e) => {
125                debug!(
128                    "Unable to apply registration from {:?} block {:?} txn index: {e:?}",
129                    reg.slot_no, reg.txn_index
130                );
131            },
132        }
133    }
134
135    Ok(Some(chain))
136}
137
138pub(crate) async fn load_cip509_from_chain(
141    network: Network, slot: Slot, txn_index: TxnIndex,
142) -> anyhow::Result<Cip509> {
143    let point = Point::fuzzy(slot);
144    let block = ChainFollower::get_block(network, point)
145        .await
146        .context("Unable to get block")?
147        .data;
148    if block.point().slot_or_default() != slot {
149        anyhow::bail!(
152            "Unable to find exact {slot:?} block. Found block slot {:?}",
153            block.point().slot_or_default()
154        );
155    }
156    Cip509::new(&block, txn_index, &[])
158        .with_context(|| {
159            format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
160        })?
161        .with_context(|| {
162            format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
163        })
164}