cat_gateway/db/index/block/rbac509/
mod.rs1pub(crate) mod insert_catalyst_id_for_stake_address;
4pub(crate) mod insert_catalyst_id_for_txn_id;
5pub(crate) mod insert_rbac509;
6pub(crate) mod insert_rbac509_invalid;
7
8use std::sync::Arc;
9
10use anyhow::{bail, Context};
11use cardano_blockchain_types::{
12 Cip0134Uri, MultiEraBlock, Slot, StakeAddress, TransactionId, TxnIndex,
13};
14use catalyst_types::catalyst_id::CatalystId;
15use pallas::ledger::addresses::Address;
16use rbac_registration::cardano::cip509::{Cip509, Cip509RbacMetadata};
17use scylla::Session;
18use tracing::{debug, error};
19
20use crate::{
21 db::index::{
22 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
23 session::CassandraSession,
24 },
25 settings::cassandra_db::EnvVars,
26};
27
28#[derive(Debug)]
30pub(crate) struct Rbac509InsertQuery {
31 pub(crate) registrations: Vec<insert_rbac509::Params>,
33 pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
35 pub(crate) catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
37 pub(crate) catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
39}
40
41impl Rbac509InsertQuery {
42 pub(crate) fn new() -> Self {
44 Rbac509InsertQuery {
45 registrations: Vec::new(),
46 invalid: Vec::new(),
47 catalyst_id_for_txn_id: Vec::new(),
48 catalyst_id_for_stake_address: Vec::new(),
49 }
50 }
51
52 pub(crate) async fn prepare_batch(
54 session: &Arc<Session>, cfg: &EnvVars,
55 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
56 Ok((
57 insert_rbac509::Params::prepare_batch(session, cfg).await?,
58 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
59 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
60 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
61 ))
62 }
63
64 pub(crate) async fn index(
66 &mut self, session: &Arc<CassandraSession>, txn_hash: TransactionId, index: TxnIndex,
67 block: &MultiEraBlock,
68 ) {
69 let slot = block.slot();
70 let cip509 = match Cip509::new(block, index, &[]) {
71 Ok(Some(v)) => v,
72 Ok(None) => {
73 return;
75 },
76 Err(e) => {
77 debug!(
81 slot = ?slot,
82 index = ?index,
83 "Invalid RBAC Registration Metadata in transaction: {e:?}"
84 );
85 return;
86 },
87 };
88
89 if slot != cip509.origin().point().slot_or_default() {
91 error!(
92 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
93 cip509.origin().point().slot_or_default()
94 );
95 }
96 if txn_hash != cip509.txn_hash() {
97 error!(
98 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
99 cip509.txn_hash()
100 );
101 }
102
103 let catalyst_id = match catalyst_id(
104 session,
105 &cip509,
106 txn_hash,
107 slot,
108 index,
109 block.is_immutable(),
110 )
111 .await
112 {
113 Ok(v) => v,
114 Err(e) => {
115 debug!("Unable to determine Catalyst id for registration: slot = {slot:?}, index = {index:?}, txn_hash = {txn_hash:?}: {e:?}");
116 return;
117 },
118 };
119
120 let previous_transaction = cip509.previous_transaction();
121 let purpose = cip509.purpose();
122 match cip509.consume() {
123 Ok((purpose, metadata, _)) => {
124 self.registrations.push(insert_rbac509::Params::new(
125 catalyst_id.clone(),
126 txn_hash,
127 slot,
128 index,
129 purpose,
130 previous_transaction,
131 ));
132 self.catalyst_id_for_txn_id
133 .push(insert_catalyst_id_for_txn_id::Params::new(
134 catalyst_id.clone(),
135 txn_hash,
136 ));
137 for address in stake_addresses(&metadata) {
138 self.catalyst_id_for_stake_address.push(
139 insert_catalyst_id_for_stake_address::Params::new(
140 address,
141 slot,
142 catalyst_id.clone(),
143 ),
144 );
145 }
146 },
147 Err(report) => {
148 self.invalid.push(insert_rbac509_invalid::Params::new(
149 catalyst_id,
150 txn_hash,
151 slot,
152 index,
153 purpose,
154 previous_transaction,
155 &report,
156 ));
157 },
158 }
159 }
160
161 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
165 let mut query_handles: FallibleQueryTasks = Vec::new();
166
167 if !self.registrations.is_empty() {
168 let inner_session = session.clone();
169 query_handles.push(tokio::spawn(async move {
170 inner_session
171 .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
172 .await
173 }));
174 }
175
176 if !self.invalid.is_empty() {
177 let inner_session = session.clone();
178 query_handles.push(tokio::spawn(async move {
179 inner_session
180 .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
181 .await
182 }));
183 }
184
185 if !self.catalyst_id_for_txn_id.is_empty() {
186 let inner_session = session.clone();
187 query_handles.push(tokio::spawn(async move {
188 inner_session
189 .execute_batch(
190 PreparedQuery::CatalystIdForTxnIdInsertQuery,
191 self.catalyst_id_for_txn_id,
192 )
193 .await
194 }));
195 }
196
197 if !self.catalyst_id_for_stake_address.is_empty() {
198 let inner_session = session.clone();
199 query_handles.push(tokio::spawn(async move {
200 inner_session
201 .execute_batch(
202 PreparedQuery::CatalystIdForStakeAddressInsertQuery,
203 self.catalyst_id_for_stake_address,
204 )
205 .await
206 }));
207 }
208
209 query_handles
210 }
211}
212
213async fn catalyst_id(
215 session: &Arc<CassandraSession>, cip509: &Cip509, txn_id: TransactionId, slot: Slot,
216 index: TxnIndex, is_immutable: bool,
217) -> anyhow::Result<CatalystId> {
218 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::cache_for_transaction_id;
219
220 let id = match cip509.previous_transaction() {
221 Some(previous) => query_catalyst_id(session, previous, is_immutable).await?,
222 None => {
223 cip509
224 .catalyst_id()
225 .context("Empty Catalyst ID in root RBAC registration")?
226 .as_short_id()
227 },
228 };
229
230 cache_for_transaction_id(txn_id, id.clone(), slot, index);
231
232 Ok(id)
233}
234
235async fn query_catalyst_id(
237 session: &Arc<CassandraSession>, txn_id: TransactionId, is_immutable: bool,
238) -> anyhow::Result<CatalystId> {
239 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
240
241 if let Some(q) = Query::get_latest(session, txn_id.into())
242 .await
243 .context("Failed to query Catalyst ID from transaction ID")?
244 {
245 Ok(q.catalyst_id.into())
246 } else {
247 if is_immutable {
248 bail!("Unable to find Catalyst ID in the persistent DB");
249 }
250
251 let persistent_session =
254 CassandraSession::get(true).context("Failed to get persistent DB session")?;
255 Query::get_latest(&persistent_session, txn_id.into())
256 .await
257 .transpose()
258 .context("Unable to find Catalyst ID in the persistent DB")?
259 .map(|q| q.catalyst_id.into())
260 }
261}
262
263fn stake_addresses(metadata: &Cip509RbacMetadata) -> Vec<StakeAddress> {
265 let mut result = Vec::new();
266
267 if let Some(uris) = metadata.certificate_uris.x_uris().get(&0) {
268 result.extend(convert_stake_addresses(uris));
269 }
270 if let Some(uris) = metadata.certificate_uris.c_uris().get(&0) {
271 result.extend(convert_stake_addresses(uris));
272 }
273
274 result
275}
276
277fn convert_stake_addresses(uris: &[Cip0134Uri]) -> Vec<StakeAddress> {
279 uris.iter()
280 .filter_map(|uri| {
281 match uri.address() {
282 Address::Stake(a) => Some(a.clone().into()),
283 _ => None,
284 }
285 })
286 .collect()
287}