cat_gateway/db/index/block/
txi.rs1use std::sync::Arc;
4
5use cardano_blockchain_types::{Slot, TransactionId, TxnOutputOffset};
6use catalyst_types::hashes::Blake2b256Hash;
7use scylla::{SerializeRow, Session};
8use tracing::error;
9
10use crate::{
11    db::{
12        index::{
13            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
14            session::CassandraSession,
15        },
16        types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
17    },
18    settings::cassandra_db,
19};
20
21#[derive(SerializeRow, Debug)]
23pub(crate) struct TxiInsertParams {
24    txn_id: DbTransactionId,
26    txo: DbTxnOutputOffset,
28    slot_no: DbSlot,
30}
31
32impl TxiInsertParams {
33    pub fn new(txn_id: TransactionId, txo: TxnOutputOffset, slot: Slot) -> Self {
35        Self {
36            txn_id: txn_id.into(),
37            txo: txo.into(),
38            slot_no: slot.into(),
39        }
40    }
41}
42
43pub(crate) struct TxiInsertQuery {
45    txi_data: Vec<TxiInsertParams>,
47}
48
49const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
51
52impl TxiInsertQuery {
53    pub(crate) fn new() -> Self {
55        Self {
56            txi_data: Vec::new(),
57        }
58    }
59
60    pub(crate) async fn prepare_batch(
62        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
63    ) -> anyhow::Result<SizedBatch> {
64        PreparedQueries::prepare_batch(
65            session.clone(),
66            INSERT_TXI_QUERY,
67            cfg,
68            scylla::statement::Consistency::Any,
69            true,
70            false,
71        )
72        .await
73        .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
74        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
75    }
76
77    pub(crate) fn index(&mut self, txs: &pallas_traverse::MultiEraTx<'_>, slot_no: Slot) {
79        for txi in txs.inputs() {
81            let txn_id = Blake2b256Hash::from(*txi.hash()).into();
82            let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
83
84            self.txi_data
85                .push(TxiInsertParams::new(txn_id, txo, slot_no));
86        }
87    }
88
89    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
93        let mut query_handles: FallibleQueryTasks = Vec::new();
94
95        let inner_session = session.clone();
96
97        query_handles.push(tokio::spawn(async move {
98            inner_session
99                .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
100                .await
101        }));
102
103        query_handles
104    }
105}