cat_gateway/db/index/block/
roll_forward.rs1use std::{collections::HashSet, sync::Arc};
4
5use cardano_blockchain_types::{Slot, TransactionId};
6use futures::StreamExt;
7
8use crate::db::index::{block::CassandraSession, queries::purge};
9
10#[derive(Debug, Clone, Copy, PartialEq)]
12pub(crate) enum PurgeCondition {
13 PurgeBackwards(Slot),
15 PurgeForwards(Slot),
17}
18
19impl PurgeCondition {
20 fn filter(&self, slot: Slot) -> bool {
22 match self {
23 Self::PurgeBackwards(purge_to_slot) => &slot <= purge_to_slot,
24 Self::PurgeForwards(purge_to_slot) => &slot >= purge_to_slot,
25 }
26 }
27}
28
29pub(crate) async fn purge_live_index(purge_condition: PurgeCondition) -> anyhow::Result<()> {
31 let persistent = false; let Some(session) = CassandraSession::get(persistent) else {
33 anyhow::bail!("Failed to acquire db session");
34 };
35
36 let txn_hashes = purge_txi_by_hash(&session, purge_condition).await?;
37 purge_catalyst_id_for_stake_address(&session, purge_condition).await?;
38 purge_catalyst_id_for_txn_id(&session, &txn_hashes).await?;
39 purge_cip36_registration(&session, purge_condition).await?;
40 purge_cip36_registration_for_vote_key(&session, purge_condition).await?;
41 purge_cip36_registration_invalid(&session, purge_condition).await?;
42 purge_rbac509_registration(&session, purge_condition).await?;
43 purge_invalid_rbac509_registration(&session, purge_condition).await?;
44 purge_stake_registration(&session, purge_condition).await?;
45 purge_txo_ada(&session, purge_condition).await?;
46 purge_txo_assets(&session, purge_condition).await?;
47 purge_unstaked_txo_ada(&session, purge_condition).await?;
48 purge_unstaked_txo_assets(&session, purge_condition).await?;
49
50 Ok(())
51}
52
53async fn purge_catalyst_id_for_stake_address(
55 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
56) -> anyhow::Result<()> {
57 use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
58
59 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
61 let mut delete_params: Vec<Params> = Vec::new();
63 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
64 if purge_condition.filter(primary_key.1.into()) {
65 delete_params.push(primary_key.into());
66 }
67 }
68 DeleteQuery::execute(session, delete_params).await?;
70 Ok(())
71}
72
73async fn purge_catalyst_id_for_txn_id(
75 session: &Arc<CassandraSession>, txn_hashes: &HashSet<TransactionId>,
76) -> anyhow::Result<()> {
77 use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
78
79 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
81 let mut delete_params: Vec<Params> = Vec::new();
83 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
84 let params: Params = primary_key.into();
85 if txn_hashes.contains(¶ms.txn_id.into()) {
86 delete_params.push(params);
87 }
88 }
89 DeleteQuery::execute(session, delete_params).await?;
91 Ok(())
92}
93
94async fn purge_cip36_registration(
96 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
97) -> anyhow::Result<()> {
98 use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
99
100 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
102 let mut delete_params: Vec<Params> = Vec::new();
104 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
105 let params: Params = primary_key.into();
106 if purge_condition.filter(params.slot_no.into()) {
107 delete_params.push(params);
108 }
109 }
110 DeleteQuery::execute(session, delete_params).await?;
112 Ok(())
113}
114
115async fn purge_cip36_registration_for_vote_key(
117 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
118) -> anyhow::Result<()> {
119 use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
120
121 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
123 let mut delete_params: Vec<Params> = Vec::new();
125 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
126 let params: Params = primary_key.into();
127 if purge_condition.filter(params.slot_no.into()) {
128 delete_params.push(params);
129 }
130 }
131 DeleteQuery::execute(session, delete_params).await?;
133 Ok(())
134}
135
136async fn purge_cip36_registration_invalid(
138 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
139) -> anyhow::Result<()> {
140 use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
141
142 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
144 let mut delete_params: Vec<Params> = Vec::new();
146 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
147 let params: Params = primary_key.into();
148 if purge_condition.filter(params.slot_no.into()) {
149 delete_params.push(params);
150 }
151 }
152 DeleteQuery::execute(session, delete_params).await?;
154 Ok(())
155}
156
157async fn purge_rbac509_registration(
159 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
160) -> anyhow::Result<()> {
161 use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
162
163 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
165 let mut delete_params: Vec<Params> = Vec::new();
167 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
168 if purge_condition.filter(primary_key.2.into()) {
169 delete_params.push(primary_key.into());
170 }
171 }
172 DeleteQuery::execute(session, delete_params).await?;
174 Ok(())
175}
176
177async fn purge_invalid_rbac509_registration(
179 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
180) -> anyhow::Result<()> {
181 use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
182
183 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
184 let mut delete_params: Vec<Params> = Vec::new();
185 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
186 if purge_condition.filter(primary_key.2.into()) {
187 delete_params.push(primary_key.into());
188 }
189 }
190
191 DeleteQuery::execute(session, delete_params).await?;
192 Ok(())
193}
194
195async fn purge_stake_registration(
197 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
198) -> anyhow::Result<()> {
199 use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
200
201 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
203 let mut delete_params: Vec<Params> = Vec::new();
205 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
206 let params: Params = primary_key.into();
207 if purge_condition.filter(params.slot_no.into()) {
208 delete_params.push(params);
209 }
210 }
211 DeleteQuery::execute(session, delete_params).await?;
213 Ok(())
214}
215
216async fn purge_txi_by_hash(
218 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
219) -> anyhow::Result<HashSet<TransactionId>> {
220 use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
221
222 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
224 let mut delete_params: Vec<Params> = Vec::new();
226 let mut txn_hashes: HashSet<TransactionId> = HashSet::new();
227 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
228 if purge_condition.filter(primary_key.2.into()) {
229 let params: Params = primary_key.into();
230 txn_hashes.insert(params.txn_id.into());
231 delete_params.push(params);
232 }
233 }
234 DeleteQuery::execute(session, delete_params).await?;
236 Ok(txn_hashes)
237}
238
239async fn purge_txo_ada(
241 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
242) -> anyhow::Result<()> {
243 use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
244
245 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
247 let mut delete_params: Vec<Params> = Vec::new();
249 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
250 let params: Params = primary_key.into();
251 if purge_condition.filter(params.slot_no.into()) {
252 delete_params.push(params);
253 }
254 }
255 DeleteQuery::execute(session, delete_params).await?;
257 Ok(())
258}
259
260async fn purge_txo_assets(
262 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
263) -> anyhow::Result<()> {
264 use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
265
266 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
268 let mut delete_params: Vec<Params> = Vec::new();
270 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
271 let params: Params = primary_key.into();
272 if purge_condition.filter(params.slot_no.into()) {
273 delete_params.push(params);
274 }
275 }
276 DeleteQuery::execute(session, delete_params).await?;
278 Ok(())
279}
280
281async fn purge_unstaked_txo_ada(
283 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
284) -> anyhow::Result<()> {
285 use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
286
287 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
289 let mut delete_params: Vec<Params> = Vec::new();
291 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
292 if purge_condition.filter(primary_key.2.clone().into()) {
293 let params: Params = primary_key.into();
294 delete_params.push(params);
295 }
296 }
297 DeleteQuery::execute(session, delete_params).await?;
299 Ok(())
300}
301
302async fn purge_unstaked_txo_assets(
304 session: &Arc<CassandraSession>, purge_condition: PurgeCondition,
305) -> anyhow::Result<()> {
306 use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
307
308 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
310 let mut delete_params: Vec<Params> = Vec::new();
312 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
313 if purge_condition.filter(primary_key.4.clone().into()) {
314 let params: Params = primary_key.into();
315 delete_params.push(params);
316 }
317 }
318 DeleteQuery::execute(session, delete_params).await?;
320 Ok(())
321}