// cat_gateway/db/index/block/txi.rs
use std::sync::Arc;
use scylla::{SerializeRow, Session};
use tracing::error;
use crate::{
db::index::{
queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
session::CassandraSession,
},
settings::cassandra_db,
};
/// Values for one row of the TXI insert statement.
///
/// Derives `SerializeRow` so an instance can be handed to the database driver
/// as the values of a query. NOTE(review): the field names/order presumably
/// have to line up with the bind markers in `./cql/insert_txi.cql` — confirm
/// before renaming or reordering anything here.
#[derive(SerializeRow, Debug)]
pub(crate) struct TxiInsertParams {
/// Owned copy of the hash bytes reported by a transaction input.
txn_hash: Vec<u8>,
/// Index reported by the transaction input, narrowed to `i16` by the caller.
txo: i16,
/// Slot number, widened from `u64` into an arbitrary-precision integer.
slot_no: num_bigint::BigInt,
}
impl TxiInsertParams {
pub fn new(txn_hash: &[u8], txo: i16, slot_no: u64) -> Self {
Self {
txn_hash: txn_hash.to_vec(),
txo,
slot_no: slot_no.into(),
}
}
}
/// Collects TXI insert rows so they can be submitted together as one batch.
pub(crate) struct TxiInsertQuery {
/// Rows queued by `index` and handed to the session by `execute`.
txi_data: Vec<TxiInsertParams>,
}
/// Text of the TXI insert statement, embedded at compile time from
/// `./cql/insert_txi.cql` (path is relative to this source file).
const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
impl TxiInsertQuery {
pub(crate) fn new() -> Self {
Self {
txi_data: Vec::new(),
}
}
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
let txi_insert_queries = PreparedQueries::prepare_batch(
session.clone(),
INSERT_TXI_QUERY,
cfg,
scylla::statement::Consistency::Any,
true,
false,
)
.await;
if let Err(ref error) = txi_insert_queries {
error!(error=%error,"Failed to prepare Insert TXI Query.");
};
txi_insert_queries
}
pub(crate) fn index(&mut self, txs: &pallas_traverse::MultiEraTx<'_>, slot_no: u64) {
for txi in txs.inputs() {
let txn_hash = txi.hash().to_vec();
let txo: i16 = txi.index().try_into().unwrap_or(i16::MAX);
self.txi_data
.push(TxiInsertParams::new(&txn_hash, txo, slot_no));
}
}
pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
let mut query_handles: FallibleQueryTasks = Vec::new();
let inner_session = session.clone();
query_handles.push(tokio::spawn(async move {
inner_session
.execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
.await
}));
query_handles
}
}