pub(crate) mod rbac;
pub(crate) mod registrations;
pub(crate) mod staked_ada;
pub(crate) mod sync_status;
use std::{fmt::Debug, sync::Arc};
use anyhow::bail;
use crossbeam_skiplist::SkipMap;
use rbac::{
get_chain_root::GetChainRootQuery, get_registrations::GetRegistrationsByChainRootQuery,
get_role0_chain_root::GetRole0ChainRootQuery,
};
use registrations::{
get_from_stake_addr::GetRegistrationQuery, get_from_stake_hash::GetStakeAddrQuery,
get_from_vote_key::GetStakeAddrFromVoteKeyQuery, get_invalid::GetInvalidRegistrationQuery,
};
use scylla::{
batch::Batch, prepared_statement::PreparedStatement, serialize::row::SerializeRow,
transport::iterator::QueryPager, QueryResult, Session,
};
use staked_ada::{
get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
};
use sync_status::update::SyncStatusInsertQuery;
use tracing::error;
use super::block::{
certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
txi::TxiInsertQuery, txo::TxoInsertQuery,
};
use crate::settings::cassandra_db;
/// Prepared batches keyed by batch size.
///
/// `PreparedQueries::prepare_batch` inserts one entry per batch size, and
/// `PreparedQueries::execute_batch` looks an entry up by the length of the chunk it is sending.
pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
/// Identifies one of the size-keyed prepared batches held by `PreparedQueries`.
///
/// `PreparedQueries::execute_batch` maps each variant to the matching `SizedBatch` field.
/// The derived `Display` output is used there to name the query in log and error messages.
#[derive(strum_macros::Display)]
// NOTE(review): presumably silenced because every variant ends in `Query` — confirm.
#[allow(clippy::enum_variant_names)]
pub(crate) enum PreparedQuery {
/// Selects the `txo_insert_queries` batches.
TxoAdaInsertQuery,
/// Selects the `txo_asset_insert_queries` batches.
TxoAssetInsertQuery,
/// Selects the `unstaked_txo_insert_queries` batches.
UnstakedTxoAdaInsertQuery,
/// Selects the `unstaked_txo_asset_insert_queries` batches.
UnstakedTxoAssetInsertQuery,
/// Selects the `txi_insert_queries` batches.
TxiInsertQuery,
/// Selects the `stake_registration_insert_queries` batches.
StakeRegistrationInsertQuery,
/// Selects the `cip36_registration_insert_queries` batches.
Cip36RegistrationInsertQuery,
/// Selects the `cip36_registration_error_insert_queries` batches.
Cip36RegistrationInsertErrorQuery,
/// Selects the `cip36_registration_for_stake_address_insert_queries` batches.
Cip36RegistrationForStakeAddrInsertQuery,
/// Selects the `txo_spent_update_queries` batches.
TxoSpentUpdateQuery,
/// Selects the `rbac509_registration_insert_queries` batches.
Rbac509InsertQuery,
/// Selects the `chain_root_for_txn_id_insert_queries` batches.
ChainRootForTxnIdInsertQuery,
/// Selects the `chain_root_for_role0_key_insert_queries` batches.
ChainRootForRole0KeyInsertQuery,
/// Selects the `chain_root_for_stake_address_insert_queries` batches.
ChainRootForStakeAddressInsertQuery,
}
/// Identifies one of the prepared select statements held by `PreparedQueries`.
///
/// `PreparedQueries::execute_iter` maps each variant to the matching `PreparedStatement` field.
pub(crate) enum PreparedSelectQuery {
/// Selects the `txo_by_stake_address_query` statement.
TxoByStakeAddress,
/// Selects the `txi_by_txn_hash_query` statement.
TxiByTransactionHash,
/// Selects the `native_assets_by_stake_address_query` statement.
AssetsByStakeAddress,
/// Selects the `registration_from_stake_addr_query` statement.
RegistrationFromStakeAddr,
/// Selects the `invalid_registrations_from_stake_addr_query` statement.
InvalidRegistrationsFromStakeAddr,
/// Selects the `stake_addr_from_stake_hash_query` statement.
StakeAddrFromStakeHash,
/// Selects the `stake_addr_from_vote_key_query` statement.
StakeAddrFromVoteKey,
/// Selects the `chain_root_by_stake_address_query` statement.
ChainRootByStakeAddress,
/// Selects the `registrations_by_chain_root_query` statement.
RegistrationsByChainRoot,
/// Selects the `chain_root_by_role0_key_query` statement.
ChainRootByRole0Key,
}
/// Identifies one of the prepared upsert statements held by `PreparedQueries`.
///
/// `PreparedQueries::execute_upsert` maps each variant to the matching `PreparedStatement` field.
pub(crate) enum PreparedUpsertQuery {
/// Selects the `sync_status_insert` statement.
SyncStatusInsert,
}
/// Every prepared statement and size-keyed batch built by `PreparedQueries::new`.
///
/// `SizedBatch` fields are executed through `execute_batch`; `PreparedStatement` fields are
/// executed through `execute_iter` or `execute_upsert`.
// NOTE(review): presumably silenced because the fields share `_queries`/`_query` suffixes — confirm.
#[allow(clippy::struct_field_names)]
pub(crate) struct PreparedQueries {
/// First batch set returned by `TxoInsertQuery::prepare_batch`.
txo_insert_queries: SizedBatch,
/// Third batch set returned by `TxoInsertQuery::prepare_batch`.
txo_asset_insert_queries: SizedBatch,
/// Second batch set returned by `TxoInsertQuery::prepare_batch`.
unstaked_txo_insert_queries: SizedBatch,
/// Fourth batch set returned by `TxoInsertQuery::prepare_batch`.
unstaked_txo_asset_insert_queries: SizedBatch,
/// Batches returned by `TxiInsertQuery::prepare_batch`.
txi_insert_queries: SizedBatch,
/// Batches returned by `CertInsertQuery::prepare_batch`.
stake_registration_insert_queries: SizedBatch,
/// First batch set returned by `Cip36InsertQuery::prepare_batch`.
cip36_registration_insert_queries: SizedBatch,
/// Second batch set returned by `Cip36InsertQuery::prepare_batch`.
cip36_registration_error_insert_queries: SizedBatch,
/// Third batch set returned by `Cip36InsertQuery::prepare_batch`.
cip36_registration_for_stake_address_insert_queries: SizedBatch,
/// Batches returned by `UpdateTxoSpentQuery::prepare_batch`.
txo_spent_update_queries: SizedBatch,
/// Statement returned by `GetTxoByStakeAddressQuery::prepare`.
txo_by_stake_address_query: PreparedStatement,
/// Statement returned by `GetTxiByTxnHashesQuery::prepare`.
txi_by_txn_hash_query: PreparedStatement,
/// First batch set returned by `Rbac509InsertQuery::prepare_batch`.
rbac509_registration_insert_queries: SizedBatch,
/// Second batch set returned by `Rbac509InsertQuery::prepare_batch`.
chain_root_for_txn_id_insert_queries: SizedBatch,
/// Third batch set returned by `Rbac509InsertQuery::prepare_batch`.
chain_root_for_role0_key_insert_queries: SizedBatch,
/// Fourth batch set returned by `Rbac509InsertQuery::prepare_batch`.
chain_root_for_stake_address_insert_queries: SizedBatch,
/// Statement returned by `GetAssetsByStakeAddressQuery::prepare`.
native_assets_by_stake_address_query: PreparedStatement,
/// Statement returned by `GetRegistrationQuery::prepare`.
registration_from_stake_addr_query: PreparedStatement,
/// Statement returned by `GetStakeAddrQuery::prepare`.
stake_addr_from_stake_hash_query: PreparedStatement,
/// Statement returned by `GetStakeAddrFromVoteKeyQuery::prepare`.
stake_addr_from_vote_key_query: PreparedStatement,
/// Statement returned by `GetInvalidRegistrationQuery::prepare`.
invalid_registrations_from_stake_addr_query: PreparedStatement,
/// Statement returned by `SyncStatusInsertQuery::prepare`.
sync_status_insert: PreparedStatement,
/// Statement returned by `GetChainRootQuery::prepare`.
chain_root_by_stake_address_query: PreparedStatement,
/// Statement returned by `GetRegistrationsByChainRootQuery::prepare`.
registrations_by_chain_root_query: PreparedStatement,
/// Statement returned by `GetRole0ChainRootQuery::prepare`.
chain_root_by_role0_key_query: PreparedStatement,
}
/// Outcome of a single query: the driver's `QueryResult`, or an `anyhow` error.
#[allow(dead_code)]
pub(crate) type FallibleQueryResult = anyhow::Result<QueryResult>;
/// Outcome of several queries: one `QueryResult` per executed query, or an `anyhow` error.
///
/// This is the return type of `PreparedQueries::execute_batch`.
pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
/// Join handles of tokio tasks that each resolve to a `FallibleQueryResults`.
pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
impl PreparedQueries {
/// Prepares every statement and size-keyed batch stored in `PreparedQueries`.
///
/// Preparation happens in two phases. First every statement is prepared, one after another,
/// without inspecting the outcome, so all of them are attempted even if an earlier one
/// failed. Then the outcomes are unwrapped and the first failure found is returned.
///
/// # Errors
///
/// Returns the error of the first preparation result that turns out to have failed.
pub(crate) async fn new(
    session: Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<Self> {
    // Phase 1: issue every preparation and keep the raw results.
    let txi_batches = TxiInsertQuery::prepare_batch(&session, cfg).await;
    let txo_batches = TxoInsertQuery::prepare_batch(&session, cfg).await;
    let cert_batches = CertInsertQuery::prepare_batch(&session, cfg).await;
    let cip36_batches = Cip36InsertQuery::prepare_batch(&session, cfg).await;
    let txo_spent_batches = UpdateTxoSpentQuery::prepare_batch(Arc::clone(&session), cfg).await;
    let txo_by_stake_address = GetTxoByStakeAddressQuery::prepare(Arc::clone(&session)).await;
    let txi_by_txn_hash = GetTxiByTxnHashesQuery::prepare(Arc::clone(&session)).await;
    let rbac_batches = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
    let native_assets_by_stake_address =
        GetAssetsByStakeAddressQuery::prepare(Arc::clone(&session)).await;
    let registration_from_stake_addr =
        GetRegistrationQuery::prepare(Arc::clone(&session)).await;
    let stake_addr_from_stake_hash = GetStakeAddrQuery::prepare(Arc::clone(&session)).await;
    let stake_addr_from_vote_key =
        GetStakeAddrFromVoteKeyQuery::prepare(Arc::clone(&session)).await;
    let invalid_registrations =
        GetInvalidRegistrationQuery::prepare(Arc::clone(&session)).await;
    let sync_status = SyncStatusInsertQuery::prepare(Arc::clone(&session)).await;
    let chain_root_by_stake_address = GetChainRootQuery::prepare(Arc::clone(&session)).await;
    let registrations_by_chain_root =
        GetRegistrationsByChainRootQuery::prepare(Arc::clone(&session)).await;
    // Last user of the session handle, so it can be moved instead of cloned.
    let chain_root_by_role0_key = GetRole0ChainRootQuery::prepare(session).await;

    // Phase 2: unwrap the results. The multi-batch tuples are checked first, then the
    // single results in field order; this decides which error is reported.
    let (
        txo_insert_queries,
        unstaked_txo_insert_queries,
        txo_asset_insert_queries,
        unstaked_txo_asset_insert_queries,
    ) = txo_batches?;
    let (
        cip36_registration_insert_queries,
        cip36_registration_error_insert_queries,
        cip36_registration_for_stake_address_insert_queries,
    ) = cip36_batches?;
    let (
        rbac509_registration_insert_queries,
        chain_root_for_txn_id_insert_queries,
        chain_root_for_role0_key_insert_queries,
        chain_root_for_stake_address_insert_queries,
    ) = rbac_batches?;
    let txi_insert_queries = txi_batches?;
    let stake_registration_insert_queries = cert_batches?;
    let txo_spent_update_queries = txo_spent_batches?;
    let txo_by_stake_address_query = txo_by_stake_address?;
    let txi_by_txn_hash_query = txi_by_txn_hash?;
    let native_assets_by_stake_address_query = native_assets_by_stake_address?;
    let registration_from_stake_addr_query = registration_from_stake_addr?;
    let stake_addr_from_stake_hash_query = stake_addr_from_stake_hash?;
    let stake_addr_from_vote_key_query = stake_addr_from_vote_key?;
    let invalid_registrations_from_stake_addr_query = invalid_registrations?;
    let sync_status_insert = sync_status?;
    let chain_root_by_stake_address_query = chain_root_by_stake_address?;
    let registrations_by_chain_root_query = registrations_by_chain_root?;
    let chain_root_by_role0_key_query = chain_root_by_role0_key?;

    Ok(Self {
        txo_insert_queries,
        txo_asset_insert_queries,
        unstaked_txo_insert_queries,
        unstaked_txo_asset_insert_queries,
        txi_insert_queries,
        stake_registration_insert_queries,
        cip36_registration_insert_queries,
        cip36_registration_error_insert_queries,
        cip36_registration_for_stake_address_insert_queries,
        txo_spent_update_queries,
        txo_by_stake_address_query,
        txi_by_txn_hash_query,
        rbac509_registration_insert_queries,
        chain_root_for_txn_id_insert_queries,
        chain_root_for_role0_key_insert_queries,
        chain_root_for_stake_address_insert_queries,
        native_assets_by_stake_address_query,
        registration_from_stake_addr_query,
        stake_addr_from_stake_hash_query,
        stake_addr_from_vote_key_query,
        invalid_registrations_from_stake_addr_query,
        sync_status_insert,
        chain_root_by_stake_address_query,
        registrations_by_chain_root_query,
        chain_root_by_role0_key_query,
    })
}
/// Prepares `query` on `session` and configures the resulting statement.
///
/// The prepared statement gets the requested `consistency` level and `idempotent` flag
/// before it is returned.
///
/// # Errors
///
/// Returns the driver error if the session fails to prepare the query.
pub(crate) async fn prepare(
    session: Arc<Session>, query: &str, consistency: scylla::statement::Consistency,
    idempotent: bool,
) -> anyhow::Result<PreparedStatement> {
    match session.prepare(query).await {
        Ok(mut statement) => {
            statement.set_consistency(consistency);
            statement.set_is_idempotent(idempotent);
            Ok(statement)
        },
        Err(err) => Err(err.into()),
    }
}
/// Prepares `query` once and builds a batch of it for every supported batch size.
///
/// For each size in `cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size` a batch holding
/// exactly that many copies of the prepared statement is stored under that size.
/// `execute_batch` looks a batch up by the number of rows it is about to send, so the key
/// and the statement count must always agree.
///
/// * `consistency` and `idempotent` are applied to the prepared statement and to every batch.
/// * `logged` selects a `Logged` batch when `true`, otherwise an `Unlogged` one.
///
/// # Errors
///
/// Returns an error if the query cannot be prepared, or if a batch size does not fit the
/// `u16` key of `SizedBatch`.
pub(crate) async fn prepare_batch(
    session: Arc<Session>, query: &str, cfg: &cassandra_db::EnvVars,
    consistency: scylla::statement::Consistency, idempotent: bool, logged: bool,
) -> anyhow::Result<SizedBatch> {
    let sized_batches: SizedBatch = SkipMap::new();
    let prepared = Self::prepare(session, query, consistency, idempotent).await?;

    for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
        let batch_type = if logged {
            scylla::batch::BatchType::Logged
        } else {
            scylla::batch::BatchType::Unlogged
        };
        let mut batch: Batch = Batch::new(batch_type);
        batch.set_consistency(consistency);
        batch.set_is_idempotent(idempotent);

        // Append exactly `batch_size` statements. Counting from `MIN_BATCH_SIZE` would only
        // give the right number when that constant is 1.
        for _ in 0..batch_size {
            batch.append_statement(prepared.clone());
        }

        sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
    }

    Ok(sized_batches)
}
/// Runs the prepared upsert statement selected by `upsert_query` with `params`.
///
/// The statement is executed unpaged and its result is discarded.
///
/// # Errors
///
/// Returns the driver error, wrapped in `anyhow`, if execution fails.
pub(crate) async fn execute_upsert<P>(
    &self, session: Arc<Session>, upsert_query: PreparedUpsertQuery, params: P,
) -> anyhow::Result<()>
where P: SerializeRow {
    let statement = match upsert_query {
        PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
    };

    match session.execute_unpaged(statement, params).await {
        Ok(_) => Ok(()),
        Err(err) => Err(anyhow::anyhow!(err)),
    }
}
pub(crate) async fn execute_iter<P>(
&self, session: Arc<Session>, select_query: PreparedSelectQuery, params: P,
) -> anyhow::Result<QueryPager>
where P: SerializeRow {
let prepared_stmt = match select_query {
PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
PreparedSelectQuery::RegistrationFromStakeAddr => {
&self.registration_from_stake_addr_query
},
PreparedSelectQuery::StakeAddrFromStakeHash => &self.stake_addr_from_stake_hash_query,
PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
&self.invalid_registrations_from_stake_addr_query
},
PreparedSelectQuery::ChainRootByStakeAddress => &self.chain_root_by_stake_address_query,
PreparedSelectQuery::RegistrationsByChainRoot => {
&self.registrations_by_chain_root_query
},
PreparedSelectQuery::ChainRootByRole0Key => &self.chain_root_by_role0_key_query,
};
session
.execute_iter(prepared_stmt.clone(), params)
.await
.map_err(|e| anyhow::anyhow!(e))
}
/// Writes `values` using the prepared batches selected by `query`.
///
/// `values` is split into chunks of at most `cfg.max_batch_size` rows. Each chunk is sent
/// with the prepared batch stored under the chunk's length. A chunk that fails is logged and
/// the remaining chunks are still attempted; the call then fails as a whole.
///
/// # Errors
///
/// Returns an error if:
/// * a chunk length does not fit in `u16`,
/// * no prepared batch exists for a chunk's length (returned immediately), or
/// * at least one chunk failed to execute (returned after every chunk was attempted).
pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
    &self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedQuery,
    values: Vec<T>,
) -> FallibleQueryResults {
    let query_map = match query {
        PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
        PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
        PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
        PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
        PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
        PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
        PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
        PreparedQuery::Cip36RegistrationInsertErrorQuery => {
            &self.cip36_registration_error_insert_queries
        },
        PreparedQuery::Cip36RegistrationForStakeAddrInsertQuery => {
            &self.cip36_registration_for_stake_address_insert_queries
        },
        PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
        PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
        PreparedQuery::ChainRootForTxnIdInsertQuery => {
            &self.chain_root_for_txn_id_insert_queries
        },
        PreparedQuery::ChainRootForRole0KeyInsertQuery => {
            &self.chain_root_for_role0_key_insert_queries
        },
        PreparedQuery::ChainRootForStakeAddressInsertQuery => {
            &self.chain_root_for_stake_address_insert_queries
        },
    };

    // `slice::chunks` panics on a chunk size of zero. `unwrap_or(1)` only covers a failed
    // conversion, so also clamp a configured size of 0 up to 1. The lookup below then
    // reports the missing batch as an error instead of panicking.
    let max_chunk_len: usize = cfg.max_batch_size.try_into().unwrap_or(1);
    let chunks = values.chunks(max_chunk_len.max(1));

    // One result per chunk at most, so the final size is known up front.
    let mut results: Vec<QueryResult> = Vec::with_capacity(chunks.len());
    let mut query_failed = false;
    let query_str = query.to_string();

    for chunk in chunks {
        let chunk_size: u16 = chunk.len().try_into()?;
        let Some(batch_query) = query_map.get(&chunk_size) else {
            bail!("No batch query found for size {}", chunk_size);
        };
        let batch_query_statements = batch_query.value().clone();
        match session.batch(&batch_query_statements, chunk).await {
            Ok(result) => results.push(result),
            Err(err) => {
                // Keep going so one bad chunk does not stop the rest from being written.
                let chunk_str = format!("{chunk:?}");
                error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed");
                query_failed = true;
            },
        }
    }

    if query_failed {
        bail!("Query Failed: {query_str}!");
    }

    Ok(results)
}
}