cat_gateway/db/index/block/
certs.rs

1//! Index certs found in a transaction.
2
3use std::{fmt::Debug, sync::Arc};
4
5use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash};
6use ed25519_dalek::VerifyingKey;
7use pallas::ledger::primitives::{alonzo, conway};
8use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
9use tracing::error;
10
11use crate::{
12    db::{
13        index::{
14            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
15            session::CassandraSession,
16        },
17        types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
18    },
19    settings::cassandra_db,
20};
21
/// Insert Stake Registration Query and Parameters
///
/// Each value holds the parameters for one execution of
/// `INSERT_STAKE_REGISTRATION_QUERY`. Fields wrapped in `MaybeUnset` are left
/// unset, rather than written, when there is nothing to record for them.
#[derive(SerializeRow)]
pub(crate) struct StakeRegistrationInsertQuery {
    /// Stake address (29 bytes).
    stake_address: DbStakeAddress,
    /// Slot Number the cert is in.
    slot_no: DbSlot,
    /// Transaction Index.
    txn_index: DbTxnIndex,
    /// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
    /// Unset when no public key was supplied for the certificate.
    stake_public_key: MaybeUnset<DbPublicKey>,
    /// Is the stake address a script or not.
    script: bool,
    /// Is the Certificate Registered? Only ever set to `true`; unset otherwise.
    register: MaybeUnset<bool>,
    /// Is the Certificate Deregistered? Only ever set to `true`; unset otherwise.
    deregister: MaybeUnset<bool>,
    /// Pool Delegation Address (raw bytes). Unset when the certificate does not delegate.
    pool_delegation: MaybeUnset<Vec<u8>>,
}
42
43impl Debug for StakeRegistrationInsertQuery {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
45        let stake_public_key = match self.stake_public_key {
46            MaybeUnset::Unset => "UNSET",
47            MaybeUnset::Set(ref v) => &hex::encode(v.as_ref()),
48        };
49        let register = match self.register {
50            MaybeUnset::Unset => "UNSET",
51            MaybeUnset::Set(v) => &format!("{v:?}"),
52        };
53        let deregister = match self.deregister {
54            MaybeUnset::Unset => "UNSET",
55            MaybeUnset::Set(v) => &format!("{v:?}"),
56        };
57        let pool_delegation = match self.pool_delegation {
58            MaybeUnset::Unset => "UNSET",
59            MaybeUnset::Set(ref v) => &hex::encode(v),
60        };
61
62        f.debug_struct("StakeRegistrationInsertQuery")
63            .field("stake_address", &format!("{}", self.stake_address))
64            .field("slot_no", &self.slot_no)
65            .field("txn_index", &self.txn_index)
66            .field("stake_public_key", &stake_public_key)
67            .field("script", &self.script)
68            .field("register", &register)
69            .field("deregister", &deregister)
70            .field("pool_delegation", &pool_delegation)
71            .finish()
72    }
73}
74
/// Insert Stake Registration query text, embedded at compile time from
/// `./cql/insert_stake_registration.cql`.
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
77
78impl StakeRegistrationInsertQuery {
79    /// Create a new Insert Query.
80    #[allow(clippy::too_many_arguments)]
81    pub fn new(
82        stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
83        stake_public_key: Option<VerifyingKey>, script: bool, register: bool, deregister: bool,
84        pool_delegation: Option<Vec<u8>>,
85    ) -> Self {
86        let stake_public_key =
87            stake_public_key.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.into()));
88        StakeRegistrationInsertQuery {
89            stake_address: stake_address.into(),
90            slot_no: slot_no.into(),
91            txn_index: txn_index.into(),
92            stake_public_key,
93            script,
94            register: if register {
95                MaybeUnset::Set(true)
96            } else {
97                MaybeUnset::Unset
98            },
99            deregister: if deregister {
100                MaybeUnset::Set(true)
101            } else {
102                MaybeUnset::Unset
103            },
104            pool_delegation: if let Some(pool_delegation) = pool_delegation {
105                MaybeUnset::Set(pool_delegation)
106            } else {
107                MaybeUnset::Unset
108            },
109        }
110    }
111
112    /// Prepare Batch of Insert TXI Index Data Queries
113    pub(crate) async fn prepare_batch(
114        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
115    ) -> anyhow::Result<SizedBatch> {
116        PreparedQueries::prepare_batch(
117            session.clone(),
118            INSERT_STAKE_REGISTRATION_QUERY,
119            cfg,
120            scylla::statement::Consistency::Any,
121            true,
122            false,
123        )
124        .await
125        .inspect_err(
126            |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
127        )
128        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
129    }
130}
131
/// Insert Cert Queries
///
/// Accumulates the rows produced while indexing certificates; the rows are written
/// when `execute` is called.
pub(crate) struct CertInsertQuery {
    /// Stake Registration Data captured during indexing.
    stake_reg_data: Vec<StakeRegistrationInsertQuery>,
}
137
138impl CertInsertQuery {
139    /// Create new data set for Cert Insert Query Batch.
140    pub(crate) fn new() -> Self {
141        CertInsertQuery {
142            stake_reg_data: Vec::new(),
143        }
144    }
145
146    /// Prepare Batch of Insert TXI Index Data Queries
147    pub(crate) async fn prepare_batch(
148        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
149    ) -> anyhow::Result<SizedBatch> {
150        // Note: for now we have one query, but there are many certs, and later we may have more
151        // to add here.
152        StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
153    }
154
155    /// Get the stake address for a hash, return an empty address if one can not be found.
156    #[allow(clippy::too_many_arguments)]
157    fn stake_address(
158        &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
159        deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
160    ) {
161        let (stake_address, pubkey, script) = match *cred {
162            conway::StakeCredential::AddrKeyhash(cred) => {
163                let stake_address = StakeAddress::new(block.network(), false, cred.into());
164                let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
165                // Note: it is totally possible for the Registration Certificate to not be
166                // witnessed.
167                (stake_address, addr, false)
168            },
169            conway::StakeCredential::Scripthash(h) => {
170                (
171                    StakeAddress::new(block.network(), true, h.into()),
172                    None,
173                    true,
174                )
175            },
176        };
177
178        if pubkey.is_none() && !script && deregister {
179            error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
180        }
181
182        if pubkey.is_none() && !script && delegation.is_some() {
183            error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
184        }
185
186        // This may not be witnessed, its normal but disappointing.
187        self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
188            stake_address,
189            slot_no,
190            txn,
191            pubkey,
192            script,
193            register,
194            deregister,
195            delegation,
196        ));
197    }
198
199    /// Index an Alonzo Era certificate into the database.
200    fn index_alonzo_cert(
201        &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
202    ) {
203        #[allow(clippy::match_same_arms)]
204        match cert {
205            alonzo::Certificate::StakeRegistration(cred) => {
206                // This may not be witnessed, its normal but disappointing.
207                self.stake_address(cred, slot, index, true, false, None, block);
208            },
209            alonzo::Certificate::StakeDeregistration(cred) => {
210                self.stake_address(cred, slot, index, false, true, None, block);
211            },
212            alonzo::Certificate::StakeDelegation(cred, pool) => {
213                self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
214            },
215            alonzo::Certificate::PoolRegistration { .. } => {},
216            alonzo::Certificate::PoolRetirement(..) => {},
217            alonzo::Certificate::GenesisKeyDelegation(..) => {},
218            alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
219        }
220    }
221
222    /// Index a certificate from a conway transaction.
223    fn index_conway_cert(
224        &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
225    ) {
226        #[allow(clippy::match_same_arms)]
227        match cert {
228            conway::Certificate::StakeRegistration(cred) => {
229                // This may not be witnessed, its normal but disappointing.
230                self.stake_address(cred, slot_no, txn, true, false, None, block);
231            },
232            conway::Certificate::StakeDeregistration(cred) => {
233                self.stake_address(cred, slot_no, txn, false, true, None, block);
234            },
235            conway::Certificate::StakeDelegation(cred, pool) => {
236                self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
237            },
238            conway::Certificate::PoolRegistration { .. } => {},
239            conway::Certificate::PoolRetirement(..) => {},
240            conway::Certificate::Reg(..) => {},
241            conway::Certificate::UnReg(..) => {},
242            conway::Certificate::VoteDeleg(..) => {},
243            conway::Certificate::StakeVoteDeleg(..) => {},
244            conway::Certificate::StakeRegDeleg(..) => {},
245            conway::Certificate::VoteRegDeleg(..) => {},
246            conway::Certificate::StakeVoteRegDeleg(..) => {},
247            conway::Certificate::AuthCommitteeHot(..) => {},
248            conway::Certificate::ResignCommitteeCold(..) => {},
249            conway::Certificate::RegDRepCert(..) => {},
250            conway::Certificate::UnRegDRepCert(..) => {},
251            conway::Certificate::UpdateDRepCert(..) => {},
252        }
253    }
254
255    /// Index the certificates in a transaction.
256    pub(crate) fn index(
257        &mut self, txs: &pallas::ledger::traverse::MultiEraTx<'_>, slot: Slot, index: TxnIndex,
258        block: &MultiEraBlock,
259    ) {
260        #[allow(clippy::match_same_arms)]
261        txs.certs().iter().for_each(|cert| {
262            match cert {
263                pallas::ledger::traverse::MultiEraCert::NotApplicable => {},
264                pallas::ledger::traverse::MultiEraCert::AlonzoCompatible(cert) => {
265                    self.index_alonzo_cert(cert, slot, index, block);
266                },
267                pallas::ledger::traverse::MultiEraCert::Conway(cert) => {
268                    self.index_conway_cert(cert, slot, index, block);
269                },
270                _ => {},
271            }
272        });
273    }
274
275    /// Execute the Certificate Indexing Queries.
276    ///
277    /// Consumes the `self` and returns a vector of futures.
278    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
279        let mut query_handles: FallibleQueryTasks = Vec::new();
280
281        let inner_session = session.clone();
282
283        query_handles.push(tokio::spawn(async move {
284            inner_session
285                .execute_batch(
286                    PreparedQuery::StakeRegistrationInsertQuery,
287                    self.stake_reg_data,
288                )
289                .await
290        }));
291
292        query_handles
293    }
294}