cat_gateway/db/index/block/txo/
mod.rs

1//! Insert TXO Indexed Data Queries.
2//!
3//! Note, there are multiple ways TXO Data is indexed and they all happen in here.
4
5pub(crate) mod insert_txo;
6pub(crate) mod insert_txo_asset;
7pub(crate) mod insert_unstaked_txo;
8pub(crate) mod insert_unstaked_txo_asset;
9
10use std::sync::Arc;
11
12use cardano_blockchain_types::{
13    Network, Slot, StakeAddress, TransactionId, TxnIndex, TxnOutputOffset,
14};
15use scylla::Session;
16use tracing::{error, warn};
17
18use crate::{
19    db::index::{
20        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
21        session::CassandraSession,
22    },
23    settings::cassandra_db,
24};
25
26/// Insert TXO Query and Parameters
27///
28/// There are multiple possible parameters to a query, which are represented separately.
29pub(crate) struct TxoInsertQuery {
30    /// Staked TXO Data Parameters
31    staked_txo: Vec<insert_txo::Params>,
32    /// Unstaked TXO Data Parameters
33    unstaked_txo: Vec<insert_unstaked_txo::Params>,
34    /// Staked TXO Asset Data Parameters
35    staked_txo_asset: Vec<insert_txo_asset::Params>,
36    /// Unstaked TXO Asset Data Parameters
37    unstaked_txo_asset: Vec<insert_unstaked_txo_asset::Params>,
38}
39
40impl TxoInsertQuery {
41    /// Create a new Insert TXO Query Batch
42    pub(crate) fn new() -> Self {
43        TxoInsertQuery {
44            staked_txo: Vec::new(),
45            unstaked_txo: Vec::new(),
46            staked_txo_asset: Vec::new(),
47            unstaked_txo_asset: Vec::new(),
48        }
49    }
50
51    /// Prepare Batch of Insert TXI Index Data Queries
52    pub(crate) async fn prepare_batch(
53        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
54    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
55        let txo_staked_insert_batch = insert_txo::Params::prepare_batch(session, cfg).await;
56        let txo_unstaked_insert_batch =
57            insert_unstaked_txo::Params::prepare_batch(session, cfg).await;
58        let txo_staked_asset_insert_batch =
59            insert_txo_asset::Params::prepare_batch(session, cfg).await;
60        let txo_unstaked_asset_insert_batch =
61            insert_unstaked_txo_asset::Params::prepare_batch(session, cfg).await;
62
63        Ok((
64            txo_staked_insert_batch?,
65            txo_unstaked_insert_batch?,
66            txo_staked_asset_insert_batch?,
67            txo_unstaked_asset_insert_batch?,
68        ))
69    }
70
71    /// Extracts a stake address from a TXO if possible.
72    /// Returns None if it is not possible.
73    /// If we want to index, but can not determine a stake key hash, then return a Vec
74    /// with a single 0 byte.    This is because the index DB needs data in the
75    /// primary key, so we use a single byte of 0 to indicate    that there is no
76    /// stake address, and still have a primary key on the table. Otherwise return the
77    /// header and the stake key hash as a vec of 29 bytes.
78    fn extract_stake_address(
79        network: Network, txo: &pallas::ledger::traverse::MultiEraOutput<'_>, slot_no: Slot,
80        txn_id: &str,
81    ) -> Option<(Option<StakeAddress>, String)> {
82        let stake_address = match txo.address() {
83            Ok(address) => {
84                match address {
85                    // Byron addresses do not have stake addresses and are not supported.
86                    pallas::ledger::addresses::Address::Byron(_) => {
87                        return None;
88                    },
89                    pallas::ledger::addresses::Address::Shelley(address) => {
90                        let address_string = match address.to_bech32() {
91                            Ok(address) => address,
92                            Err(error) => {
93                                // Shouldn't happen, but if it does error and don't index.
94                                error!(error=%error, slot=?slot_no, txn=txn_id,"Error converting to bech32: skipping.");
95                                return None;
96                            },
97                        };
98
99                        let address = match address.delegation() {
100                            pallas::ledger::addresses::ShelleyDelegationPart::Script(hash) => {
101                                Some(StakeAddress::new(network, true, *hash))
102                            },
103                            pallas::ledger::addresses::ShelleyDelegationPart::Key(hash) => {
104                                Some(StakeAddress::new(network, false, *hash))
105                            },
106                            pallas::ledger::addresses::ShelleyDelegationPart::Pointer(_pointer) => {
107                                // These are not supported from Conway, so we don't support them
108                                // either.
109                                None
110                            },
111                            pallas::ledger::addresses::ShelleyDelegationPart::Null => None,
112                        };
113                        (address, address_string)
114                    },
115                    pallas::ledger::addresses::Address::Stake(_) => {
116                        // This should NOT appear in a TXO, so report if it does. But don't index it
117                        // as a stake address.
118                        warn!(
119                            slot = ?slot_no,
120                            txn = txn_id,
121                            "Unexpected Stake address found in TXO. Refusing to index."
122                        );
123                        return None;
124                    },
125                }
126            },
127            Err(error) => {
128                // This should not ever happen.
129                error!(error=%error, slot = ?slot_no, txn = txn_id, "Failed to get Address from TXO. Skipping TXO.");
130                return None;
131            },
132        };
133
134        Some(stake_address)
135    }
136
137    /// Index the transaction Inputs.
138    pub(crate) fn index(
139        &mut self, network: Network, txn: &pallas::ledger::traverse::MultiEraTx<'_>, slot_no: Slot,
140        txn_hash: TransactionId, index: TxnIndex,
141    ) {
142        let txn_id = txn_hash.to_string();
143
144        // Accumulate all the data we want to insert from this transaction here.
145        for (txo_index, txo) in txn.outputs().iter().enumerate() {
146            // This will only return None if the TXO is not to be indexed (Byron Addresses)
147            let Some((stake_address, address)) =
148                Self::extract_stake_address(network, txo, slot_no, &txn_id)
149            else {
150                continue;
151            };
152
153            let txo_index = TxnOutputOffset::from(txo_index);
154            if let Some(stake_address) = stake_address.clone() {
155                let params = insert_txo::Params::new(
156                    stake_address,
157                    slot_no,
158                    index,
159                    txo_index,
160                    &address,
161                    txo.lovelace_amount(),
162                    txn_hash,
163                );
164                self.staked_txo.push(params);
165            } else {
166                let params = insert_unstaked_txo::Params::new(
167                    txn_hash,
168                    txo_index,
169                    slot_no,
170                    index,
171                    &address,
172                    txo.lovelace_amount(),
173                );
174                self.unstaked_txo.push(params);
175            }
176
177            for asset in txo.non_ada_assets() {
178                let policy_id = asset.policy().to_vec();
179                for policy_asset in asset.assets() {
180                    if policy_asset.is_output() {
181                        let asset_name = policy_asset.name();
182                        let value = policy_asset.any_coin();
183
184                        if let Some(stake_address) = stake_address.clone() {
185                            let params = insert_txo_asset::Params::new(
186                                stake_address,
187                                slot_no,
188                                index,
189                                txo_index,
190                                &policy_id,
191                                asset_name,
192                                value,
193                            );
194                            self.staked_txo_asset.push(params);
195                        } else {
196                            let params = insert_unstaked_txo_asset::Params::new(
197                                txn_hash, txo_index, &policy_id, asset_name, slot_no, index, value,
198                            );
199                            self.unstaked_txo_asset.push(params);
200                        }
201                    } else {
202                        error!("Minting MultiAsset in TXO.");
203                    }
204                }
205            }
206        }
207    }
208
209    /// Index the transaction Inputs.
210    ///
211    /// Consumes `self` and returns a vector of futures.
212    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
213        let mut query_handles: FallibleQueryTasks = Vec::new();
214
215        if !self.staked_txo.is_empty() {
216            let inner_session = session.clone();
217            query_handles.push(tokio::spawn(async move {
218                inner_session
219                    .execute_batch(PreparedQuery::TxoAdaInsertQuery, self.staked_txo)
220                    .await
221            }));
222        }
223
224        if !self.unstaked_txo.is_empty() {
225            let inner_session = session.clone();
226            query_handles.push(tokio::spawn(async move {
227                inner_session
228                    .execute_batch(PreparedQuery::UnstakedTxoAdaInsertQuery, self.unstaked_txo)
229                    .await
230            }));
231        }
232
233        if !self.staked_txo_asset.is_empty() {
234            let inner_session = session.clone();
235            query_handles.push(tokio::spawn(async move {
236                inner_session
237                    .execute_batch(PreparedQuery::TxoAssetInsertQuery, self.staked_txo_asset)
238                    .await
239            }));
240        }
241        if !self.unstaked_txo_asset.is_empty() {
242            let inner_session = session.clone();
243            query_handles.push(tokio::spawn(async move {
244                inner_session
245                    .execute_batch(
246                        PreparedQuery::UnstakedTxoAssetInsertQuery,
247                        self.unstaked_txo_asset,
248                    )
249                    .await
250            }));
251        }
252
253        query_handles
254    }
255}