cat_gateway/db/index/block/
mod.rspub(crate) mod certs;
pub(crate) mod cip36;
pub(crate) mod rbac509;
pub(crate) mod txi;
pub(crate) mod txo;
use cardano_chain_follower::MultiEraBlock;
use certs::CertInsertQuery;
use cip36::Cip36InsertQuery;
use rbac509::Rbac509InsertQuery;
use tracing::error;
use txi::TxiInsertQuery;
use txo::TxoInsertQuery;
use super::{queries::FallibleQueryTasks, session::CassandraSession};
use crate::service::utilities::convert::from_saturating;
pub(crate) async fn index_block(block: &MultiEraBlock) -> anyhow::Result<()> {
let Some(session) = CassandraSession::get(block.immutable()) else {
anyhow::bail!("Failed to get Index DB Session. Can not index block.");
};
let mut cert_index = CertInsertQuery::new();
let mut cip36_index = Cip36InsertQuery::new();
let mut rbac509_index = Rbac509InsertQuery::new();
let mut txi_index = TxiInsertQuery::new();
let mut txo_index = TxoInsertQuery::new();
let block_data = block.decode();
let slot_no = block_data.slot();
for (txn_index, txs) in block_data.txs().iter().enumerate() {
let txn = from_saturating(txn_index);
let txn_hash = txs.hash().to_vec();
txi_index.index(txs, slot_no);
cip36_index.index(txn_index, txn, slot_no, block);
cert_index.index(txs, slot_no, txn, block);
txo_index.index(txs, slot_no, &txn_hash, txn);
rbac509_index.index(&txn_hash, txn_index, txn, slot_no, block);
}
let mut query_handles: FallibleQueryTasks = Vec::new();
query_handles.extend(txo_index.execute(&session));
query_handles.extend(txi_index.execute(&session));
query_handles.extend(cert_index.execute(&session));
query_handles.extend(cip36_index.execute(&session));
query_handles.extend(rbac509_index.execute(&session));
let mut result: anyhow::Result<()> = Ok(());
for handle in query_handles {
if result.is_err() {
handle.abort();
continue;
}
match handle.await {
Ok(join_res) => {
if let Err(error) = join_res {
error!(error=%error,"Query Failed");
result = Err(error);
}
},
Err(error) => {
error!(error=%error,"Query Join Failed");
result = Err(error.into());
},
}
}
result
}