cat_gateway/db/index/block/
txi.rs1use std::sync::Arc;
4
5use cardano_blockchain_types::{Slot, TransactionId, TxnOutputOffset};
6use catalyst_types::hashes::Blake2b256Hash;
7use scylla::{SerializeRow, Session};
8use tracing::error;
9
10use crate::{
11 db::{
12 index::{
13 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
14 session::CassandraSession,
15 },
16 types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
17 },
18 settings::cassandra_db,
19};
20
21#[derive(SerializeRow, Debug)]
23pub(crate) struct TxiInsertParams {
24 txn_id: DbTransactionId,
26 txo: DbTxnOutputOffset,
28 slot_no: DbSlot,
30}
31
32impl TxiInsertParams {
33 pub fn new(txn_id: TransactionId, txo: TxnOutputOffset, slot: Slot) -> Self {
35 Self {
36 txn_id: txn_id.into(),
37 txo: txo.into(),
38 slot_no: slot.into(),
39 }
40 }
41}
42
43pub(crate) struct TxiInsertQuery {
45 txi_data: Vec<TxiInsertParams>,
47}
48
49const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
51
52impl TxiInsertQuery {
53 pub(crate) fn new() -> Self {
55 Self {
56 txi_data: Vec::new(),
57 }
58 }
59
60 pub(crate) async fn prepare_batch(
62 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
63 ) -> anyhow::Result<SizedBatch> {
64 PreparedQueries::prepare_batch(
65 session.clone(),
66 INSERT_TXI_QUERY,
67 cfg,
68 scylla::statement::Consistency::Any,
69 true,
70 false,
71 )
72 .await
73 .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
74 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
75 }
76
77 pub(crate) fn index(&mut self, txs: &pallas_traverse::MultiEraTx<'_>, slot_no: Slot) {
79 for txi in txs.inputs() {
81 let txn_id = Blake2b256Hash::from(*txi.hash()).into();
82 let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
83
84 self.txi_data
85 .push(TxiInsertParams::new(txn_id, txo, slot_no));
86 }
87 }
88
89 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
93 let mut query_handles: FallibleQueryTasks = Vec::new();
94
95 let inner_session = session.clone();
96
97 query_handles.push(tokio::spawn(async move {
98 inner_session
99 .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
100 .await
101 }));
102
103 query_handles
104 }
105}