cat_gateway/db/index/block/
roll_forward.rs1use std::{collections::HashSet, sync::Arc};
4
5use cardano_blockchain_types::{Slot, TransactionId};
6use futures::StreamExt;
7
8use crate::db::index::{block::CassandraSession, queries::purge};
9
10#[derive(Debug, Clone, Copy, PartialEq)]
12pub(crate) enum PurgeCondition {
13    PurgeBackwards(Slot),
15    PurgeForwards(Slot),
17}
18
19impl PurgeCondition {
20    fn filter(&self, slot: Slot) -> bool {
22        match self {
23            Self::PurgeBackwards(purge_to_slot) => &slot <= purge_to_slot,
24            Self::PurgeForwards(purge_to_slot) => &slot >= purge_to_slot,
25        }
26    }
27}
28
29pub(crate) async fn purge_live_index(purge_condition: PurgeCondition) -> anyhow::Result<()> {
31    let persistent = false; let Some(session) = CassandraSession::get(persistent) else {
33        anyhow::bail!("Failed to acquire db session");
34    };
35
36    let txn_hashes = purge_txi_by_hash(&session, purge_condition).await?;
37    purge_catalyst_id_for_stake_address(&session, purge_condition).await?;
38    purge_catalyst_id_for_txn_id(&session, &txn_hashes).await?;
39    purge_cip36_registration(&session, purge_condition).await?;
40    purge_cip36_registration_for_vote_key(&session, purge_condition).await?;
41    purge_cip36_registration_invalid(&session, purge_condition).await?;
42    purge_rbac509_registration(&session, purge_condition).await?;
43    purge_invalid_rbac509_registration(&session, purge_condition).await?;
44    purge_stake_registration(&session, purge_condition).await?;
45    purge_txo_ada(&session, purge_condition).await?;
46    purge_txo_assets(&session, purge_condition).await?;
47    purge_unstaked_txo_ada(&session, purge_condition).await?;
48    purge_unstaked_txo_assets(&session, purge_condition).await?;
49
50    Ok(())
51}
52
53async fn purge_catalyst_id_for_stake_address(
55    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
56) -> anyhow::Result<()> {
57    use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
58
59    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
61    let mut delete_params: Vec<Params> = Vec::new();
63    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
64        if purge_condition.filter(primary_key.1.into()) {
65            delete_params.push(primary_key.into());
66        }
67    }
68    DeleteQuery::execute(session, delete_params).await?;
70    Ok(())
71}
72
73async fn purge_catalyst_id_for_txn_id(
75    session: &Arc<CassandraSession>, txn_hashes: &HashSet<TransactionId>,
76) -> anyhow::Result<()> {
77    use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
78
79    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
81    let mut delete_params: Vec<Params> = Vec::new();
83    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
84        let params: Params = primary_key.into();
85        if txn_hashes.contains(¶ms.txn_id.into()) {
86            delete_params.push(params);
87        }
88    }
89    DeleteQuery::execute(session, delete_params).await?;
91    Ok(())
92}
93
94async fn purge_cip36_registration(
96    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
97) -> anyhow::Result<()> {
98    use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
99
100    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
102    let mut delete_params: Vec<Params> = Vec::new();
104    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
105        let params: Params = primary_key.into();
106        if purge_condition.filter(params.slot_no.into()) {
107            delete_params.push(params);
108        }
109    }
110    DeleteQuery::execute(session, delete_params).await?;
112    Ok(())
113}
114
115async fn purge_cip36_registration_for_vote_key(
117    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
118) -> anyhow::Result<()> {
119    use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
120
121    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
123    let mut delete_params: Vec<Params> = Vec::new();
125    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
126        let params: Params = primary_key.into();
127        if purge_condition.filter(params.slot_no.into()) {
128            delete_params.push(params);
129        }
130    }
131    DeleteQuery::execute(session, delete_params).await?;
133    Ok(())
134}
135
136async fn purge_cip36_registration_invalid(
138    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
139) -> anyhow::Result<()> {
140    use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
141
142    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
144    let mut delete_params: Vec<Params> = Vec::new();
146    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
147        let params: Params = primary_key.into();
148        if purge_condition.filter(params.slot_no.into()) {
149            delete_params.push(params);
150        }
151    }
152    DeleteQuery::execute(session, delete_params).await?;
154    Ok(())
155}
156
157async fn purge_rbac509_registration(
159    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
160) -> anyhow::Result<()> {
161    use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
162
163    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
165    let mut delete_params: Vec<Params> = Vec::new();
167    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
168        if purge_condition.filter(primary_key.2.into()) {
169            delete_params.push(primary_key.into());
170        }
171    }
172    DeleteQuery::execute(session, delete_params).await?;
174    Ok(())
175}
176
177async fn purge_invalid_rbac509_registration(
179    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
180) -> anyhow::Result<()> {
181    use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
182
183    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
184    let mut delete_params: Vec<Params> = Vec::new();
185    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
186        if purge_condition.filter(primary_key.2.into()) {
187            delete_params.push(primary_key.into());
188        }
189    }
190
191    DeleteQuery::execute(session, delete_params).await?;
192    Ok(())
193}
194
195async fn purge_stake_registration(
197    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
198) -> anyhow::Result<()> {
199    use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
200
201    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
203    let mut delete_params: Vec<Params> = Vec::new();
205    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
206        let params: Params = primary_key.into();
207        if purge_condition.filter(params.slot_no.into()) {
208            delete_params.push(params);
209        }
210    }
211    DeleteQuery::execute(session, delete_params).await?;
213    Ok(())
214}
215
216async fn purge_txi_by_hash(
218    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
219) -> anyhow::Result<HashSet<TransactionId>> {
220    use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
221
222    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
224    let mut delete_params: Vec<Params> = Vec::new();
226    let mut txn_hashes: HashSet<TransactionId> = HashSet::new();
227    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
228        if purge_condition.filter(primary_key.2.into()) {
229            let params: Params = primary_key.into();
230            txn_hashes.insert(params.txn_id.into());
231            delete_params.push(params);
232        }
233    }
234    DeleteQuery::execute(session, delete_params).await?;
236    Ok(txn_hashes)
237}
238
239async fn purge_txo_ada(
241    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
242) -> anyhow::Result<()> {
243    use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
244
245    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
247    let mut delete_params: Vec<Params> = Vec::new();
249    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
250        let params: Params = primary_key.into();
251        if purge_condition.filter(params.slot_no.into()) {
252            delete_params.push(params);
253        }
254    }
255    DeleteQuery::execute(session, delete_params).await?;
257    Ok(())
258}
259
260async fn purge_txo_assets(
262    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
263) -> anyhow::Result<()> {
264    use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
265
266    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
268    let mut delete_params: Vec<Params> = Vec::new();
270    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
271        let params: Params = primary_key.into();
272        if purge_condition.filter(params.slot_no.into()) {
273            delete_params.push(params);
274        }
275    }
276    DeleteQuery::execute(session, delete_params).await?;
278    Ok(())
279}
280
281async fn purge_unstaked_txo_ada(
283    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
284) -> anyhow::Result<()> {
285    use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
286
287    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
289    let mut delete_params: Vec<Params> = Vec::new();
291    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
292        if purge_condition.filter(primary_key.2.clone().into()) {
293            let params: Params = primary_key.into();
294            delete_params.push(params);
295        }
296    }
297    DeleteQuery::execute(session, delete_params).await?;
299    Ok(())
300}
301
302async fn purge_unstaked_txo_assets(
304    session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
305) -> anyhow::Result<()> {
306    use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
307
308    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
310    let mut delete_params: Vec<Params> = Vec::new();
312    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
313        if purge_condition.filter(primary_key.4.clone().into()) {
314            let params: Params = primary_key.into();
315            delete_params.push(params);
316        }
317    }
318    DeleteQuery::execute(session, delete_params).await?;
320    Ok(())
321}