cat_gateway/db/index/block/rbac509/
mod.rs1pub(crate) mod insert_catalyst_id_for_stake_address;
4pub(crate) mod insert_catalyst_id_for_txn_id;
5pub(crate) mod insert_rbac509;
6pub(crate) mod insert_rbac509_invalid;
7
8use std::sync::Arc;
9
10use cardano_blockchain_types::{
11 Cip0134Uri, MultiEraBlock, Slot, StakeAddress, TransactionId, TxnIndex,
12};
13use catalyst_types::id_uri::IdUri;
14use pallas::ledger::addresses::Address;
15use rbac_registration::cardano::cip509::{Cip509, Cip509RbacMetadata};
16use scylla::Session;
17use tracing::error;
18
19use crate::{
20 db::index::{
21 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
22 session::CassandraSession,
23 },
24 settings::cassandra_db::EnvVars,
25};
26
27#[derive(Debug)]
29pub(crate) struct Rbac509InsertQuery {
30 pub(crate) registrations: Vec<insert_rbac509::Params>,
32 pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
34 pub(crate) catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
36 pub(crate) catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
38}
39
40impl Rbac509InsertQuery {
41 pub(crate) fn new() -> Self {
43 Rbac509InsertQuery {
44 registrations: Vec::new(),
45 invalid: Vec::new(),
46 catalyst_id_for_txn_id: Vec::new(),
47 catalyst_id_for_stake_address: Vec::new(),
48 }
49 }
50
51 pub(crate) async fn prepare_batch(
53 session: &Arc<Session>, cfg: &EnvVars,
54 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
55 Ok((
56 insert_rbac509::Params::prepare_batch(session, cfg).await?,
57 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
58 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
59 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
60 ))
61 }
62
63 pub(crate) async fn index(
65 &mut self, session: &Arc<CassandraSession>, txn_hash: TransactionId, index: TxnIndex,
66 block: &MultiEraBlock,
67 ) {
68 let slot = block.slot();
69 let cip509 = match Cip509::new(block, index, &[]) {
70 Ok(Some(v)) => v,
71 Ok(None) => {
72 return;
74 },
75 Err(e) => {
76 error!(
80 slot = ?slot,
81 index = ?index,
82 "Invalid RBAC Registration Metadata in transaction: {e:?}"
83 );
84 return;
85 },
86 };
87
88 if slot != cip509.origin().point().slot_or_default() {
90 error!(
91 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
92 cip509.origin().point().slot_or_default()
93 );
94 }
95 if txn_hash != cip509.txn_hash() {
96 error!(
97 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
98 cip509.txn_hash()
99 );
100 }
101
102 let Some(catalyst_id) = catalyst_id(session, &cip509, txn_hash, slot, index).await else {
103 error!("Unable to determine Catalyst id for registration: slot = {slot:?}, index = {index:?}, txn_hash = {txn_hash:?}");
104 return;
105 };
106
107 let previous_transaction = cip509.previous_transaction();
108 let purpose = cip509.purpose();
109 match cip509.consume() {
110 Ok((purpose, metadata, _)) => {
111 self.registrations.push(insert_rbac509::Params::new(
112 catalyst_id.clone(),
113 txn_hash,
114 slot,
115 index,
116 purpose,
117 previous_transaction,
118 ));
119 self.catalyst_id_for_txn_id
120 .push(insert_catalyst_id_for_txn_id::Params::new(
121 catalyst_id.clone(),
122 txn_hash,
123 ));
124 for address in stake_addresses(&metadata) {
125 self.catalyst_id_for_stake_address.push(
126 insert_catalyst_id_for_stake_address::Params::new(
127 address,
128 slot,
129 catalyst_id.clone(),
130 ),
131 );
132 }
133 },
134 Err(report) => {
135 self.invalid.push(insert_rbac509_invalid::Params::new(
136 catalyst_id,
137 txn_hash,
138 slot,
139 index,
140 purpose,
141 previous_transaction,
142 &report,
143 ));
144 },
145 }
146 }
147
148 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
152 let mut query_handles: FallibleQueryTasks = Vec::new();
153
154 if !self.registrations.is_empty() {
155 let inner_session = session.clone();
156 query_handles.push(tokio::spawn(async move {
157 inner_session
158 .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
159 .await
160 }));
161 }
162
163 if !self.invalid.is_empty() {
164 let inner_session = session.clone();
165 query_handles.push(tokio::spawn(async move {
166 inner_session
167 .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
168 .await
169 }));
170 }
171
172 if !self.catalyst_id_for_txn_id.is_empty() {
173 let inner_session = session.clone();
174 query_handles.push(tokio::spawn(async move {
175 inner_session
176 .execute_batch(
177 PreparedQuery::CatalystIdForTxnIdInsertQuery,
178 self.catalyst_id_for_txn_id,
179 )
180 .await
181 }));
182 }
183
184 if !self.catalyst_id_for_stake_address.is_empty() {
185 let inner_session = session.clone();
186 query_handles.push(tokio::spawn(async move {
187 inner_session
188 .execute_batch(
189 PreparedQuery::CatalystIdForStakeAddressInsertQuery,
190 self.catalyst_id_for_stake_address,
191 )
192 .await
193 }));
194 }
195
196 query_handles
197 }
198}
199
200async fn catalyst_id(
202 session: &Arc<CassandraSession>, cip509: &Cip509, txn_hash: TransactionId, slot: Slot,
203 index: TxnIndex,
204) -> Option<IdUri> {
205 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::{
206 cache_for_transaction_id, Query,
207 };
208
209 let id = match cip509.previous_transaction() {
210 Some(previous) => {
211 Query::get_latest(session, previous.into())
212 .await
213 .inspect_err(|e| error!("{e:?}"))
214 .ok()
215 .flatten()?
216 .catalyst_id
217 .into()
218 },
219 None => cip509.catalyst_id()?.as_short_id(),
220 };
221
222 cache_for_transaction_id(txn_hash, id.clone(), slot, index);
223
224 Some(id)
225}
226
227fn stake_addresses(metadata: &Cip509RbacMetadata) -> Vec<StakeAddress> {
229 let mut result = Vec::new();
230
231 if let Some(uris) = metadata.certificate_uris.x_uris().get(&0) {
232 result.extend(convert_stake_addresses(uris));
233 }
234 if let Some(uris) = metadata.certificate_uris.c_uris().get(&0) {
235 result.extend(convert_stake_addresses(uris));
236 }
237
238 result
239}
240
241fn convert_stake_addresses(uris: &[Cip0134Uri]) -> Vec<StakeAddress> {
243 uris.iter()
244 .filter_map(|uri| {
245 match uri.address() {
246 Address::Stake(a) => Some(a.clone().into()),
247 _ => None,
248 }
249 })
250 .collect()
251}