cat_gateway/db/index/block/
txi.rs

1//! Insert TXI Index Data Queries.
2
3use std::sync::Arc;
4
5use cardano_blockchain_types::{Slot, TransactionId, TxnOutputOffset};
6use catalyst_types::hashes::Blake2b256Hash;
7use scylla::{SerializeRow, Session};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
14            session::CassandraSession,
15        },
16        types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
17    },
18    settings::cassandra_db,
19};
20
21/// Insert TXI Query and Parameters
22#[derive(SerializeRow, Debug)]
23pub(crate) struct TxiInsertParams {
24    /// Spent Transactions Hash
25    txn_id: DbTransactionId,
26    /// TXO Index spent.
27    txo: DbTxnOutputOffset,
28    /// Block Slot Number when spend occurred.
29    slot_no: DbSlot,
30}
31
32impl TxiInsertParams {
33    /// Create a new record for this transaction.
34    pub fn new(txn_id: TransactionId, txo: TxnOutputOffset, slot: Slot) -> Self {
35        Self {
36            txn_id: txn_id.into(),
37            txo: txo.into(),
38            slot_no: slot.into(),
39        }
40    }
41}
42
43/// Insert TXI Query and Parameters
44pub(crate) struct TxiInsertQuery {
45    /// Transaction Input Data to be inserted.
46    txi_data: Vec<TxiInsertParams>,
47}
48
49/// TXI by Txn hash Index
50const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
51
52impl TxiInsertQuery {
53    /// Create a new record for this transaction.
54    pub(crate) fn new() -> Self {
55        Self {
56            txi_data: Vec::new(),
57        }
58    }
59
60    /// Prepare Batch of Insert TXI Index Data Queries
61    pub(crate) async fn prepare_batch(
62        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
63    ) -> anyhow::Result<SizedBatch> {
64        PreparedQueries::prepare_batch(
65            session.clone(),
66            INSERT_TXI_QUERY,
67            cfg,
68            scylla::statement::Consistency::Any,
69            true,
70            false,
71        )
72        .await
73        .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
74        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
75    }
76
77    /// Index the transaction Inputs.
78    pub(crate) fn index(&mut self, txs: &pallas_traverse::MultiEraTx<'_>, slot_no: Slot) {
79        // Index the TXI's.
80        for txi in txs.inputs() {
81            let txn_id = Blake2b256Hash::from(*txi.hash()).into();
82            let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
83
84            self.txi_data
85                .push(TxiInsertParams::new(txn_id, txo, slot_no));
86        }
87    }
88
89    /// Execute the Certificate Indexing Queries.
90    ///
91    /// Consumes the `self` and returns a vector of futures.
92    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
93        let mut query_handles: FallibleQueryTasks = Vec::new();
94
95        let inner_session = session.clone();
96
97        query_handles.push(tokio::spawn(async move {
98            inner_session
99                .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
100                .await
101        }));
102
103        query_handles
104    }
105}