cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_stake_address;
4pub(crate) mod insert_catalyst_id_for_txn_id;
5pub(crate) mod insert_rbac509;
6pub(crate) mod insert_rbac509_invalid;
7
8use std::sync::Arc;
9
10use anyhow::{bail, Context};
11use cardano_blockchain_types::{
12    Cip0134Uri, MultiEraBlock, Slot, StakeAddress, TransactionId, TxnIndex,
13};
14use catalyst_types::catalyst_id::CatalystId;
15use pallas::ledger::addresses::Address;
16use rbac_registration::cardano::cip509::{Cip509, Cip509RbacMetadata};
17use scylla::Session;
18use tracing::{debug, error};
19
20use crate::{
21    db::index::{
22        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
23        session::CassandraSession,
24    },
25    settings::cassandra_db::EnvVars,
26};
27
28/// Index RBAC 509 Registration Query Parameters
29#[derive(Debug)]
30pub(crate) struct Rbac509InsertQuery {
31    /// RBAC Registration Data captured during indexing.
32    pub(crate) registrations: Vec<insert_rbac509::Params>,
33    /// An invalid RBAC registration data.
34    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
35    /// A Catalyst ID for transaction ID Data captured during indexing.
36    pub(crate) catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
37    /// A Catalyst ID for stake address data captured during indexing.
38    pub(crate) catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
39}
40
41impl Rbac509InsertQuery {
42    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
43    pub(crate) fn new() -> Self {
44        Rbac509InsertQuery {
45            registrations: Vec::new(),
46            invalid: Vec::new(),
47            catalyst_id_for_txn_id: Vec::new(),
48            catalyst_id_for_stake_address: Vec::new(),
49        }
50    }
51
52    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
53    pub(crate) async fn prepare_batch(
54        session: &Arc<Session>, cfg: &EnvVars,
55    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
56        Ok((
57            insert_rbac509::Params::prepare_batch(session, cfg).await?,
58            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
59            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
60            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
61        ))
62    }
63
64    /// Index the RBAC 509 registrations in a transaction.
65    pub(crate) async fn index(
66        &mut self, session: &Arc<CassandraSession>, txn_hash: TransactionId, index: TxnIndex,
67        block: &MultiEraBlock,
68    ) {
69        let slot = block.slot();
70        let cip509 = match Cip509::new(block, index, &[]) {
71            Ok(Some(v)) => v,
72            Ok(None) => {
73                // Nothing to index.
74                return;
75            },
76            Err(e) => {
77                // This registration is either completely corrupted or someone else is using "our"
78                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
79                // incorrect.
80                debug!(
81                    slot = ?slot,
82                    index = ?index,
83                    "Invalid RBAC Registration Metadata in transaction: {e:?}"
84                );
85                return;
86            },
87        };
88
89        // This should never happen, but let's check anyway.
90        if slot != cip509.origin().point().slot_or_default() {
91            error!(
92                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
93                cip509.origin().point().slot_or_default()
94            );
95        }
96        if txn_hash != cip509.txn_hash() {
97            error!(
98                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
99                cip509.txn_hash()
100            );
101        }
102
103        let catalyst_id = match catalyst_id(
104            session,
105            &cip509,
106            txn_hash,
107            slot,
108            index,
109            block.is_immutable(),
110        )
111        .await
112        {
113            Ok(v) => v,
114            Err(e) => {
115                debug!("Unable to determine Catalyst id for registration: slot = {slot:?}, index = {index:?}, txn_hash = {txn_hash:?}: {e:?}");
116                return;
117            },
118        };
119
120        let previous_transaction = cip509.previous_transaction();
121        let purpose = cip509.purpose();
122        match cip509.consume() {
123            Ok((purpose, metadata, _)) => {
124                self.registrations.push(insert_rbac509::Params::new(
125                    catalyst_id.clone(),
126                    txn_hash,
127                    slot,
128                    index,
129                    purpose,
130                    previous_transaction,
131                ));
132                self.catalyst_id_for_txn_id
133                    .push(insert_catalyst_id_for_txn_id::Params::new(
134                        catalyst_id.clone(),
135                        txn_hash,
136                    ));
137                for address in stake_addresses(&metadata) {
138                    self.catalyst_id_for_stake_address.push(
139                        insert_catalyst_id_for_stake_address::Params::new(
140                            address,
141                            slot,
142                            catalyst_id.clone(),
143                        ),
144                    );
145                }
146            },
147            Err(report) => {
148                self.invalid.push(insert_rbac509_invalid::Params::new(
149                    catalyst_id,
150                    txn_hash,
151                    slot,
152                    index,
153                    purpose,
154                    previous_transaction,
155                    &report,
156                ));
157            },
158        }
159    }
160
161    /// Execute the RBAC 509 Registration Indexing Queries.
162    ///
163    /// Consumes the `self` and returns a vector of futures.
164    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
165        let mut query_handles: FallibleQueryTasks = Vec::new();
166
167        if !self.registrations.is_empty() {
168            let inner_session = session.clone();
169            query_handles.push(tokio::spawn(async move {
170                inner_session
171                    .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
172                    .await
173            }));
174        }
175
176        if !self.invalid.is_empty() {
177            let inner_session = session.clone();
178            query_handles.push(tokio::spawn(async move {
179                inner_session
180                    .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
181                    .await
182            }));
183        }
184
185        if !self.catalyst_id_for_txn_id.is_empty() {
186            let inner_session = session.clone();
187            query_handles.push(tokio::spawn(async move {
188                inner_session
189                    .execute_batch(
190                        PreparedQuery::CatalystIdForTxnIdInsertQuery,
191                        self.catalyst_id_for_txn_id,
192                    )
193                    .await
194            }));
195        }
196
197        if !self.catalyst_id_for_stake_address.is_empty() {
198            let inner_session = session.clone();
199            query_handles.push(tokio::spawn(async move {
200                inner_session
201                    .execute_batch(
202                        PreparedQuery::CatalystIdForStakeAddressInsertQuery,
203                        self.catalyst_id_for_stake_address,
204                    )
205                    .await
206            }));
207        }
208
209        query_handles
210    }
211}
212
213/// Returns a Catalyst ID of the given registration.
214async fn catalyst_id(
215    session: &Arc<CassandraSession>, cip509: &Cip509, txn_id: TransactionId, slot: Slot,
216    index: TxnIndex, is_immutable: bool,
217) -> anyhow::Result<CatalystId> {
218    use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::cache_for_transaction_id;
219
220    let id = match cip509.previous_transaction() {
221        Some(previous) => query_catalyst_id(session, previous, is_immutable).await?,
222        None => {
223            cip509
224                .catalyst_id()
225                .context("Empty Catalyst ID in root RBAC registration")?
226                .as_short_id()
227        },
228    };
229
230    cache_for_transaction_id(txn_id, id.clone(), slot, index);
231
232    Ok(id)
233}
234
235/// Queries a Catalyst ID from the database by the given transaction ID.
236async fn query_catalyst_id(
237    session: &Arc<CassandraSession>, txn_id: TransactionId, is_immutable: bool,
238) -> anyhow::Result<CatalystId> {
239    use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
240
241    if let Some(q) = Query::get_latest(session, txn_id.into())
242        .await
243        .context("Failed to query Catalyst ID from transaction ID")?
244    {
245        Ok(q.catalyst_id.into())
246    } else {
247        if is_immutable {
248            bail!("Unable to find Catalyst ID in the persistent DB");
249        }
250
251        // If this block is a volatile/mutable one then try to look up a Catalyst ID in the
252        // persistent database.
253        let persistent_session =
254            CassandraSession::get(true).context("Failed to get persistent DB session")?;
255        Query::get_latest(&persistent_session, txn_id.into())
256            .await
257            .transpose()
258            .context("Unable to find Catalyst ID in the persistent DB")?
259            .map(|q| q.catalyst_id.into())
260    }
261}
262
263/// Returns stake addresses of the role 0.
264fn stake_addresses(metadata: &Cip509RbacMetadata) -> Vec<StakeAddress> {
265    let mut result = Vec::new();
266
267    if let Some(uris) = metadata.certificate_uris.x_uris().get(&0) {
268        result.extend(convert_stake_addresses(uris));
269    }
270    if let Some(uris) = metadata.certificate_uris.c_uris().get(&0) {
271        result.extend(convert_stake_addresses(uris));
272    }
273
274    result
275}
276
277/// Converts a list of `Cip0134Uri` to a list of stake addresses.
278fn convert_stake_addresses(uris: &[Cip0134Uri]) -> Vec<StakeAddress> {
279    uris.iter()
280        .filter_map(|uri| {
281            match uri.address() {
282                Address::Stake(a) => Some(a.clone().into()),
283                _ => None,
284            }
285        })
286        .collect()
287}