cat_gateway/db/index/block/
certs.rs

1//! Index certs found in a transaction.
2
3use std::{fmt::Debug, sync::Arc};
4
5use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash};
6use ed25519_dalek::VerifyingKey;
7use pallas::ledger::primitives::{alonzo, conway};
8use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
9use tracing::error;
10
11use crate::{
12    db::{
13        index::{
14            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
15            session::CassandraSession,
16        },
17        types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
18    },
19    settings::cassandra_db,
20};
21
/// Row data bound to the stake registration insert query.
///
/// The `MaybeUnset` columns are only given a value when there is something to record;
/// otherwise they are left unset (see [`StakeRegistrationInsertQuery::new`]).
#[derive(SerializeRow)]
pub(crate) struct StakeRegistrationInsertQuery {
    /// Stake address (29 bytes).
    stake_address: DbStakeAddress,
    /// Slot number of the block the certificate is in.
    slot_no: DbSlot,
    /// Index of the transaction within the block.
    txn_index: DbTxnIndex,
    /// Full stake public key (32 byte Ed25519 public key, not hashed).
    stake_public_key: DbPublicKey,
    /// Whether the stake address is a script address.
    script: bool,
    /// Set to `true` when the certificate registers the stake address, otherwise unset.
    register: MaybeUnset<bool>,
    /// Set to `true` when the certificate deregisters the stake address, otherwise unset.
    deregister: MaybeUnset<bool>,
    /// Set to `true` when the stake address has a CIP36 registration, otherwise unset.
    cip36: MaybeUnset<bool>,
    /// Raw bytes of the pool the stake is delegated to, unset when there is no delegation.
    pool_delegation: MaybeUnset<Vec<u8>>,
}
44
45impl Debug for StakeRegistrationInsertQuery {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
47        let stake_public_key = hex::encode(self.stake_public_key.as_ref());
48        let register = match self.register {
49            MaybeUnset::Unset => "UNSET",
50            MaybeUnset::Set(v) => &format!("{v:?}"),
51        };
52        let deregister = match self.deregister {
53            MaybeUnset::Unset => "UNSET",
54            MaybeUnset::Set(v) => &format!("{v:?}"),
55        };
56        let cip36 = match self.cip36 {
57            MaybeUnset::Unset => "UNSET",
58            MaybeUnset::Set(v) => &format!("{v:?}"),
59        };
60        let pool_delegation = match self.pool_delegation {
61            MaybeUnset::Unset => "UNSET",
62            MaybeUnset::Set(ref v) => &hex::encode(v),
63        };
64
65        f.debug_struct("StakeRegistrationInsertQuery")
66            .field("stake_address", &format!("{}", self.stake_address))
67            .field("slot_no", &self.slot_no)
68            .field("txn_index", &self.txn_index)
69            .field("stake_public_key", &stake_public_key)
70            .field("script", &self.script)
71            .field("register", &register)
72            .field("deregister", &deregister)
73            .field("cip36", &cip36)
74            .field("pool_delegation", &pool_delegation)
75            .finish()
76    }
77}
78
/// CQL text of the stake registration insert statement, embedded at compile time from
/// `./cql/insert_stake_registration.cql`.
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
81
82impl StakeRegistrationInsertQuery {
83    /// Create a new Insert Query.
84    #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
85    pub fn new(
86        stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
87        stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
88        cip36: bool, pool_delegation: Option<Vec<u8>>,
89    ) -> Self {
90        StakeRegistrationInsertQuery {
91            stake_address: stake_address.into(),
92            slot_no: slot_no.into(),
93            txn_index: txn_index.into(),
94            stake_public_key: stake_public_key.into(),
95            script,
96            register: if register {
97                MaybeUnset::Set(true)
98            } else {
99                MaybeUnset::Unset
100            },
101            deregister: if deregister {
102                MaybeUnset::Set(true)
103            } else {
104                MaybeUnset::Unset
105            },
106            cip36: if cip36 {
107                MaybeUnset::Set(true)
108            } else {
109                MaybeUnset::Unset
110            },
111            pool_delegation: if let Some(pool_delegation) = pool_delegation {
112                MaybeUnset::Set(pool_delegation)
113            } else {
114                MaybeUnset::Unset
115            },
116        }
117    }
118
119    /// Prepare Batch of Insert stake registration.
120    pub(crate) async fn prepare_batch(
121        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
122    ) -> anyhow::Result<SizedBatch> {
123        PreparedQueries::prepare_batch(
124            session.clone(),
125            INSERT_STAKE_REGISTRATION_QUERY,
126            cfg,
127            scylla::statement::Consistency::Any,
128            true,
129            false,
130        )
131        .await
132        .inspect_err(
133            |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
134        )
135        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
136    }
137}
138
/// Certificate insert queries collected while indexing the transactions of a block.
pub(crate) struct CertInsertQuery {
    /// Stake registration rows captured during indexing, waiting to be written.
    stake_reg_data: Vec<StakeRegistrationInsertQuery>,
}
144
145impl CertInsertQuery {
146    /// Create new data set for Cert Insert Query Batch.
147    pub(crate) fn new() -> Self {
148        CertInsertQuery {
149            stake_reg_data: Vec::new(),
150        }
151    }
152
153    /// Prepare Batch of Insert TXI Index Data Queries
154    pub(crate) async fn prepare_batch(
155        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
156    ) -> anyhow::Result<SizedBatch> {
157        // Note: for now we have one query, but there are many certs, and later we may have more
158        // to add here.
159        StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
160    }
161
162    /// Get the stake address for a hash, return an empty address if one can not be found.
163    #[allow(clippy::too_many_arguments)]
164    fn stake_address(
165        &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
166        deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
167    ) {
168        let (stake_address, pubkey, script) = match *cred {
169            conway::StakeCredential::AddrKeyhash(cred) => {
170                let stake_address = StakeAddress::new(block.network(), false, cred.into());
171                let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
172                // Note: it is totally possible for the Registration Certificate to not be
173                // witnessed.
174                (stake_address, addr, false)
175            },
176            conway::StakeCredential::Scripthash(h) => {
177                (
178                    StakeAddress::new(block.network(), true, h.into()),
179                    None,
180                    true,
181                )
182            },
183        };
184
185        if pubkey.is_none() && !script && deregister {
186            error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
187        }
188
189        if pubkey.is_none() && !script && delegation.is_some() {
190            error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
191        }
192
193        // This may not be witnessed, its normal but disappointing.
194        if let Some(pubkey) = pubkey {
195            self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
196                stake_address,
197                slot_no,
198                txn,
199                pubkey,
200                script,
201                register,
202                deregister,
203                false,
204                delegation,
205            ));
206        }
207    }
208
209    /// Index an Alonzo Era certificate into the database.
210    fn index_alonzo_cert(
211        &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
212    ) {
213        #[allow(clippy::match_same_arms)]
214        match cert {
215            alonzo::Certificate::StakeRegistration(cred) => {
216                // This may not be witnessed, its normal but disappointing.
217                self.stake_address(cred, slot, index, true, false, None, block);
218            },
219            alonzo::Certificate::StakeDeregistration(cred) => {
220                self.stake_address(cred, slot, index, false, true, None, block);
221            },
222            alonzo::Certificate::StakeDelegation(cred, pool) => {
223                self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
224            },
225            alonzo::Certificate::PoolRegistration { .. } => {},
226            alonzo::Certificate::PoolRetirement(..) => {},
227            alonzo::Certificate::GenesisKeyDelegation(..) => {},
228            alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
229        }
230    }
231
232    /// Index a certificate from a conway transaction.
233    fn index_conway_cert(
234        &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
235    ) {
236        #[allow(clippy::match_same_arms)]
237        match cert {
238            conway::Certificate::StakeRegistration(cred) => {
239                // This may not be witnessed, its normal but disappointing.
240                self.stake_address(cred, slot_no, txn, true, false, None, block);
241            },
242            conway::Certificate::StakeDeregistration(cred) => {
243                self.stake_address(cred, slot_no, txn, false, true, None, block);
244            },
245            conway::Certificate::StakeDelegation(cred, pool) => {
246                self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
247            },
248            conway::Certificate::PoolRegistration { .. } => {},
249            conway::Certificate::PoolRetirement(..) => {},
250            conway::Certificate::Reg(..) => {},
251            conway::Certificate::UnReg(..) => {},
252            conway::Certificate::VoteDeleg(..) => {},
253            conway::Certificate::StakeVoteDeleg(..) => {},
254            conway::Certificate::StakeRegDeleg(..) => {},
255            conway::Certificate::VoteRegDeleg(..) => {},
256            conway::Certificate::StakeVoteRegDeleg(..) => {},
257            conway::Certificate::AuthCommitteeHot(..) => {},
258            conway::Certificate::ResignCommitteeCold(..) => {},
259            conway::Certificate::RegDRepCert(..) => {},
260            conway::Certificate::UnRegDRepCert(..) => {},
261            conway::Certificate::UpdateDRepCert(..) => {},
262        }
263    }
264
265    /// Index the certificates in a transaction.
266    pub(crate) fn index(
267        &mut self, txs: &pallas::ledger::traverse::MultiEraTx<'_>, slot: Slot, index: TxnIndex,
268        block: &MultiEraBlock,
269    ) {
270        #[allow(clippy::match_same_arms)]
271        txs.certs().iter().for_each(|cert| {
272            match cert {
273                pallas::ledger::traverse::MultiEraCert::NotApplicable => {},
274                pallas::ledger::traverse::MultiEraCert::AlonzoCompatible(cert) => {
275                    self.index_alonzo_cert(cert, slot, index, block);
276                },
277                pallas::ledger::traverse::MultiEraCert::Conway(cert) => {
278                    self.index_conway_cert(cert, slot, index, block);
279                },
280                _ => {},
281            }
282        });
283    }
284
285    /// Execute the Certificate Indexing Queries.
286    ///
287    /// Consumes the `self` and returns a vector of futures.
288    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
289        let mut query_handles: FallibleQueryTasks = Vec::new();
290
291        let inner_session = session.clone();
292
293        query_handles.push(tokio::spawn(async move {
294            inner_session
295                .execute_batch(
296                    PreparedQuery::StakeRegistrationInsertQuery,
297                    self.stake_reg_data,
298                )
299                .await
300        }));
301
302        query_handles
303    }
304}