cat_gateway/db/index/block/txo/
mod.rs1pub(crate) mod insert_txo;
6pub(crate) mod insert_txo_asset;
7pub(crate) mod insert_unstaked_txo;
8pub(crate) mod insert_unstaked_txo_asset;
9
10use std::sync::Arc;
11
12use cardano_blockchain_types::{
13 Network, Slot, StakeAddress, TransactionId, TxnIndex, TxnOutputOffset,
14};
15use scylla::Session;
16use tracing::{error, warn};
17
18use crate::{
19 db::index::{
20 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
21 session::CassandraSession,
22 },
23 settings::cassandra_db,
24};
25
26pub(crate) struct TxoInsertQuery {
30 staked_txo: Vec<insert_txo::Params>,
32 unstaked_txo: Vec<insert_unstaked_txo::Params>,
34 staked_txo_asset: Vec<insert_txo_asset::Params>,
36 unstaked_txo_asset: Vec<insert_unstaked_txo_asset::Params>,
38}
39
40impl TxoInsertQuery {
41 pub(crate) fn new() -> Self {
43 TxoInsertQuery {
44 staked_txo: Vec::new(),
45 unstaked_txo: Vec::new(),
46 staked_txo_asset: Vec::new(),
47 unstaked_txo_asset: Vec::new(),
48 }
49 }
50
51 pub(crate) async fn prepare_batch(
53 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
54 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
55 let txo_staked_insert_batch = insert_txo::Params::prepare_batch(session, cfg).await;
56 let txo_unstaked_insert_batch =
57 insert_unstaked_txo::Params::prepare_batch(session, cfg).await;
58 let txo_staked_asset_insert_batch =
59 insert_txo_asset::Params::prepare_batch(session, cfg).await;
60 let txo_unstaked_asset_insert_batch =
61 insert_unstaked_txo_asset::Params::prepare_batch(session, cfg).await;
62
63 Ok((
64 txo_staked_insert_batch?,
65 txo_unstaked_insert_batch?,
66 txo_staked_asset_insert_batch?,
67 txo_unstaked_asset_insert_batch?,
68 ))
69 }
70
71 fn extract_stake_address(
79 network: Network, txo: &pallas::ledger::traverse::MultiEraOutput<'_>, slot_no: Slot,
80 txn_id: &str,
81 ) -> Option<(Option<StakeAddress>, String)> {
82 let stake_address = match txo.address() {
83 Ok(address) => {
84 match address {
85 pallas::ledger::addresses::Address::Byron(_) => {
87 return None;
88 },
89 pallas::ledger::addresses::Address::Shelley(address) => {
90 let address_string = match address.to_bech32() {
91 Ok(address) => address,
92 Err(error) => {
93 error!(error=%error, slot=?slot_no, txn=txn_id,"Error converting to bech32: skipping.");
95 return None;
96 },
97 };
98
99 let address = match address.delegation() {
100 pallas::ledger::addresses::ShelleyDelegationPart::Script(hash) => {
101 Some(StakeAddress::new(network, true, *hash))
102 },
103 pallas::ledger::addresses::ShelleyDelegationPart::Key(hash) => {
104 Some(StakeAddress::new(network, false, *hash))
105 },
106 pallas::ledger::addresses::ShelleyDelegationPart::Pointer(_pointer) => {
107 None
110 },
111 pallas::ledger::addresses::ShelleyDelegationPart::Null => None,
112 };
113 (address, address_string)
114 },
115 pallas::ledger::addresses::Address::Stake(_) => {
116 warn!(
119 slot = ?slot_no,
120 txn = txn_id,
121 "Unexpected Stake address found in TXO. Refusing to index."
122 );
123 return None;
124 },
125 }
126 },
127 Err(error) => {
128 error!(error=%error, slot = ?slot_no, txn = txn_id, "Failed to get Address from TXO. Skipping TXO.");
130 return None;
131 },
132 };
133
134 Some(stake_address)
135 }
136
137 pub(crate) fn index(
139 &mut self, network: Network, txn: &pallas::ledger::traverse::MultiEraTx<'_>, slot_no: Slot,
140 txn_hash: TransactionId, index: TxnIndex,
141 ) {
142 let txn_id = txn_hash.to_string();
143
144 for (txo_index, txo) in txn.outputs().iter().enumerate() {
146 let Some((stake_address, address)) =
148 Self::extract_stake_address(network, txo, slot_no, &txn_id)
149 else {
150 continue;
151 };
152
153 let txo_index = TxnOutputOffset::from(txo_index);
154 if let Some(stake_address) = stake_address.clone() {
155 let params = insert_txo::Params::new(
156 stake_address,
157 slot_no,
158 index,
159 txo_index,
160 &address,
161 txo.lovelace_amount(),
162 txn_hash,
163 );
164 self.staked_txo.push(params);
165 } else {
166 let params = insert_unstaked_txo::Params::new(
167 txn_hash,
168 txo_index,
169 slot_no,
170 index,
171 &address,
172 txo.lovelace_amount(),
173 );
174 self.unstaked_txo.push(params);
175 }
176
177 for asset in txo.non_ada_assets() {
178 let policy_id = asset.policy().to_vec();
179 for policy_asset in asset.assets() {
180 if policy_asset.is_output() {
181 let asset_name = policy_asset.name();
182 let value = policy_asset.any_coin();
183
184 if let Some(stake_address) = stake_address.clone() {
185 let params = insert_txo_asset::Params::new(
186 stake_address,
187 slot_no,
188 index,
189 txo_index,
190 &policy_id,
191 asset_name,
192 value,
193 );
194 self.staked_txo_asset.push(params);
195 } else {
196 let params = insert_unstaked_txo_asset::Params::new(
197 txn_hash, txo_index, &policy_id, asset_name, slot_no, index, value,
198 );
199 self.unstaked_txo_asset.push(params);
200 }
201 } else {
202 error!("Minting MultiAsset in TXO.");
203 }
204 }
205 }
206 }
207 }
208
209 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
213 let mut query_handles: FallibleQueryTasks = Vec::new();
214
215 if !self.staked_txo.is_empty() {
216 let inner_session = session.clone();
217 query_handles.push(tokio::spawn(async move {
218 inner_session
219 .execute_batch(PreparedQuery::TxoAdaInsertQuery, self.staked_txo)
220 .await
221 }));
222 }
223
224 if !self.unstaked_txo.is_empty() {
225 let inner_session = session.clone();
226 query_handles.push(tokio::spawn(async move {
227 inner_session
228 .execute_batch(PreparedQuery::UnstakedTxoAdaInsertQuery, self.unstaked_txo)
229 .await
230 }));
231 }
232
233 if !self.staked_txo_asset.is_empty() {
234 let inner_session = session.clone();
235 query_handles.push(tokio::spawn(async move {
236 inner_session
237 .execute_batch(PreparedQuery::TxoAssetInsertQuery, self.staked_txo_asset)
238 .await
239 }));
240 }
241 if !self.unstaked_txo_asset.is_empty() {
242 let inner_session = session.clone();
243 query_handles.push(tokio::spawn(async move {
244 inner_session
245 .execute_batch(
246 PreparedQuery::UnstakedTxoAssetInsertQuery,
247 self.unstaked_txo_asset,
248 )
249 .await
250 }));
251 }
252
253 query_handles
254 }
255}