cat_gateway/db/index/block/cip36/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
//! Index CIP-36 Registrations.
mod insert_cip36;
mod insert_cip36_for_vote_key;
mod insert_cip36_invalid;
use std::sync::Arc;
use cardano_chain_follower::{Metadata, MultiEraBlock};
use scylla::Session;
use crate::{
db::index::{
queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
session::CassandraSession,
},
settings::cassandra_db,
};
/// Insert CIP-36 Registration Queries
pub(crate) struct Cip36InsertQuery {
/// Stake Registration Data captured during indexing.
registrations: Vec<insert_cip36::Params>,
/// Stake Registration Data captured during indexing.
invalid: Vec<insert_cip36_invalid::Params>,
/// Stake Registration Data captured during indexing.
for_vote_key: Vec<insert_cip36_for_vote_key::Params>,
}
impl Cip36InsertQuery {
/// Create new data set for CIP-36 Registrations Insert Query Batch.
pub(crate) fn new() -> Self {
Cip36InsertQuery {
registrations: Vec::new(),
invalid: Vec::new(),
for_vote_key: Vec::new(),
}
}
/// Prepare Batch of Insert Cip36 Registration Data Queries
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch)> {
let insert_cip36_batch = insert_cip36::Params::prepare_batch(session, cfg).await;
let insert_cip36_invalid_batch =
insert_cip36_invalid::Params::prepare_batch(session, cfg).await;
let insert_cip36_for_vote_key_addr_batch =
insert_cip36_for_vote_key::Params::prepare_batch(session, cfg).await;
Ok((
insert_cip36_batch?,
insert_cip36_invalid_batch?,
insert_cip36_for_vote_key_addr_batch?,
))
}
/// Index the CIP-36 registrations in a transaction.
pub(crate) fn index(
&mut self, txn: usize, txn_index: i16, slot_no: u64, block: &MultiEraBlock,
) {
if let Some(decoded_metadata) = block.txn_metadata(txn, Metadata::cip36::LABEL) {
#[allow(irrefutable_let_patterns)]
if let Metadata::DecodedMetadataValues::Cip36(cip36) = &decoded_metadata.value {
// Check if we are indexing a valid or invalid registration.
// Note, we ONLY care about catalyst, we should only have 1 voting key, if not, call
// it an error.
if decoded_metadata.report.is_empty() && cip36.voting_keys.len() == 1 {
// Always true, because we already checked if the array has only one entry.
if let Some(vote_key) = cip36.voting_keys.first() {
self.registrations.push(insert_cip36::Params::new(
vote_key, slot_no, txn_index, cip36,
));
self.for_vote_key
.push(insert_cip36_for_vote_key::Params::new(
vote_key, slot_no, txn_index, cip36, true,
));
}
} else if cip36.stake_pk.is_some() {
// We can't index an error, if there is no stake public key.
if cip36.voting_keys.is_empty() {
self.invalid.push(insert_cip36_invalid::Params::new(
None,
slot_no,
txn_index,
cip36,
decoded_metadata.report.clone(),
));
}
for vote_key in &cip36.voting_keys {
self.invalid.push(insert_cip36_invalid::Params::new(
Some(vote_key),
slot_no,
txn_index,
cip36,
decoded_metadata.report.clone(),
));
self.for_vote_key
.push(insert_cip36_for_vote_key::Params::new(
vote_key, slot_no, txn_index, cip36, false,
));
}
}
}
}
}
/// Execute the CIP-36 Registration Indexing Queries.
///
/// Consumes the `self` and returns a vector of futures.
pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
let mut query_handles: FallibleQueryTasks = Vec::new();
if !self.registrations.is_empty() {
let inner_session = session.clone();
query_handles.push(tokio::spawn(async move {
inner_session
.execute_batch(
PreparedQuery::Cip36RegistrationInsertQuery,
self.registrations,
)
.await
}));
}
if !self.invalid.is_empty() {
let inner_session = session.clone();
query_handles.push(tokio::spawn(async move {
inner_session
.execute_batch(
PreparedQuery::Cip36RegistrationInsertErrorQuery,
self.invalid,
)
.await
}));
}
if !self.for_vote_key.is_empty() {
let inner_session = session.clone();
query_handles.push(tokio::spawn(async move {
inner_session
.execute_batch(
PreparedQuery::Cip36RegistrationForStakeAddrInsertQuery,
self.for_vote_key,
)
.await
}));
}
query_handles
}
}