cat_gateway/db/index/block/
roll_forward.rs

1//! Immutable Roll Forward logic.
2
3use std::{collections::HashSet, sync::Arc};
4
5use cardano_blockchain_types::{Slot, TransactionId};
6use futures::StreamExt;
7
8use crate::{
9    db::index::{block::CassandraSession, queries::purge},
10    settings::Settings,
11};
12
13/// Purge Data from Live Index
14pub(crate) async fn purge_live_index(purge_slot: Slot) -> anyhow::Result<()> {
15    let persistent = false; // get volatile session
16    let Some(session) = CassandraSession::get(persistent) else {
17        anyhow::bail!("Failed to acquire db session");
18    };
19
20    // Purge data up to this slot
21    // Slots arithmetic has saturating semantic, so this is ok.
22    #[allow(clippy::arithmetic_side_effects)]
23    let purge_to_slot = purge_slot - Settings::purge_slot_buffer();
24
25    let txn_hashes = purge_txi_by_hash(&session, purge_to_slot).await?;
26    purge_catalyst_id_for_stake_address(&session, purge_to_slot).await?;
27    purge_catalyst_id_for_txn_id(&session, &txn_hashes).await?;
28    purge_cip36_registration(&session, purge_to_slot).await?;
29    purge_cip36_registration_for_vote_key(&session, purge_to_slot).await?;
30    purge_cip36_registration_invalid(&session, purge_to_slot).await?;
31    purge_rbac509_registration(&session, purge_to_slot).await?;
32    purge_invalid_rbac509_registration(&session, purge_to_slot).await?;
33    purge_stake_registration(&session, purge_to_slot).await?;
34    purge_txo_ada(&session, purge_to_slot).await?;
35    purge_txo_assets(&session, purge_to_slot).await?;
36    purge_unstaked_txo_ada(&session, purge_to_slot).await?;
37    purge_unstaked_txo_assets(&session, purge_to_slot).await?;
38
39    Ok(())
40}
41
42/// Purges the data from `catalyst_id_for_stake_addr`.
43async fn purge_catalyst_id_for_stake_address(
44    session: &Arc<CassandraSession>, purge_to_slot: Slot,
45) -> anyhow::Result<()> {
46    use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
47
48    // Get all keys
49    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
50    // Filter
51    let mut delete_params: Vec<Params> = Vec::new();
52    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
53        if primary_key.1 <= purge_to_slot.into() {
54            delete_params.push(primary_key.into());
55        }
56    }
57    // Delete filtered keys
58    DeleteQuery::execute(session, delete_params).await?;
59    Ok(())
60}
61
62/// Purges the data from `catalyst_id_for_txn_id`.
63async fn purge_catalyst_id_for_txn_id(
64    session: &Arc<CassandraSession>, txn_hashes: &HashSet<TransactionId>,
65) -> anyhow::Result<()> {
66    use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
67
68    // Get all keys
69    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
70    // Filter
71    let mut delete_params: Vec<Params> = Vec::new();
72    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
73        let params: Params = primary_key.into();
74        if txn_hashes.contains(&params.txn_id.into()) {
75            delete_params.push(params);
76        }
77    }
78    // Delete filtered keys
79    DeleteQuery::execute(session, delete_params).await?;
80    Ok(())
81}
82
83/// Purge data from `cip36_registration`.
84async fn purge_cip36_registration(
85    session: &Arc<CassandraSession>, purge_to_slot: Slot,
86) -> anyhow::Result<()> {
87    use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
88
89    // Get all keys
90    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
91    // Filter
92    let mut delete_params: Vec<Params> = Vec::new();
93    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
94        let params: Params = primary_key.into();
95        if params.slot_no <= purge_to_slot.into() {
96            delete_params.push(params);
97        }
98    }
99    // Delete filtered keys
100    DeleteQuery::execute(session, delete_params).await?;
101    Ok(())
102}
103
104/// Purge data from `cip36_registration_for_vote_key`.
105async fn purge_cip36_registration_for_vote_key(
106    session: &Arc<CassandraSession>, purge_to_slot: Slot,
107) -> anyhow::Result<()> {
108    use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
109
110    // Get all keys
111    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
112    // Filter
113    let mut delete_params: Vec<Params> = Vec::new();
114    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
115        let params: Params = primary_key.into();
116        if params.slot_no <= purge_to_slot.into() {
117            delete_params.push(params);
118        }
119    }
120    // Delete filtered keys
121    DeleteQuery::execute(session, delete_params).await?;
122    Ok(())
123}
124
125/// Purge data from `cip36_registration_invalid`.
126async fn purge_cip36_registration_invalid(
127    session: &Arc<CassandraSession>, purge_to_slot: Slot,
128) -> anyhow::Result<()> {
129    use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
130
131    // Get all keys
132    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
133    // Filter
134    let mut delete_params: Vec<Params> = Vec::new();
135    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
136        let params: Params = primary_key.into();
137        if params.slot_no <= purge_to_slot.into() {
138            delete_params.push(params);
139        }
140    }
141    // Delete filtered keys
142    DeleteQuery::execute(session, delete_params).await?;
143    Ok(())
144}
145
146/// Purge data from `rbac509_registration`.
147async fn purge_rbac509_registration(
148    session: &Arc<CassandraSession>, purge_to_slot: Slot,
149) -> anyhow::Result<()> {
150    use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
151
152    // Get all keys
153    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
154    // Filter
155    let mut delete_params: Vec<Params> = Vec::new();
156    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
157        if primary_key.2 <= purge_to_slot.into() {
158            delete_params.push(primary_key.into());
159        }
160    }
161    // Delete filtered keys
162    DeleteQuery::execute(session, delete_params).await?;
163    Ok(())
164}
165
166/// Purges the data from `rbac509_invalid_registration`.
167async fn purge_invalid_rbac509_registration(
168    session: &Arc<CassandraSession>, purge_to_slot: Slot,
169) -> anyhow::Result<()> {
170    use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
171
172    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
173    let mut delete_params: Vec<Params> = Vec::new();
174    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
175        if primary_key.2 <= purge_to_slot.into() {
176            delete_params.push(primary_key.into());
177        }
178    }
179
180    DeleteQuery::execute(session, delete_params).await?;
181    Ok(())
182}
183
184/// Purge data from `stake_registration`.
185async fn purge_stake_registration(
186    session: &Arc<CassandraSession>, purge_to_slot: Slot,
187) -> anyhow::Result<()> {
188    use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
189
190    // Get all keys
191    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
192    // Filter
193    let mut delete_params: Vec<Params> = Vec::new();
194    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
195        let params: Params = primary_key.into();
196        if params.slot_no <= purge_to_slot.into() {
197            delete_params.push(params);
198        }
199    }
200    // Delete filtered keys
201    DeleteQuery::execute(session, delete_params).await?;
202    Ok(())
203}
204
205/// Purge data from `txi_by_hash`.
206async fn purge_txi_by_hash(
207    session: &Arc<CassandraSession>, purge_to_slot: Slot,
208) -> anyhow::Result<HashSet<TransactionId>> {
209    use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
210
211    // Get all keys
212    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
213    // Filter
214    let mut delete_params: Vec<Params> = Vec::new();
215    let mut txn_hashes: HashSet<TransactionId> = HashSet::new();
216    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
217        if primary_key.2 <= purge_to_slot.into() {
218            let params: Params = primary_key.into();
219            txn_hashes.insert(params.txn_id.into());
220            delete_params.push(params);
221        }
222    }
223    // Delete filtered keys
224    DeleteQuery::execute(session, delete_params).await?;
225    Ok(txn_hashes)
226}
227
228/// Purge data from `txo_ada`.
229async fn purge_txo_ada(session: &Arc<CassandraSession>, purge_to_slot: Slot) -> anyhow::Result<()> {
230    use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
231
232    // Get all keys
233    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
234    // Filter
235    let mut delete_params: Vec<Params> = Vec::new();
236    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
237        let params: Params = primary_key.into();
238        if params.slot_no <= purge_to_slot.into() {
239            delete_params.push(params);
240        }
241    }
242    // Delete filtered keys
243    DeleteQuery::execute(session, delete_params).await?;
244    Ok(())
245}
246
247/// Purge data from `txo_assets`.
248async fn purge_txo_assets(
249    session: &Arc<CassandraSession>, purge_to_slot: Slot,
250) -> anyhow::Result<()> {
251    use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
252
253    // Get all keys
254    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
255    // Filter
256    let mut delete_params: Vec<Params> = Vec::new();
257    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
258        let params: Params = primary_key.into();
259        if params.slot_no <= purge_to_slot.into() {
260            delete_params.push(params);
261        }
262    }
263    // Delete filtered keys
264    DeleteQuery::execute(session, delete_params).await?;
265    Ok(())
266}
267
268/// Purge data from `unstaked_txo_ada`.
269async fn purge_unstaked_txo_ada(
270    session: &Arc<CassandraSession>, purge_to_slot: Slot,
271) -> anyhow::Result<()> {
272    use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
273
274    // Get all keys
275    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
276    // Filter
277    let mut delete_params: Vec<Params> = Vec::new();
278    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
279        if primary_key.2 <= purge_to_slot.into() {
280            let params: Params = primary_key.into();
281            delete_params.push(params);
282        }
283    }
284    // Delete filtered keys
285    DeleteQuery::execute(session, delete_params).await?;
286    Ok(())
287}
288
289/// Purge data from `unstaked_txo_assets`.
290async fn purge_unstaked_txo_assets(
291    session: &Arc<CassandraSession>, purge_to_slot: Slot,
292) -> anyhow::Result<()> {
293    use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
294
295    // Get all keys
296    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
297    // Filter
298    let mut delete_params: Vec<Params> = Vec::new();
299    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
300        if primary_key.4 <= purge_to_slot.into() {
301            let params: Params = primary_key.into();
302            delete_params.push(params);
303        }
304    }
305    // Delete filtered keys
306    DeleteQuery::execute(session, delete_params).await?;
307    Ok(())
308}