cat_gateway/db/index/queries/rbac/
get_rbac_registrations.rs

1//! Get RBAC registrations by Catalyst ID.
2
3use std::sync::Arc;
4
5use anyhow::Context;
6use cardano_blockchain_types::{Network, Point, Slot, TxnIndex};
7use cardano_chain_follower::ChainFollower;
8use catalyst_signed_doc::CatalystId;
9use futures::{TryFutureExt, TryStreamExt};
10use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
11use scylla::{
12    prepared_statement::PreparedStatement, statement::Consistency,
13    transport::iterator::TypedRowStream, DeserializeRow, SerializeRow, Session,
14};
15use tracing::{debug, error};
16
17use crate::db::{
18    index::{
19        queries::{PreparedQueries, PreparedSelectQuery},
20        session::CassandraSession,
21    },
22    types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
23};
24
25/// Get registrations by Catalyst ID query.
26const QUERY: &str = include_str!("../cql/get_rbac_registrations_catalyst_id.cql");
27
28/// Get registrations by Catalyst ID query params.
29#[derive(SerializeRow)]
30pub(crate) struct QueryParams {
31    /// A Catalyst ID.
32    pub catalyst_id: DbCatalystId,
33}
34
35/// Get registrations by Catalyst ID query.
36#[derive(DeserializeRow, Clone)]
37pub(crate) struct Query {
38    /// Registration transaction id.
39    #[allow(dead_code)]
40    pub txn_id: DbTransactionId,
41    /// A block slot number.
42    pub slot_no: DbSlot,
43    /// A transaction index.
44    pub txn_index: DbTxnIndex,
45    /// A previous  transaction id.
46    pub prv_txn_id: Option<DbTransactionId>,
47    /// A registration purpose.
48    #[allow(dead_code)]
49    pub purpose: DbUuidV4,
50}
51
52impl Query {
53    /// Prepares a query.
54    pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
55        PreparedQueries::prepare(session, QUERY, Consistency::All, true)
56            .await
57            .inspect_err(
58                |e| error!(error=%e, "Failed to prepare get RBAC registrations by Catalyst ID query"),
59            )
60    }
61
62    /// Executes a get registrations by Catalyst ID query.
63    pub(crate) async fn execute(
64        session: &CassandraSession, params: QueryParams,
65    ) -> anyhow::Result<TypedRowStream<Query>> {
66        session
67            .execute_iter(PreparedSelectQuery::RbacRegistrationsByCatalystId, params)
68            .await?
69            .rows_stream::<Query>()
70            .map_err(Into::into)
71    }
72}
73
74/// Returns a sorted list of all registrations for the given Catalyst ID from the
75/// database.
76pub(crate) async fn indexed_registrations(
77    session: &CassandraSession, catalyst_id: &CatalystId,
78) -> anyhow::Result<Vec<Query>> {
79    let mut result: Vec<_> = Query::execute(session, QueryParams {
80        catalyst_id: catalyst_id.clone().into(),
81    })
82    .and_then(|r| r.try_collect().map_err(Into::into))
83    .await?;
84
85    result.sort_by_key(|r| r.slot_no);
86    Ok(result)
87}
88
89/// Build a registration chain from the given indexed data.
90///
91/// # NOTE: provided `reg_queries` must be sorted by `slot_no`, look into `indexed_registrations` function.
92pub(crate) async fn build_reg_chain<OnSuccessFn: FnMut(bool, Slot, &RegistrationChain)>(
93    mut reg_queries_iter: impl Iterator<Item = (bool, Query)>, network: Network,
94    mut on_success: OnSuccessFn,
95) -> anyhow::Result<Option<RegistrationChain>> {
96    let Some((is_persistent, root)) = reg_queries_iter.next() else {
97        return Ok(None);
98    };
99
100    let slot_no = root.slot_no.into();
101    let root = load_cip509_from_chain(network, root.slot_no.into(), root.txn_index.into())
102        .await
103        .context("Failed to get root registration")?;
104    let mut chain = RegistrationChain::new(root).context("Invalid root registration")?;
105    on_success(is_persistent, slot_no, &chain);
106
107    for (is_persistent, reg) in reg_queries_iter {
108        // We only store valid registrations in this table, so an error here indicates a bug in
109        // our indexing logic.
110        let slot_no = reg.slot_no.into();
111        let cip509 = load_cip509_from_chain(network, reg.slot_no.into(), reg.txn_index.into())
112            .await
113            .with_context(|| {
114                format!(
115                    "Invalid or missing registration at {:?} block {:?} transaction",
116                    reg.slot_no, reg.txn_index,
117                )
118            })?;
119        match chain.update(cip509) {
120            Ok(c) => {
121                chain = c;
122                on_success(is_persistent, slot_no, &chain);
123            },
124            Err(e) => {
125                // This isn't a hard error because while the individual registration can be valid it
126                // can be invalid in the context of the whole registration chain.
127                debug!(
128                    "Unable to apply registration from {:?} block {:?} txn index: {e:?}",
129                    reg.slot_no, reg.txn_index
130                );
131            },
132        }
133    }
134
135    Ok(Some(chain))
136}
137
138/// A helper function to load a RBAC registration `Cip509` by the given block and slot
139/// from the `cardano-chain-follower`.
140pub(crate) async fn load_cip509_from_chain(
141    network: Network, slot: Slot, txn_index: TxnIndex,
142) -> anyhow::Result<Cip509> {
143    let point = Point::fuzzy(slot);
144    let block = ChainFollower::get_block(network, point)
145        .await
146        .context("Unable to get block")?
147        .data;
148    if block.point().slot_or_default() != slot {
149        // The `ChainFollower::get_block` function can return the next consecutive block if it
150        // cannot find the exact one. This shouldn't happen, but we need to check anyway.
151        anyhow::bail!(
152            "Unable to find exact {slot:?} block. Found block slot {:?}",
153            block.point().slot_or_default()
154        );
155    }
156    // We perform validation during indexing, so this normally should never fail.
157    Cip509::new(&block, txn_index, &[])
158        .with_context(|| {
159            format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
160        })?
161        .with_context(|| {
162            format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
163        })
164}