cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_stake_address;
4pub(crate) mod insert_catalyst_id_for_txn_id;
5pub(crate) mod insert_rbac509;
6pub(crate) mod insert_rbac509_invalid;
7
8use std::sync::Arc;
9
10use cardano_blockchain_types::{
11    Cip0134Uri, MultiEraBlock, Slot, StakeAddress, TransactionId, TxnIndex,
12};
13use catalyst_types::id_uri::IdUri;
14use pallas::ledger::addresses::Address;
15use rbac_registration::cardano::cip509::{Cip509, Cip509RbacMetadata};
16use scylla::Session;
17use tracing::error;
18
19use crate::{
20    db::index::{
21        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
22        session::CassandraSession,
23    },
24    settings::cassandra_db::EnvVars,
25};
26
27/// Index RBAC 509 Registration Query Parameters
28#[derive(Debug)]
29pub(crate) struct Rbac509InsertQuery {
30    /// RBAC Registration Data captured during indexing.
31    pub(crate) registrations: Vec<insert_rbac509::Params>,
32    /// An invalid RBAC registration data.
33    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
34    /// A Catalyst ID for transaction ID Data captured during indexing.
35    pub(crate) catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
36    /// A Catalyst ID for stake address data captured during indexing.
37    pub(crate) catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
38}
39
40impl Rbac509InsertQuery {
41    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
42    pub(crate) fn new() -> Self {
43        Rbac509InsertQuery {
44            registrations: Vec::new(),
45            invalid: Vec::new(),
46            catalyst_id_for_txn_id: Vec::new(),
47            catalyst_id_for_stake_address: Vec::new(),
48        }
49    }
50
51    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
52    pub(crate) async fn prepare_batch(
53        session: &Arc<Session>, cfg: &EnvVars,
54    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
55        Ok((
56            insert_rbac509::Params::prepare_batch(session, cfg).await?,
57            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
58            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
59            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
60        ))
61    }
62
63    /// Index the RBAC 509 registrations in a transaction.
64    pub(crate) async fn index(
65        &mut self, session: &Arc<CassandraSession>, txn_hash: TransactionId, index: TxnIndex,
66        block: &MultiEraBlock,
67    ) {
68        let slot = block.slot();
69        let cip509 = match Cip509::new(block, index, &[]) {
70            Ok(Some(v)) => v,
71            Ok(None) => {
72                // Nothing to index.
73                return;
74            },
75            Err(e) => {
76                // This registration is either completely corrupted or someone else is using "our"
77                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
78                // incorrect.
79                error!(
80                    slot = ?slot,
81                    index = ?index,
82                    "Invalid RBAC Registration Metadata in transaction: {e:?}"
83                );
84                return;
85            },
86        };
87
88        // This should never happen, but let's check anyway.
89        if slot != cip509.origin().point().slot_or_default() {
90            error!(
91                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
92                cip509.origin().point().slot_or_default()
93            );
94        }
95        if txn_hash != cip509.txn_hash() {
96            error!(
97                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
98                cip509.txn_hash()
99            );
100        }
101
102        let Some(catalyst_id) = catalyst_id(session, &cip509, txn_hash, slot, index).await else {
103            error!("Unable to determine Catalyst id for registration: slot = {slot:?}, index = {index:?}, txn_hash = {txn_hash:?}");
104            return;
105        };
106
107        let previous_transaction = cip509.previous_transaction();
108        let purpose = cip509.purpose();
109        match cip509.consume() {
110            Ok((purpose, metadata, _)) => {
111                self.registrations.push(insert_rbac509::Params::new(
112                    catalyst_id.clone(),
113                    txn_hash,
114                    slot,
115                    index,
116                    purpose,
117                    previous_transaction,
118                ));
119                self.catalyst_id_for_txn_id
120                    .push(insert_catalyst_id_for_txn_id::Params::new(
121                        catalyst_id.clone(),
122                        txn_hash,
123                    ));
124                for address in stake_addresses(&metadata) {
125                    self.catalyst_id_for_stake_address.push(
126                        insert_catalyst_id_for_stake_address::Params::new(
127                            address,
128                            slot,
129                            catalyst_id.clone(),
130                        ),
131                    );
132                }
133            },
134            Err(report) => {
135                self.invalid.push(insert_rbac509_invalid::Params::new(
136                    catalyst_id,
137                    txn_hash,
138                    slot,
139                    index,
140                    purpose,
141                    previous_transaction,
142                    &report,
143                ));
144            },
145        }
146    }
147
148    /// Execute the RBAC 509 Registration Indexing Queries.
149    ///
150    /// Consumes the `self` and returns a vector of futures.
151    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
152        let mut query_handles: FallibleQueryTasks = Vec::new();
153
154        if !self.registrations.is_empty() {
155            let inner_session = session.clone();
156            query_handles.push(tokio::spawn(async move {
157                inner_session
158                    .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
159                    .await
160            }));
161        }
162
163        if !self.invalid.is_empty() {
164            let inner_session = session.clone();
165            query_handles.push(tokio::spawn(async move {
166                inner_session
167                    .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
168                    .await
169            }));
170        }
171
172        if !self.catalyst_id_for_txn_id.is_empty() {
173            let inner_session = session.clone();
174            query_handles.push(tokio::spawn(async move {
175                inner_session
176                    .execute_batch(
177                        PreparedQuery::CatalystIdForTxnIdInsertQuery,
178                        self.catalyst_id_for_txn_id,
179                    )
180                    .await
181            }));
182        }
183
184        if !self.catalyst_id_for_stake_address.is_empty() {
185            let inner_session = session.clone();
186            query_handles.push(tokio::spawn(async move {
187                inner_session
188                    .execute_batch(
189                        PreparedQuery::CatalystIdForStakeAddressInsertQuery,
190                        self.catalyst_id_for_stake_address,
191                    )
192                    .await
193            }));
194        }
195
196        query_handles
197    }
198}
199
200/// Returns a Catalyst ID of the given registration.
201async fn catalyst_id(
202    session: &Arc<CassandraSession>, cip509: &Cip509, txn_hash: TransactionId, slot: Slot,
203    index: TxnIndex,
204) -> Option<IdUri> {
205    use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::{
206        cache_for_transaction_id, Query,
207    };
208
209    let id = match cip509.previous_transaction() {
210        Some(previous) => {
211            Query::get_latest(session, previous.into())
212                .await
213                .inspect_err(|e| error!("{e:?}"))
214                .ok()
215                .flatten()?
216                .catalyst_id
217                .into()
218        },
219        None => cip509.catalyst_id()?.as_short_id(),
220    };
221
222    cache_for_transaction_id(txn_hash, id.clone(), slot, index);
223
224    Some(id)
225}
226
227/// Returns stake addresses of the role 0.
228fn stake_addresses(metadata: &Cip509RbacMetadata) -> Vec<StakeAddress> {
229    let mut result = Vec::new();
230
231    if let Some(uris) = metadata.certificate_uris.x_uris().get(&0) {
232        result.extend(convert_stake_addresses(uris));
233    }
234    if let Some(uris) = metadata.certificate_uris.c_uris().get(&0) {
235        result.extend(convert_stake_addresses(uris));
236    }
237
238    result
239}
240
241/// Converts a list of `Cip0134Uri` to a list of stake addresses.
242fn convert_stake_addresses(uris: &[Cip0134Uri]) -> Vec<StakeAddress> {
243    uris.iter()
244        .filter_map(|uri| {
245            match uri.address() {
246                Address::Stake(a) => Some(a.clone().into()),
247                _ => None,
248            }
249        })
250        .collect()
251}