cat_gateway/db/index/queries/rbac/
get_rbac_registrations.rs1use std::sync::Arc;
4
5use anyhow::Context;
6use cardano_blockchain_types::{Network, Point, Slot, TxnIndex};
7use cardano_chain_follower::ChainFollower;
8use catalyst_signed_doc::CatalystId;
9use futures::{TryFutureExt, TryStreamExt};
10use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
11use scylla::{
12 prepared_statement::PreparedStatement, statement::Consistency,
13 transport::iterator::TypedRowStream, DeserializeRow, SerializeRow, Session,
14};
15use tracing::{debug, error};
16
17use crate::db::{
18 index::{
19 queries::{PreparedQueries, PreparedSelectQuery},
20 session::CassandraSession,
21 },
22 types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
23};
24
25const QUERY: &str = include_str!("../cql/get_rbac_registrations_catalyst_id.cql");
27
28#[derive(SerializeRow)]
30pub(crate) struct QueryParams {
31 pub catalyst_id: DbCatalystId,
33}
34
35#[derive(DeserializeRow, Clone)]
37pub(crate) struct Query {
38 #[allow(dead_code)]
40 pub txn_id: DbTransactionId,
41 pub slot_no: DbSlot,
43 pub txn_index: DbTxnIndex,
45 pub prv_txn_id: Option<DbTransactionId>,
47 #[allow(dead_code)]
49 pub purpose: DbUuidV4,
50}
51
52impl Query {
53 pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
55 PreparedQueries::prepare(session, QUERY, Consistency::All, true)
56 .await
57 .inspect_err(
58 |e| error!(error=%e, "Failed to prepare get RBAC registrations by Catalyst ID query"),
59 )
60 }
61
62 pub(crate) async fn execute(
64 session: &CassandraSession, params: QueryParams,
65 ) -> anyhow::Result<TypedRowStream<Query>> {
66 session
67 .execute_iter(PreparedSelectQuery::RbacRegistrationsByCatalystId, params)
68 .await?
69 .rows_stream::<Query>()
70 .map_err(Into::into)
71 }
72}
73
74pub(crate) async fn indexed_registrations(
77 session: &CassandraSession, catalyst_id: &CatalystId,
78) -> anyhow::Result<Vec<Query>> {
79 let mut result: Vec<_> = Query::execute(session, QueryParams {
80 catalyst_id: catalyst_id.clone().into(),
81 })
82 .and_then(|r| r.try_collect().map_err(Into::into))
83 .await?;
84
85 result.sort_by_key(|r| r.slot_no);
86 Ok(result)
87}
88
89pub(crate) async fn build_reg_chain<OnSuccessFn: FnMut(bool, Slot, &RegistrationChain)>(
93 mut reg_queries_iter: impl Iterator<Item = (bool, Query)>, network: Network,
94 mut on_success: OnSuccessFn,
95) -> anyhow::Result<Option<RegistrationChain>> {
96 let Some((is_persistent, root)) = reg_queries_iter.next() else {
97 return Ok(None);
98 };
99
100 let slot_no = root.slot_no.into();
101 let root = load_cip509_from_chain(network, root.slot_no.into(), root.txn_index.into())
102 .await
103 .context("Failed to get root registration")?;
104 let mut chain = RegistrationChain::new(root).context("Invalid root registration")?;
105 on_success(is_persistent, slot_no, &chain);
106
107 for (is_persistent, reg) in reg_queries_iter {
108 let slot_no = reg.slot_no.into();
111 let cip509 = load_cip509_from_chain(network, reg.slot_no.into(), reg.txn_index.into())
112 .await
113 .with_context(|| {
114 format!(
115 "Invalid or missing registration at {:?} block {:?} transaction",
116 reg.slot_no, reg.txn_index,
117 )
118 })?;
119 match chain.update(cip509) {
120 Ok(c) => {
121 chain = c;
122 on_success(is_persistent, slot_no, &chain);
123 },
124 Err(e) => {
125 debug!(
128 "Unable to apply registration from {:?} block {:?} txn index: {e:?}",
129 reg.slot_no, reg.txn_index
130 );
131 },
132 }
133 }
134
135 Ok(Some(chain))
136}
137
138pub(crate) async fn load_cip509_from_chain(
141 network: Network, slot: Slot, txn_index: TxnIndex,
142) -> anyhow::Result<Cip509> {
143 let point = Point::fuzzy(slot);
144 let block = ChainFollower::get_block(network, point)
145 .await
146 .context("Unable to get block")?
147 .data;
148 if block.point().slot_or_default() != slot {
149 anyhow::bail!(
152 "Unable to find exact {slot:?} block. Found block slot {:?}",
153 block.point().slot_or_default()
154 );
155 }
156 Cip509::new(&block, txn_index, &[])
158 .with_context(|| {
159 format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
160 })?
161 .with_context(|| {
162 format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
163 })
164}