cat_gateway/db/index/block/
roll_forward.rs

1//! Immutable Roll Forward logic.
2
3use std::{collections::HashSet, sync::Arc};
4
5use cardano_blockchain_types::{Slot, TransactionId};
6use futures::StreamExt;
7
8use crate::db::index::{block::CassandraSession, queries::purge};
9
10/// Purge condition option
11#[derive(Debug, Clone, Copy, PartialEq)]
12pub(crate) enum PurgeCondition {
13    /// Purge all data before the provided slot number (including)
14    PurgeBackwards(Slot),
15    /// Purge all data after the provided slot number (including)
16    PurgeForwards(Slot),
17}
18
19impl PurgeCondition {
20    /// A filtering condition of the `PurgeOption` and provided `slot` value
21    fn filter(&self, slot: Slot) -> bool {
22        match self {
23            Self::PurgeBackwards(purge_to_slot) => &slot <= purge_to_slot,
24            Self::PurgeForwards(purge_to_slot) => &slot >= purge_to_slot,
25        }
26    }
27}
28
29/// Purge cardano Live Index data from the volatile db session
30pub(crate) async fn purge_live_index(purge_condition: PurgeCondition) -> anyhow::Result<()> {
31    let persistent = false; // get volatile session
32    let Some(session) = CassandraSession::get(persistent) else {
33        anyhow::bail!("Failed to acquire db session");
34    };
35
36    let txn_hashes = purge_txi_by_hash(&session, purge_condition).await?;
37    purge_catalyst_id_for_stake_address(&session, purge_condition).await?;
38    purge_catalyst_id_for_txn_id(&session, &txn_hashes).await?;
39    purge_cip36_registration(&session, purge_condition).await?;
40    purge_cip36_registration_for_vote_key(&session, purge_condition).await?;
41    purge_cip36_registration_invalid(&session, purge_condition).await?;
42    purge_rbac509_registration(&session, purge_condition).await?;
43    purge_invalid_rbac509_registration(&session, purge_condition).await?;
44    purge_stake_registration(&session, purge_condition).await?;
45    purge_txo_ada(&session, purge_condition).await?;
46    purge_txo_assets(&session, purge_condition).await?;
47    purge_unstaked_txo_ada(&session, purge_condition).await?;
48    purge_unstaked_txo_assets(&session, purge_condition).await?;
49
50    Ok(())
51}
52
53/// Purges the data from `catalyst_id_for_stake_addr`.
54async fn purge_catalyst_id_for_stake_address(
55    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
56) -> anyhow::Result<()> {
57    use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
58
59    // Get all keys
60    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
61    // Filter
62    let mut delete_params: Vec<Params> = Vec::new();
63    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
64        if purge_condition.filter(primary_key.1.into()) {
65            delete_params.push(primary_key.into());
66        }
67    }
68    // Delete filtered keys
69    DeleteQuery::execute(session, delete_params).await?;
70    Ok(())
71}
72
73/// Purges the data from `catalyst_id_for_txn_id`.
74async fn purge_catalyst_id_for_txn_id(
75    session: &Arc<CassandraSession>, txn_hashes: &HashSet<TransactionId>,
76) -> anyhow::Result<()> {
77    use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
78
79    // Get all keys
80    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
81    // Filter
82    let mut delete_params: Vec<Params> = Vec::new();
83    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
84        let params: Params = primary_key.into();
85        if txn_hashes.contains(&params.txn_id.into()) {
86            delete_params.push(params);
87        }
88    }
89    // Delete filtered keys
90    DeleteQuery::execute(session, delete_params).await?;
91    Ok(())
92}
93
94/// Purge data from `cip36_registration`.
95async fn purge_cip36_registration(
96    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
97) -> anyhow::Result<()> {
98    use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
99
100    // Get all keys
101    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
102    // Filter
103    let mut delete_params: Vec<Params> = Vec::new();
104    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
105        let params: Params = primary_key.into();
106        if purge_condition.filter(params.slot_no.into()) {
107            delete_params.push(params);
108        }
109    }
110    // Delete filtered keys
111    DeleteQuery::execute(session, delete_params).await?;
112    Ok(())
113}
114
115/// Purge data from `cip36_registration_for_vote_key`.
116async fn purge_cip36_registration_for_vote_key(
117    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
118) -> anyhow::Result<()> {
119    use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
120
121    // Get all keys
122    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
123    // Filter
124    let mut delete_params: Vec<Params> = Vec::new();
125    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
126        let params: Params = primary_key.into();
127        if purge_condition.filter(params.slot_no.into()) {
128            delete_params.push(params);
129        }
130    }
131    // Delete filtered keys
132    DeleteQuery::execute(session, delete_params).await?;
133    Ok(())
134}
135
136/// Purge data from `cip36_registration_invalid`.
137async fn purge_cip36_registration_invalid(
138    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
139) -> anyhow::Result<()> {
140    use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
141
142    // Get all keys
143    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
144    // Filter
145    let mut delete_params: Vec<Params> = Vec::new();
146    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
147        let params: Params = primary_key.into();
148        if purge_condition.filter(params.slot_no.into()) {
149            delete_params.push(params);
150        }
151    }
152    // Delete filtered keys
153    DeleteQuery::execute(session, delete_params).await?;
154    Ok(())
155}
156
157/// Purge data from `rbac509_registration`.
158async fn purge_rbac509_registration(
159    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
160) -> anyhow::Result<()> {
161    use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
162
163    // Get all keys
164    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
165    // Filter
166    let mut delete_params: Vec<Params> = Vec::new();
167    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
168        if purge_condition.filter(primary_key.2.into()) {
169            delete_params.push(primary_key.into());
170        }
171    }
172    // Delete filtered keys
173    DeleteQuery::execute(session, delete_params).await?;
174    Ok(())
175}
176
177/// Purges the data from `rbac509_invalid_registration`.
178async fn purge_invalid_rbac509_registration(
179    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
180) -> anyhow::Result<()> {
181    use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
182
183    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
184    let mut delete_params: Vec<Params> = Vec::new();
185    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
186        if purge_condition.filter(primary_key.2.into()) {
187            delete_params.push(primary_key.into());
188        }
189    }
190
191    DeleteQuery::execute(session, delete_params).await?;
192    Ok(())
193}
194
195/// Purge data from `stake_registration`.
196async fn purge_stake_registration(
197    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
198) -> anyhow::Result<()> {
199    use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
200
201    // Get all keys
202    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
203    // Filter
204    let mut delete_params: Vec<Params> = Vec::new();
205    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
206        let params: Params = primary_key.into();
207        if purge_condition.filter(params.slot_no.into()) {
208            delete_params.push(params);
209        }
210    }
211    // Delete filtered keys
212    DeleteQuery::execute(session, delete_params).await?;
213    Ok(())
214}
215
216/// Purge data from `txi_by_hash`.
217async fn purge_txi_by_hash(
218    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
219) -> anyhow::Result<HashSet<TransactionId>> {
220    use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
221
222    // Get all keys
223    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
224    // Filter
225    let mut delete_params: Vec<Params> = Vec::new();
226    let mut txn_hashes: HashSet<TransactionId> = HashSet::new();
227    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
228        if purge_condition.filter(primary_key.2.into()) {
229            let params: Params = primary_key.into();
230            txn_hashes.insert(params.txn_id.into());
231            delete_params.push(params);
232        }
233    }
234    // Delete filtered keys
235    DeleteQuery::execute(session, delete_params).await?;
236    Ok(txn_hashes)
237}
238
239/// Purge data from `txo_ada`.
240async fn purge_txo_ada(
241    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
242) -> anyhow::Result<()> {
243    use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
244
245    // Get all keys
246    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
247    // Filter
248    let mut delete_params: Vec<Params> = Vec::new();
249    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
250        let params: Params = primary_key.into();
251        if purge_condition.filter(params.slot_no.into()) {
252            delete_params.push(params);
253        }
254    }
255    // Delete filtered keys
256    DeleteQuery::execute(session, delete_params).await?;
257    Ok(())
258}
259
260/// Purge data from `txo_assets`.
261async fn purge_txo_assets(
262    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
263) -> anyhow::Result<()> {
264    use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
265
266    // Get all keys
267    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
268    // Filter
269    let mut delete_params: Vec<Params> = Vec::new();
270    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
271        let params: Params = primary_key.into();
272        if purge_condition.filter(params.slot_no.into()) {
273            delete_params.push(params);
274        }
275    }
276    // Delete filtered keys
277    DeleteQuery::execute(session, delete_params).await?;
278    Ok(())
279}
280
281/// Purge data from `unstaked_txo_ada`.
282async fn purge_unstaked_txo_ada(
283    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
284) -> anyhow::Result<()> {
285    use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
286
287    // Get all keys
288    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
289    // Filter
290    let mut delete_params: Vec<Params> = Vec::new();
291    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
292        if purge_condition.filter(primary_key.2.clone().into()) {
293            let params: Params = primary_key.into();
294            delete_params.push(params);
295        }
296    }
297    // Delete filtered keys
298    DeleteQuery::execute(session, delete_params).await?;
299    Ok(())
300}
301
302/// Purge data from `unstaked_txo_assets`.
303async fn purge_unstaked_txo_assets(
304    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
305) -> anyhow::Result<()> {
306    use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
307
308    // Get all keys
309    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
310    // Filter
311    let mut delete_params: Vec<Params> = Vec::new();
312    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
313        if purge_condition.filter(primary_key.4.clone().into()) {
314            let params: Params = primary_key.into();
315            delete_params.push(params);
316        }
317    }
318    // Delete filtered keys
319    DeleteQuery::execute(session, delete_params).await?;
320    Ok(())
321}