1use std::{fmt::Debug, sync::Arc};
4
5use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash};
6use ed25519_dalek::VerifyingKey;
7use pallas::ledger::primitives::{alonzo, conway};
8use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
9use tracing::error;
10
11use crate::{
12 db::{
13 index::{
14 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
15 session::CassandraSession,
16 },
17 types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
18 },
19 settings::cassandra_db,
20};
21
22#[derive(SerializeRow)]
24pub(crate) struct StakeRegistrationInsertQuery {
25 stake_address: DbStakeAddress,
27 slot_no: DbSlot,
29 txn_index: DbTxnIndex,
31 stake_public_key: MaybeUnset<DbPublicKey>,
33 script: bool,
35 register: MaybeUnset<bool>,
37 deregister: MaybeUnset<bool>,
39 pool_delegation: MaybeUnset<Vec<u8>>,
41}
42
43impl Debug for StakeRegistrationInsertQuery {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
45 let stake_public_key = match self.stake_public_key {
46 MaybeUnset::Unset => "UNSET",
47 MaybeUnset::Set(ref v) => &hex::encode(v.as_ref()),
48 };
49 let register = match self.register {
50 MaybeUnset::Unset => "UNSET",
51 MaybeUnset::Set(v) => &format!("{v:?}"),
52 };
53 let deregister = match self.deregister {
54 MaybeUnset::Unset => "UNSET",
55 MaybeUnset::Set(v) => &format!("{v:?}"),
56 };
57 let pool_delegation = match self.pool_delegation {
58 MaybeUnset::Unset => "UNSET",
59 MaybeUnset::Set(ref v) => &hex::encode(v),
60 };
61
62 f.debug_struct("StakeRegistrationInsertQuery")
63 .field("stake_address", &format!("{}", self.stake_address))
64 .field("slot_no", &self.slot_no)
65 .field("txn_index", &self.txn_index)
66 .field("stake_public_key", &stake_public_key)
67 .field("script", &self.script)
68 .field("register", ®ister)
69 .field("deregister", &deregister)
70 .field("pool_delegation", &pool_delegation)
71 .finish()
72 }
73}
74
75const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
77
78impl StakeRegistrationInsertQuery {
79 #[allow(clippy::too_many_arguments)]
81 pub fn new(
82 stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
83 stake_public_key: Option<VerifyingKey>, script: bool, register: bool, deregister: bool,
84 pool_delegation: Option<Vec<u8>>,
85 ) -> Self {
86 let stake_public_key =
87 stake_public_key.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.into()));
88 StakeRegistrationInsertQuery {
89 stake_address: stake_address.into(),
90 slot_no: slot_no.into(),
91 txn_index: txn_index.into(),
92 stake_public_key,
93 script,
94 register: if register {
95 MaybeUnset::Set(true)
96 } else {
97 MaybeUnset::Unset
98 },
99 deregister: if deregister {
100 MaybeUnset::Set(true)
101 } else {
102 MaybeUnset::Unset
103 },
104 pool_delegation: if let Some(pool_delegation) = pool_delegation {
105 MaybeUnset::Set(pool_delegation)
106 } else {
107 MaybeUnset::Unset
108 },
109 }
110 }
111
112 pub(crate) async fn prepare_batch(
114 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
115 ) -> anyhow::Result<SizedBatch> {
116 PreparedQueries::prepare_batch(
117 session.clone(),
118 INSERT_STAKE_REGISTRATION_QUERY,
119 cfg,
120 scylla::statement::Consistency::Any,
121 true,
122 false,
123 )
124 .await
125 .inspect_err(
126 |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
127 )
128 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
129 }
130}
131
132pub(crate) struct CertInsertQuery {
134 stake_reg_data: Vec<StakeRegistrationInsertQuery>,
136}
137
138impl CertInsertQuery {
139 pub(crate) fn new() -> Self {
141 CertInsertQuery {
142 stake_reg_data: Vec::new(),
143 }
144 }
145
146 pub(crate) async fn prepare_batch(
148 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
149 ) -> anyhow::Result<SizedBatch> {
150 StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
153 }
154
155 #[allow(clippy::too_many_arguments)]
157 fn stake_address(
158 &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
159 deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
160 ) {
161 let (stake_address, pubkey, script) = match *cred {
162 conway::StakeCredential::AddrKeyhash(cred) => {
163 let stake_address = StakeAddress::new(block.network(), false, cred.into());
164 let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
165 (stake_address, addr, false)
168 },
169 conway::StakeCredential::Scripthash(h) => {
170 (
171 StakeAddress::new(block.network(), true, h.into()),
172 None,
173 true,
174 )
175 },
176 };
177
178 if pubkey.is_none() && !script && deregister {
179 error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
180 }
181
182 if pubkey.is_none() && !script && delegation.is_some() {
183 error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
184 }
185
186 self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
188 stake_address,
189 slot_no,
190 txn,
191 pubkey,
192 script,
193 register,
194 deregister,
195 delegation,
196 ));
197 }
198
199 fn index_alonzo_cert(
201 &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
202 ) {
203 #[allow(clippy::match_same_arms)]
204 match cert {
205 alonzo::Certificate::StakeRegistration(cred) => {
206 self.stake_address(cred, slot, index, true, false, None, block);
208 },
209 alonzo::Certificate::StakeDeregistration(cred) => {
210 self.stake_address(cred, slot, index, false, true, None, block);
211 },
212 alonzo::Certificate::StakeDelegation(cred, pool) => {
213 self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
214 },
215 alonzo::Certificate::PoolRegistration { .. } => {},
216 alonzo::Certificate::PoolRetirement(..) => {},
217 alonzo::Certificate::GenesisKeyDelegation(..) => {},
218 alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
219 }
220 }
221
222 fn index_conway_cert(
224 &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
225 ) {
226 #[allow(clippy::match_same_arms)]
227 match cert {
228 conway::Certificate::StakeRegistration(cred) => {
229 self.stake_address(cred, slot_no, txn, true, false, None, block);
231 },
232 conway::Certificate::StakeDeregistration(cred) => {
233 self.stake_address(cred, slot_no, txn, false, true, None, block);
234 },
235 conway::Certificate::StakeDelegation(cred, pool) => {
236 self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
237 },
238 conway::Certificate::PoolRegistration { .. } => {},
239 conway::Certificate::PoolRetirement(..) => {},
240 conway::Certificate::Reg(..) => {},
241 conway::Certificate::UnReg(..) => {},
242 conway::Certificate::VoteDeleg(..) => {},
243 conway::Certificate::StakeVoteDeleg(..) => {},
244 conway::Certificate::StakeRegDeleg(..) => {},
245 conway::Certificate::VoteRegDeleg(..) => {},
246 conway::Certificate::StakeVoteRegDeleg(..) => {},
247 conway::Certificate::AuthCommitteeHot(..) => {},
248 conway::Certificate::ResignCommitteeCold(..) => {},
249 conway::Certificate::RegDRepCert(..) => {},
250 conway::Certificate::UnRegDRepCert(..) => {},
251 conway::Certificate::UpdateDRepCert(..) => {},
252 }
253 }
254
255 pub(crate) fn index(
257 &mut self, txs: &pallas::ledger::traverse::MultiEraTx<'_>, slot: Slot, index: TxnIndex,
258 block: &MultiEraBlock,
259 ) {
260 #[allow(clippy::match_same_arms)]
261 txs.certs().iter().for_each(|cert| {
262 match cert {
263 pallas::ledger::traverse::MultiEraCert::NotApplicable => {},
264 pallas::ledger::traverse::MultiEraCert::AlonzoCompatible(cert) => {
265 self.index_alonzo_cert(cert, slot, index, block);
266 },
267 pallas::ledger::traverse::MultiEraCert::Conway(cert) => {
268 self.index_conway_cert(cert, slot, index, block);
269 },
270 _ => {},
271 }
272 });
273 }
274
275 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
279 let mut query_handles: FallibleQueryTasks = Vec::new();
280
281 let inner_session = session.clone();
282
283 query_handles.push(tokio::spawn(async move {
284 inner_session
285 .execute_batch(
286 PreparedQuery::StakeRegistrationInsertQuery,
287 self.stake_reg_data,
288 )
289 .await
290 }));
291
292 query_handles
293 }
294}