cat_gateway/db/index/block/
certs.rs

1//! Index certs found in a transaction.
2
3use std::{fmt::Debug, sync::Arc};
4
5use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash};
6use ed25519_dalek::VerifyingKey;
7use pallas::ledger::primitives::{alonzo, conway};
8use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
9use tracing::error;
10
11use crate::{
12    db::{
13        index::{
14            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
15            session::CassandraSession,
16        },
17        types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
18    },
19    settings::cassandra_db,
20};
21
22/// Insert TXI Query and Parameters
23#[derive(SerializeRow)]
24pub(crate) struct StakeRegistrationInsertQuery {
25    /// Stake address (29 bytes).
26    stake_address: DbStakeAddress,
27    /// Slot Number the cert is in.
28    slot_no: DbSlot,
29    /// Transaction Index.
30    txn_index: DbTxnIndex,
31    /// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
32    stake_public_key: MaybeUnset<DbPublicKey>,
33    /// Is the stake address a script or not.
34    script: bool,
35    /// Is the Certificate Registered?
36    register: MaybeUnset<bool>,
37    /// Is the Certificate Deregistered?
38    deregister: MaybeUnset<bool>,
39    /// Pool Delegation Address
40    pool_delegation: MaybeUnset<Vec<u8>>,
41}
42
43impl Debug for StakeRegistrationInsertQuery {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
45        let stake_public_key = match self.stake_public_key {
46            MaybeUnset::Unset => "UNSET",
47            MaybeUnset::Set(ref v) => &hex::encode(v.as_ref()),
48        };
49        let register = match self.register {
50            MaybeUnset::Unset => "UNSET",
51            MaybeUnset::Set(v) => &format!("{v:?}"),
52        };
53        let deregister = match self.deregister {
54            MaybeUnset::Unset => "UNSET",
55            MaybeUnset::Set(v) => &format!("{v:?}"),
56        };
57        let pool_delegation = match self.pool_delegation {
58            MaybeUnset::Unset => "UNSET",
59            MaybeUnset::Set(ref v) => &hex::encode(v),
60        };
61
62        f.debug_struct("StakeRegistrationInsertQuery")
63            .field("stake_address", &format!("{}", self.stake_address))
64            .field("slot_no", &self.slot_no)
65            .field("txn_index", &self.txn_index)
66            .field("stake_public_key", &stake_public_key)
67            .field("script", &self.script)
68            .field("register", &register)
69            .field("deregister", &deregister)
70            .field("pool_delegation", &pool_delegation)
71            .finish()
72    }
73}
74
75/// TXI by Txn hash Index
76const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
77
78impl StakeRegistrationInsertQuery {
79    /// Create a new Insert Query.
80    #[allow(clippy::too_many_arguments)]
81    pub fn new(
82        stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
83        stake_public_key: Option<VerifyingKey>, script: bool, register: bool, deregister: bool,
84        pool_delegation: Option<Vec<u8>>,
85    ) -> Self {
86        let stake_public_key =
87            stake_public_key.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.into()));
88        StakeRegistrationInsertQuery {
89            stake_address: stake_address.into(),
90            slot_no: slot_no.into(),
91            txn_index: txn_index.into(),
92            stake_public_key,
93            script,
94            register: if register {
95                MaybeUnset::Set(true)
96            } else {
97                MaybeUnset::Unset
98            },
99            deregister: if deregister {
100                MaybeUnset::Set(true)
101            } else {
102                MaybeUnset::Unset
103            },
104            pool_delegation: if let Some(pool_delegation) = pool_delegation {
105                MaybeUnset::Set(pool_delegation)
106            } else {
107                MaybeUnset::Unset
108            },
109        }
110    }
111
112    /// Prepare Batch of Insert TXI Index Data Queries
113    pub(crate) async fn prepare_batch(
114        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
115    ) -> anyhow::Result<SizedBatch> {
116        PreparedQueries::prepare_batch(
117            session.clone(),
118            INSERT_STAKE_REGISTRATION_QUERY,
119            cfg,
120            scylla::statement::Consistency::Any,
121            true,
122            false,
123        )
124        .await
125        .inspect_err(
126            |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
127        )
128        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
129    }
130}
131
132/// Insert Cert Queries
133pub(crate) struct CertInsertQuery {
134    /// Stake Registration Data captured during indexing.
135    stake_reg_data: Vec<StakeRegistrationInsertQuery>,
136}
137
138impl CertInsertQuery {
139    /// Create new data set for Cert Insert Query Batch.
140    pub(crate) fn new() -> Self {
141        CertInsertQuery {
142            stake_reg_data: Vec::new(),
143        }
144    }
145
146    /// Prepare Batch of Insert TXI Index Data Queries
147    pub(crate) async fn prepare_batch(
148        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
149    ) -> anyhow::Result<SizedBatch> {
150        // Note: for now we have one query, but there are many certs, and later we may have more
151        // to add here.
152        StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
153    }
154
155    /// Get the stake address for a hash, return an empty address if one can not be found.
156    #[allow(clippy::too_many_arguments)]
157    fn stake_address(
158        &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
159        deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
160    ) {
161        let (stake_address, pubkey, script) = match cred {
162            conway::StakeCredential::AddrKeyhash(cred) => {
163                let stake_address = StakeAddress::new(block.network(), false, *cred);
164                let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
165                // Note: it is totally possible for the Registration Certificate to not be
166                // witnessed.
167                (stake_address, addr, false)
168            },
169            conway::StakeCredential::Scripthash(h) => {
170                (StakeAddress::new(block.network(), true, *h), None, true)
171            },
172        };
173
174        if pubkey.is_none() && !script && deregister {
175            error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
176        }
177
178        if pubkey.is_none() && !script && delegation.is_some() {
179            error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
180        }
181
182        // This may not be witnessed, its normal but disappointing.
183        self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
184            stake_address,
185            slot_no,
186            txn,
187            pubkey,
188            script,
189            register,
190            deregister,
191            delegation,
192        ));
193    }
194
195    /// Index an Alonzo Era certificate into the database.
196    fn index_alonzo_cert(
197        &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
198    ) {
199        #[allow(clippy::match_same_arms)]
200        match cert {
201            alonzo::Certificate::StakeRegistration(cred) => {
202                // This may not be witnessed, its normal but disappointing.
203                self.stake_address(cred, slot, index, true, false, None, block);
204            },
205            alonzo::Certificate::StakeDeregistration(cred) => {
206                self.stake_address(cred, slot, index, false, true, None, block);
207            },
208            alonzo::Certificate::StakeDelegation(cred, pool) => {
209                self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
210            },
211            alonzo::Certificate::PoolRegistration { .. } => {},
212            alonzo::Certificate::PoolRetirement(..) => {},
213            alonzo::Certificate::GenesisKeyDelegation(..) => {},
214            alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
215        }
216    }
217
218    /// Index a certificate from a conway transaction.
219    fn index_conway_cert(
220        &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
221    ) {
222        #[allow(clippy::match_same_arms)]
223        match cert {
224            conway::Certificate::StakeRegistration(cred) => {
225                // This may not be witnessed, its normal but disappointing.
226                self.stake_address(cred, slot_no, txn, true, false, None, block);
227            },
228            conway::Certificate::StakeDeregistration(cred) => {
229                self.stake_address(cred, slot_no, txn, false, true, None, block);
230            },
231            conway::Certificate::StakeDelegation(cred, pool) => {
232                self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
233            },
234            conway::Certificate::PoolRegistration { .. } => {},
235            conway::Certificate::PoolRetirement(..) => {},
236            conway::Certificate::Reg(..) => {},
237            conway::Certificate::UnReg(..) => {},
238            conway::Certificate::VoteDeleg(..) => {},
239            conway::Certificate::StakeVoteDeleg(..) => {},
240            conway::Certificate::StakeRegDeleg(..) => {},
241            conway::Certificate::VoteRegDeleg(..) => {},
242            conway::Certificate::StakeVoteRegDeleg(..) => {},
243            conway::Certificate::AuthCommitteeHot(..) => {},
244            conway::Certificate::ResignCommitteeCold(..) => {},
245            conway::Certificate::RegDRepCert(..) => {},
246            conway::Certificate::UnRegDRepCert(..) => {},
247            conway::Certificate::UpdateDRepCert(..) => {},
248        }
249    }
250
251    /// Index the certificates in a transaction.
252    pub(crate) fn index(
253        &mut self, txs: &pallas::ledger::traverse::MultiEraTx<'_>, slot: Slot, index: TxnIndex,
254        block: &MultiEraBlock,
255    ) {
256        #[allow(clippy::match_same_arms)]
257        txs.certs().iter().for_each(|cert| {
258            match cert {
259                pallas::ledger::traverse::MultiEraCert::NotApplicable => {},
260                pallas::ledger::traverse::MultiEraCert::AlonzoCompatible(cert) => {
261                    self.index_alonzo_cert(cert, slot, index, block);
262                },
263                pallas::ledger::traverse::MultiEraCert::Conway(cert) => {
264                    self.index_conway_cert(cert, slot, index, block);
265                },
266                _ => {},
267            }
268        });
269    }
270
271    /// Execute the Certificate Indexing Queries.
272    ///
273    /// Consumes the `self` and returns a vector of futures.
274    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
275        let mut query_handles: FallibleQueryTasks = Vec::new();
276
277        let inner_session = session.clone();
278
279        query_handles.push(tokio::spawn(async move {
280            inner_session
281                .execute_batch(
282                    PreparedQuery::StakeRegistrationInsertQuery,
283                    self.stake_reg_data,
284                )
285                .await
286        }));
287
288        query_handles
289    }
290}