1use std::{fmt::Debug, sync::Arc};
4
5use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash};
6use ed25519_dalek::VerifyingKey;
7use pallas::ledger::primitives::{alonzo, conway};
8use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
9use tracing::error;
10
11use crate::{
12 db::{
13 index::{
14 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
15 session::CassandraSession,
16 },
17 types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
18 },
19 settings::cassandra_db,
20};
21
22#[derive(SerializeRow)]
24pub(crate) struct StakeRegistrationInsertQuery {
25 stake_address: DbStakeAddress,
27 slot_no: DbSlot,
29 txn_index: DbTxnIndex,
31 stake_public_key: DbPublicKey,
33 script: bool,
35 register: MaybeUnset<bool>,
37 deregister: MaybeUnset<bool>,
39 cip36: MaybeUnset<bool>,
41 pool_delegation: MaybeUnset<Vec<u8>>,
43}
44
45impl Debug for StakeRegistrationInsertQuery {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
47 let stake_public_key = hex::encode(self.stake_public_key.as_ref());
48 let register = match self.register {
49 MaybeUnset::Unset => "UNSET",
50 MaybeUnset::Set(v) => &format!("{v:?}"),
51 };
52 let deregister = match self.deregister {
53 MaybeUnset::Unset => "UNSET",
54 MaybeUnset::Set(v) => &format!("{v:?}"),
55 };
56 let cip36 = match self.cip36 {
57 MaybeUnset::Unset => "UNSET",
58 MaybeUnset::Set(v) => &format!("{v:?}"),
59 };
60 let pool_delegation = match self.pool_delegation {
61 MaybeUnset::Unset => "UNSET",
62 MaybeUnset::Set(ref v) => &hex::encode(v),
63 };
64
65 f.debug_struct("StakeRegistrationInsertQuery")
66 .field("stake_address", &format!("{}", self.stake_address))
67 .field("slot_no", &self.slot_no)
68 .field("txn_index", &self.txn_index)
69 .field("stake_public_key", &stake_public_key)
70 .field("script", &self.script)
71 .field("register", ®ister)
72 .field("deregister", &deregister)
73 .field("cip36", &cip36)
74 .field("pool_delegation", &pool_delegation)
75 .finish()
76 }
77}
78
79const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
81
82impl StakeRegistrationInsertQuery {
83 #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
85 pub fn new(
86 stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
87 stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
88 cip36: bool, pool_delegation: Option<Vec<u8>>,
89 ) -> Self {
90 StakeRegistrationInsertQuery {
91 stake_address: stake_address.into(),
92 slot_no: slot_no.into(),
93 txn_index: txn_index.into(),
94 stake_public_key: stake_public_key.into(),
95 script,
96 register: if register {
97 MaybeUnset::Set(true)
98 } else {
99 MaybeUnset::Unset
100 },
101 deregister: if deregister {
102 MaybeUnset::Set(true)
103 } else {
104 MaybeUnset::Unset
105 },
106 cip36: if cip36 {
107 MaybeUnset::Set(true)
108 } else {
109 MaybeUnset::Unset
110 },
111 pool_delegation: if let Some(pool_delegation) = pool_delegation {
112 MaybeUnset::Set(pool_delegation)
113 } else {
114 MaybeUnset::Unset
115 },
116 }
117 }
118
119 pub(crate) async fn prepare_batch(
121 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
122 ) -> anyhow::Result<SizedBatch> {
123 PreparedQueries::prepare_batch(
124 session.clone(),
125 INSERT_STAKE_REGISTRATION_QUERY,
126 cfg,
127 scylla::statement::Consistency::Any,
128 true,
129 false,
130 )
131 .await
132 .inspect_err(
133 |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
134 )
135 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
136 }
137}
138
139pub(crate) struct CertInsertQuery {
141 stake_reg_data: Vec<StakeRegistrationInsertQuery>,
143}
144
145impl CertInsertQuery {
146 pub(crate) fn new() -> Self {
148 CertInsertQuery {
149 stake_reg_data: Vec::new(),
150 }
151 }
152
153 pub(crate) async fn prepare_batch(
155 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
156 ) -> anyhow::Result<SizedBatch> {
157 StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
160 }
161
162 #[allow(clippy::too_many_arguments)]
164 fn stake_address(
165 &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
166 deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
167 ) {
168 let (stake_address, pubkey, script) = match *cred {
169 conway::StakeCredential::AddrKeyhash(cred) => {
170 let stake_address = StakeAddress::new(block.network(), false, cred.into());
171 let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
172 (stake_address, addr, false)
175 },
176 conway::StakeCredential::Scripthash(h) => {
177 (
178 StakeAddress::new(block.network(), true, h.into()),
179 None,
180 true,
181 )
182 },
183 };
184
185 if pubkey.is_none() && !script && deregister {
186 error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
187 }
188
189 if pubkey.is_none() && !script && delegation.is_some() {
190 error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
191 }
192
193 if let Some(pubkey) = pubkey {
195 self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
196 stake_address,
197 slot_no,
198 txn,
199 pubkey,
200 script,
201 register,
202 deregister,
203 false,
204 delegation,
205 ));
206 }
207 }
208
209 fn index_alonzo_cert(
211 &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
212 ) {
213 #[allow(clippy::match_same_arms)]
214 match cert {
215 alonzo::Certificate::StakeRegistration(cred) => {
216 self.stake_address(cred, slot, index, true, false, None, block);
218 },
219 alonzo::Certificate::StakeDeregistration(cred) => {
220 self.stake_address(cred, slot, index, false, true, None, block);
221 },
222 alonzo::Certificate::StakeDelegation(cred, pool) => {
223 self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
224 },
225 alonzo::Certificate::PoolRegistration { .. } => {},
226 alonzo::Certificate::PoolRetirement(..) => {},
227 alonzo::Certificate::GenesisKeyDelegation(..) => {},
228 alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
229 }
230 }
231
232 fn index_conway_cert(
234 &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
235 ) {
236 #[allow(clippy::match_same_arms)]
237 match cert {
238 conway::Certificate::StakeRegistration(cred) => {
239 self.stake_address(cred, slot_no, txn, true, false, None, block);
241 },
242 conway::Certificate::StakeDeregistration(cred) => {
243 self.stake_address(cred, slot_no, txn, false, true, None, block);
244 },
245 conway::Certificate::StakeDelegation(cred, pool) => {
246 self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
247 },
248 conway::Certificate::PoolRegistration { .. } => {},
249 conway::Certificate::PoolRetirement(..) => {},
250 conway::Certificate::Reg(..) => {},
251 conway::Certificate::UnReg(..) => {},
252 conway::Certificate::VoteDeleg(..) => {},
253 conway::Certificate::StakeVoteDeleg(..) => {},
254 conway::Certificate::StakeRegDeleg(..) => {},
255 conway::Certificate::VoteRegDeleg(..) => {},
256 conway::Certificate::StakeVoteRegDeleg(..) => {},
257 conway::Certificate::AuthCommitteeHot(..) => {},
258 conway::Certificate::ResignCommitteeCold(..) => {},
259 conway::Certificate::RegDRepCert(..) => {},
260 conway::Certificate::UnRegDRepCert(..) => {},
261 conway::Certificate::UpdateDRepCert(..) => {},
262 }
263 }
264
265 pub(crate) fn index(
267 &mut self, txs: &pallas::ledger::traverse::MultiEraTx<'_>, slot: Slot, index: TxnIndex,
268 block: &MultiEraBlock,
269 ) {
270 #[allow(clippy::match_same_arms)]
271 txs.certs().iter().for_each(|cert| {
272 match cert {
273 pallas::ledger::traverse::MultiEraCert::NotApplicable => {},
274 pallas::ledger::traverse::MultiEraCert::AlonzoCompatible(cert) => {
275 self.index_alonzo_cert(cert, slot, index, block);
276 },
277 pallas::ledger::traverse::MultiEraCert::Conway(cert) => {
278 self.index_conway_cert(cert, slot, index, block);
279 },
280 _ => {},
281 }
282 });
283 }
284
285 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
289 let mut query_handles: FallibleQueryTasks = Vec::new();
290
291 let inner_session = session.clone();
292
293 query_handles.push(tokio::spawn(async move {
294 inner_session
295 .execute_batch(
296 PreparedQuery::StakeRegistrationInsertQuery,
297 self.stake_reg_data,
298 )
299 .await
300 }));
301
302 query_handles
303 }
304}