1pub(crate) mod purge;
6pub(crate) mod rbac;
7pub(crate) mod registrations;
8pub(crate) mod staked_ada;
9pub(crate) mod sync_status;
10
11use std::{fmt::Debug, sync::Arc};
12
13use anyhow::bail;
14use crossbeam_skiplist::SkipMap;
15use registrations::{
16 get_all_invalids::GetAllInvalidRegistrationsQuery,
17 get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery,
18 get_from_stake_address::GetStakeAddrQuery, get_from_vote_key::GetStakeAddrFromVoteKeyQuery,
19 get_invalid::GetInvalidRegistrationQuery,
20};
21use scylla::{
22 batch::Batch, prepared_statement::PreparedStatement, serialize::row::SerializeRow,
23 transport::iterator::QueryPager, QueryResult, Session,
24};
25use staked_ada::{
26 get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
27 get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
28 get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
29};
30use sync_status::update::SyncStatusInsertQuery;
31use tracing::error;
32
33use super::block::{
34 certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
35 txi::TxiInsertQuery, txo::TxoInsertQuery,
36};
37use crate::{
38 db::index::queries::rbac::{
39 get_catalyst_id_from_stake_address, get_catalyst_id_from_transaction_id,
40 get_rbac_invalid_registrations, get_rbac_registrations,
41 },
42 settings::cassandra_db,
43};
44
45pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
47
48#[derive(strum_macros::Display)]
50#[allow(clippy::enum_variant_names)]
51pub(crate) enum PreparedQuery {
52 TxoAdaInsertQuery,
54 TxoAssetInsertQuery,
56 UnstakedTxoAdaInsertQuery,
58 UnstakedTxoAssetInsertQuery,
60 TxiInsertQuery,
62 StakeRegistrationInsertQuery,
64 Cip36RegistrationInsertQuery,
66 Cip36RegistrationInsertErrorQuery,
68 Cip36RegistrationForStakeAddrInsertQuery,
70 TxoSpentUpdateQuery,
72 Rbac509InsertQuery,
74 Rbac509InvalidInsertQuery,
76 CatalystIdForTxnIdInsertQuery,
78 CatalystIdForStakeAddressInsertQuery,
80}
81
82pub(crate) enum PreparedSelectQuery {
84 TxoByStakeAddress,
86 TxiByTransactionHash,
88 AssetsByStakeAddress,
90 RegistrationFromStakeAddr,
92 InvalidRegistrationsFromStakeAddr,
94 StakeAddrFromStakeHash,
96 StakeAddrFromVoteKey,
98 CatalystIdByTransactionId,
100 CatalystIdByStakeAddress,
102 RbacRegistrationsByCatalystId,
104 RbacInvalidRegistrationsByCatalystId,
106 GetAllRegistrations,
108 GetAllInvalidRegistrations,
110}
111
112pub(crate) enum PreparedUpsertQuery {
114 SyncStatusInsert,
116}
117
118#[allow(clippy::struct_field_names)]
120pub(crate) struct PreparedQueries {
121 txo_insert_queries: SizedBatch,
123 txo_asset_insert_queries: SizedBatch,
125 unstaked_txo_insert_queries: SizedBatch,
127 unstaked_txo_asset_insert_queries: SizedBatch,
129 txi_insert_queries: SizedBatch,
131 stake_registration_insert_queries: SizedBatch,
133 cip36_registration_insert_queries: SizedBatch,
135 cip36_registration_error_insert_queries: SizedBatch,
137 cip36_registration_for_stake_address_insert_queries: SizedBatch,
139 txo_spent_update_queries: SizedBatch,
141 txo_by_stake_address_query: PreparedStatement,
143 txi_by_txn_hash_query: PreparedStatement,
145 rbac509_registration_insert_queries: SizedBatch,
147 rbac509_invalid_registration_insert_queries: SizedBatch,
149 catalyst_id_for_txn_id_insert_queries: SizedBatch,
151 catalyst_id_for_stake_address_insert_queries: SizedBatch,
153 native_assets_by_stake_address_query: PreparedStatement,
155 registration_from_stake_addr_query: PreparedStatement,
157 stake_addr_from_stake_address_query: PreparedStatement,
159 stake_addr_from_vote_key_query: PreparedStatement,
161 invalid_registrations_from_stake_addr_query: PreparedStatement,
163 sync_status_insert: PreparedStatement,
165 catalyst_id_by_stake_address_query: PreparedStatement,
167 catalyst_id_by_transaction_id_query: PreparedStatement,
169 rbac_registrations_by_catalyst_id_query: PreparedStatement,
171 rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement,
173 get_all_registrations_query: PreparedStatement,
175 get_all_invalid_registrations_query: PreparedStatement,
177}
178
179pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
181pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
183
184impl PreparedQueries {
185 #[allow(clippy::too_many_lines)]
187 pub(crate) async fn new(
188 session: Arc<Session>, cfg: &cassandra_db::EnvVars,
189 ) -> anyhow::Result<Self> {
190 let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?;
192 let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await;
193 let stake_registration_insert_queries =
194 CertInsertQuery::prepare_batch(&session, cfg).await?;
195 let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await;
196 let txo_spent_update_queries =
197 UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?;
198 let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await;
199 let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await;
200 let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
201 let native_assets_by_stake_address_query =
202 GetAssetsByStakeAddressQuery::prepare(session.clone()).await;
203 let registration_from_stake_addr_query =
204 GetRegistrationQuery::prepare(session.clone()).await;
205 let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await;
206 let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await;
207 let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await;
208 let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await;
209 let get_all_invalid_registrations_query =
210 GetAllInvalidRegistrationsQuery::prepare(session.clone()).await;
211 let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?;
212 let catalyst_id_by_stake_address_query =
213 get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?;
214 let catalyst_id_by_transaction_id_query =
215 get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?;
216 let rbac_registrations_by_catalyst_id_query =
217 get_rbac_registrations::Query::prepare(session.clone()).await?;
218 let rbac_invalid_registrations_by_catalyst_id_query =
219 get_rbac_invalid_registrations::Query::prepare(session.clone()).await?;
220
221 let (
222 txo_insert_queries,
223 unstaked_txo_insert_queries,
224 txo_asset_insert_queries,
225 unstaked_txo_asset_insert_queries,
226 ) = all_txo_queries?;
227
228 let (
229 cip36_registration_insert_queries,
230 cip36_registration_error_insert_queries,
231 cip36_registration_for_stake_address_insert_queries,
232 ) = all_cip36_queries?;
233
234 let (
235 rbac509_registration_insert_queries,
236 rbac509_invalid_registration_insert_queries,
237 catalyst_id_for_txn_id_insert_queries,
238 catalyst_id_for_stake_address_insert_queries,
239 ) = all_rbac_queries?;
240
241 Ok(Self {
242 txo_insert_queries,
243 txo_asset_insert_queries,
244 unstaked_txo_insert_queries,
245 unstaked_txo_asset_insert_queries,
246 txi_insert_queries,
247 stake_registration_insert_queries,
248 cip36_registration_insert_queries,
249 cip36_registration_error_insert_queries,
250 cip36_registration_for_stake_address_insert_queries,
251 txo_spent_update_queries,
252 txo_by_stake_address_query: txo_by_stake_address_query?,
253 txi_by_txn_hash_query: txi_by_txn_hash_query?,
254 rbac509_registration_insert_queries,
255 rbac509_invalid_registration_insert_queries,
256 catalyst_id_for_txn_id_insert_queries,
257 catalyst_id_for_stake_address_insert_queries,
258 native_assets_by_stake_address_query: native_assets_by_stake_address_query?,
259 registration_from_stake_addr_query: registration_from_stake_addr_query?,
260 stake_addr_from_stake_address_query: stake_addr_from_stake_address?,
261 stake_addr_from_vote_key_query: stake_addr_from_vote_key?,
262 invalid_registrations_from_stake_addr_query: invalid_registrations?,
263 sync_status_insert,
264 rbac_registrations_by_catalyst_id_query,
265 rbac_invalid_registrations_by_catalyst_id_query,
266 catalyst_id_by_stake_address_query,
267 catalyst_id_by_transaction_id_query,
268 get_all_registrations_query: get_all_registrations_query?,
269 get_all_invalid_registrations_query: get_all_invalid_registrations_query?,
270 })
271 }
272
273 pub(crate) async fn prepare(
275 session: Arc<Session>, query: &str, consistency: scylla::statement::Consistency,
276 idempotent: bool,
277 ) -> anyhow::Result<PreparedStatement> {
278 let mut prepared = session.prepare(query).await?;
279 prepared.set_consistency(consistency);
280 prepared.set_is_idempotent(idempotent);
281
282 Ok(prepared)
283 }
284
285 pub(crate) async fn prepare_batch(
289 session: Arc<Session>, query: &str, cfg: &cassandra_db::EnvVars,
290 consistency: scylla::statement::Consistency, idempotent: bool, logged: bool,
291 ) -> anyhow::Result<SizedBatch> {
292 let sized_batches: SizedBatch = SkipMap::new();
293
294 let prepared = Self::prepare(session, query, consistency, idempotent).await?;
297
298 for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
299 let mut batch: Batch = Batch::new(if logged {
300 scylla::batch::BatchType::Logged
301 } else {
302 scylla::batch::BatchType::Unlogged
303 });
304 batch.set_consistency(consistency);
305 batch.set_is_idempotent(idempotent);
306 for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size {
307 batch.append_statement(prepared.clone());
308 }
309
310 sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
311 }
312
313 Ok(sized_batches)
314 }
315
316 pub(crate) async fn execute_upsert<P>(
320 &self, session: Arc<Session>, upsert_query: PreparedUpsertQuery, params: P,
321 ) -> anyhow::Result<()>
322 where P: SerializeRow {
323 let prepared_stmt = match upsert_query {
324 PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
325 };
326
327 session
328 .execute_unpaged(prepared_stmt, params)
329 .await
330 .map_err(|e| anyhow::anyhow!(e))?;
331
332 Ok(())
333 }
334
335 pub(crate) async fn execute_iter<P>(
340 &self, session: Arc<Session>, select_query: PreparedSelectQuery, params: P,
341 ) -> anyhow::Result<QueryPager>
342 where P: SerializeRow {
343 let prepared_stmt = match select_query {
344 PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
345 PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
346 PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
347 PreparedSelectQuery::RegistrationFromStakeAddr => {
348 &self.registration_from_stake_addr_query
349 },
350 PreparedSelectQuery::StakeAddrFromStakeHash => {
351 &self.stake_addr_from_stake_address_query
352 },
353 PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
354 PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
355 &self.invalid_registrations_from_stake_addr_query
356 },
357 PreparedSelectQuery::RbacRegistrationsByCatalystId => {
358 &self.rbac_registrations_by_catalyst_id_query
359 },
360 PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => {
361 &self.rbac_invalid_registrations_by_catalyst_id_query
362 },
363 PreparedSelectQuery::CatalystIdByTransactionId => {
364 &self.catalyst_id_by_transaction_id_query
365 },
366 PreparedSelectQuery::CatalystIdByStakeAddress => {
367 &self.catalyst_id_by_stake_address_query
368 },
369 PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query,
370 PreparedSelectQuery::GetAllInvalidRegistrations => {
371 &self.get_all_invalid_registrations_query
372 },
373 };
374 session_execute_iter(session, prepared_stmt, params).await
375 }
376
377 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
385 &self, session: Arc<Session>, cfg: Arc<cassandra_db::EnvVars>, query: PreparedQuery,
386 values: Vec<T>,
387 ) -> FallibleQueryResults {
388 let query_map = match query {
389 PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
390 PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
391 PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
392 PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
393 PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
394 PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
395 PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
396 PreparedQuery::Cip36RegistrationInsertErrorQuery => {
397 &self.cip36_registration_error_insert_queries
398 },
399 PreparedQuery::Cip36RegistrationForStakeAddrInsertQuery => {
400 &self.cip36_registration_for_stake_address_insert_queries
401 },
402 PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
403 PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
404 PreparedQuery::Rbac509InvalidInsertQuery => {
405 &self.rbac509_invalid_registration_insert_queries
406 },
407 PreparedQuery::CatalystIdForTxnIdInsertQuery => {
408 &self.catalyst_id_for_txn_id_insert_queries
409 },
410 PreparedQuery::CatalystIdForStakeAddressInsertQuery => {
411 &self.catalyst_id_for_stake_address_insert_queries
412 },
413 };
414 session_execute_batch(session, query_map, cfg, query, values).await
415 }
416}
417
418async fn session_execute_batch<T: SerializeRow + Debug, Q: std::fmt::Display>(
426 session: Arc<Session>, query_map: &SizedBatch, cfg: Arc<cassandra_db::EnvVars>, query: Q,
427 values: Vec<T>,
428) -> FallibleQueryResults {
429 let mut results: Vec<QueryResult> = Vec::new();
430 let mut errors = Vec::new();
431
432 let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1));
433 let query_str = format!("{query}");
434
435 for chunk in chunks {
436 let chunk_size: u16 = chunk.len().try_into()?;
437 let Some(batch_query) = query_map.get(&chunk_size) else {
438 bail!("No batch query found for size {}", chunk_size);
440 };
441 let batch_query_statements = batch_query.value().clone();
442 match session.batch(&batch_query_statements, chunk).await {
443 Ok(result) => results.push(result),
444 Err(err) => {
445 let chunk_str = format!("{chunk:?}");
446 error!(error=%err, query=query_str, chunk=chunk_str, "Query Execution Failed");
447 errors.push(err);
448 },
450 }
451 }
452
453 if !errors.is_empty() {
454 bail!("Query Failed: {query_str}! {errors:?}");
455 }
456
457 Ok(results)
458}
459
460pub(crate) async fn session_execute_iter<P>(
465 session: Arc<Session>, prepared_stmt: &PreparedStatement, params: P,
466) -> anyhow::Result<QueryPager>
467where P: SerializeRow {
468 session
469 .execute_iter(prepared_stmt.clone(), params)
470 .await
471 .map_err(|e| anyhow::anyhow!(e))
472}