cat_gateway/db/index/block/
mod.rs

1//! Index a block
2//! Primary Data Indexing - Upsert operations
3
4pub(crate) mod certs;
5pub(crate) mod cip36;
6pub(crate) mod rbac509;
7pub(crate) mod roll_forward;
8pub(crate) mod txi;
9pub(crate) mod txo;
10
11use cardano_blockchain_types::MultiEraBlock;
12use catalyst_types::hashes::Blake2b256Hash;
13use certs::CertInsertQuery;
14use cip36::Cip36InsertQuery;
15use rbac509::Rbac509InsertQuery;
16use tracing::error;
17use txi::TxiInsertQuery;
18use txo::TxoInsertQuery;
19
20use super::{queries::FallibleQueryTasks, session::CassandraSession};
21
22/// Add all data needed from the block into the indexes.
23pub(crate) async fn index_block(block: &MultiEraBlock) -> anyhow::Result<()> {
24    // Get the session.  This should never fail.
25    let Some(session) = CassandraSession::get(block.is_immutable()) else {
26        anyhow::bail!("Failed to get Index DB Session.  Can not index block.");
27    };
28
29    let mut cert_index = CertInsertQuery::new();
30    let mut cip36_index = Cip36InsertQuery::new();
31    let mut rbac509_index = Rbac509InsertQuery::new();
32
33    let mut txi_index = TxiInsertQuery::new();
34    let mut txo_index = TxoInsertQuery::new();
35
36    let slot_no = block.point().slot_or_default();
37
38    // We add all transactions in the block to their respective index data sets.
39    for (index, txn) in block.enumerate_txs() {
40        let txn_id = Blake2b256Hash::from(txn.hash()).into();
41
42        // Index the TXIs.
43        txi_index.index(&txn, slot_no);
44
45        // TODO: Index minting.
46        // let mint = txs.mints().iter() {};
47
48        // TODO: Index Metadata.
49        cip36_index.index(index, slot_no, block);
50
51        // Index Certificates inside the transaction.
52        cert_index.index(&txn, slot_no, index, block);
53
54        // Index the TXOs.
55        txo_index.index(block.network(), &txn, slot_no, txn_id, index);
56
57        // Index RBAC 509 inside the transaction.
58        rbac509_index.index(&session, txn_id, index, block).await;
59    }
60
61    // We then execute each batch of data from the block.
62    // This maximizes batching opportunities.
63    let mut query_handles: FallibleQueryTasks = Vec::new();
64
65    query_handles.extend(txo_index.execute(&session));
66    query_handles.extend(txi_index.execute(&session));
67    query_handles.extend(cert_index.execute(&session));
68    query_handles.extend(cip36_index.execute(&session));
69    query_handles.extend(rbac509_index.execute(&session));
70
71    let mut result: anyhow::Result<()> = Ok(());
72
73    // Wait for operations to complete, and display any errors
74    for handle in query_handles {
75        if result.is_err() {
76            // Try and cancel all futures waiting tasks and return the first error we encountered.
77            handle.abort();
78            continue;
79        }
80        match handle.await {
81            Ok(join_res) => {
82                if let Err(error) = join_res {
83                    // IF a query fails, assume everything else is broken.
84                    error!(error=%error,"Query Failed");
85                    result = Err(error);
86                }
87            },
88            Err(error) => {
89                error!(error=%error,"Query Join Failed");
90                result = Err(error.into());
91            },
92        }
93    }
94
95    result
96}