cat_gateway/db/index/block/
roll_forward.rs1use std::{collections::HashSet, sync::Arc};
4
5use cardano_blockchain_types::{Slot, TransactionId};
6use futures::StreamExt;
7
8use crate::{
9 db::index::{block::CassandraSession, queries::purge},
10 settings::Settings,
11};
12
13pub(crate) async fn purge_live_index(purge_slot: Slot) -> anyhow::Result<()> {
15 let persistent = false; let Some(session) = CassandraSession::get(persistent) else {
17 anyhow::bail!("Failed to acquire db session");
18 };
19
20 #[allow(clippy::arithmetic_side_effects)]
23 let purge_to_slot = purge_slot - Settings::purge_slot_buffer();
24
25 let txn_hashes = purge_txi_by_hash(&session, purge_to_slot).await?;
26 purge_catalyst_id_for_stake_address(&session, purge_to_slot).await?;
27 purge_catalyst_id_for_txn_id(&session, &txn_hashes).await?;
28 purge_cip36_registration(&session, purge_to_slot).await?;
29 purge_cip36_registration_for_vote_key(&session, purge_to_slot).await?;
30 purge_cip36_registration_invalid(&session, purge_to_slot).await?;
31 purge_rbac509_registration(&session, purge_to_slot).await?;
32 purge_invalid_rbac509_registration(&session, purge_to_slot).await?;
33 purge_stake_registration(&session, purge_to_slot).await?;
34 purge_txo_ada(&session, purge_to_slot).await?;
35 purge_txo_assets(&session, purge_to_slot).await?;
36 purge_unstaked_txo_ada(&session, purge_to_slot).await?;
37 purge_unstaked_txo_assets(&session, purge_to_slot).await?;
38
39 Ok(())
40}
41
42async fn purge_catalyst_id_for_stake_address(
44 session: &Arc<CassandraSession>, purge_to_slot: Slot,
45) -> anyhow::Result<()> {
46 use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
47
48 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
50 let mut delete_params: Vec<Params> = Vec::new();
52 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
53 if primary_key.1 <= purge_to_slot.into() {
54 delete_params.push(primary_key.into());
55 }
56 }
57 DeleteQuery::execute(session, delete_params).await?;
59 Ok(())
60}
61
62async fn purge_catalyst_id_for_txn_id(
64 session: &Arc<CassandraSession>, txn_hashes: &HashSet<TransactionId>,
65) -> anyhow::Result<()> {
66 use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
67
68 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
70 let mut delete_params: Vec<Params> = Vec::new();
72 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
73 let params: Params = primary_key.into();
74 if txn_hashes.contains(¶ms.txn_id.into()) {
75 delete_params.push(params);
76 }
77 }
78 DeleteQuery::execute(session, delete_params).await?;
80 Ok(())
81}
82
83async fn purge_cip36_registration(
85 session: &Arc<CassandraSession>, purge_to_slot: Slot,
86) -> anyhow::Result<()> {
87 use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
88
89 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
91 let mut delete_params: Vec<Params> = Vec::new();
93 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
94 let params: Params = primary_key.into();
95 if params.slot_no <= purge_to_slot.into() {
96 delete_params.push(params);
97 }
98 }
99 DeleteQuery::execute(session, delete_params).await?;
101 Ok(())
102}
103
104async fn purge_cip36_registration_for_vote_key(
106 session: &Arc<CassandraSession>, purge_to_slot: Slot,
107) -> anyhow::Result<()> {
108 use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
109
110 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
112 let mut delete_params: Vec<Params> = Vec::new();
114 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
115 let params: Params = primary_key.into();
116 if params.slot_no <= purge_to_slot.into() {
117 delete_params.push(params);
118 }
119 }
120 DeleteQuery::execute(session, delete_params).await?;
122 Ok(())
123}
124
125async fn purge_cip36_registration_invalid(
127 session: &Arc<CassandraSession>, purge_to_slot: Slot,
128) -> anyhow::Result<()> {
129 use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
130
131 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
133 let mut delete_params: Vec<Params> = Vec::new();
135 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
136 let params: Params = primary_key.into();
137 if params.slot_no <= purge_to_slot.into() {
138 delete_params.push(params);
139 }
140 }
141 DeleteQuery::execute(session, delete_params).await?;
143 Ok(())
144}
145
146async fn purge_rbac509_registration(
148 session: &Arc<CassandraSession>, purge_to_slot: Slot,
149) -> anyhow::Result<()> {
150 use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
151
152 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
154 let mut delete_params: Vec<Params> = Vec::new();
156 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
157 if primary_key.2 <= purge_to_slot.into() {
158 delete_params.push(primary_key.into());
159 }
160 }
161 DeleteQuery::execute(session, delete_params).await?;
163 Ok(())
164}
165
166async fn purge_invalid_rbac509_registration(
168 session: &Arc<CassandraSession>, purge_to_slot: Slot,
169) -> anyhow::Result<()> {
170 use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
171
172 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
173 let mut delete_params: Vec<Params> = Vec::new();
174 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
175 if primary_key.2 <= purge_to_slot.into() {
176 delete_params.push(primary_key.into());
177 }
178 }
179
180 DeleteQuery::execute(session, delete_params).await?;
181 Ok(())
182}
183
184async fn purge_stake_registration(
186 session: &Arc<CassandraSession>, purge_to_slot: Slot,
187) -> anyhow::Result<()> {
188 use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
189
190 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
192 let mut delete_params: Vec<Params> = Vec::new();
194 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
195 let params: Params = primary_key.into();
196 if params.slot_no <= purge_to_slot.into() {
197 delete_params.push(params);
198 }
199 }
200 DeleteQuery::execute(session, delete_params).await?;
202 Ok(())
203}
204
205async fn purge_txi_by_hash(
207 session: &Arc<CassandraSession>, purge_to_slot: Slot,
208) -> anyhow::Result<HashSet<TransactionId>> {
209 use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
210
211 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
213 let mut delete_params: Vec<Params> = Vec::new();
215 let mut txn_hashes: HashSet<TransactionId> = HashSet::new();
216 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
217 if primary_key.2 <= purge_to_slot.into() {
218 let params: Params = primary_key.into();
219 txn_hashes.insert(params.txn_id.into());
220 delete_params.push(params);
221 }
222 }
223 DeleteQuery::execute(session, delete_params).await?;
225 Ok(txn_hashes)
226}
227
228async fn purge_txo_ada(session: &Arc<CassandraSession>, purge_to_slot: Slot) -> anyhow::Result<()> {
230 use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
231
232 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
234 let mut delete_params: Vec<Params> = Vec::new();
236 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
237 let params: Params = primary_key.into();
238 if params.slot_no <= purge_to_slot.into() {
239 delete_params.push(params);
240 }
241 }
242 DeleteQuery::execute(session, delete_params).await?;
244 Ok(())
245}
246
247async fn purge_txo_assets(
249 session: &Arc<CassandraSession>, purge_to_slot: Slot,
250) -> anyhow::Result<()> {
251 use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
252
253 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
255 let mut delete_params: Vec<Params> = Vec::new();
257 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
258 let params: Params = primary_key.into();
259 if params.slot_no <= purge_to_slot.into() {
260 delete_params.push(params);
261 }
262 }
263 DeleteQuery::execute(session, delete_params).await?;
265 Ok(())
266}
267
268async fn purge_unstaked_txo_ada(
270 session: &Arc<CassandraSession>, purge_to_slot: Slot,
271) -> anyhow::Result<()> {
272 use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
273
274 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
276 let mut delete_params: Vec<Params> = Vec::new();
278 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
279 if primary_key.2 <= purge_to_slot.into() {
280 let params: Params = primary_key.into();
281 delete_params.push(params);
282 }
283 }
284 DeleteQuery::execute(session, delete_params).await?;
286 Ok(())
287}
288
289async fn purge_unstaked_txo_assets(
291 session: &Arc<CassandraSession>, purge_to_slot: Slot,
292) -> anyhow::Result<()> {
293 use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
294
295 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
297 let mut delete_params: Vec<Params> = Vec::new();
299 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
300 if primary_key.4 <= purge_to_slot.into() {
301 let params: Params = primary_key.into();
302 delete_params.push(params);
303 }
304 }
305 DeleteQuery::execute(session, delete_params).await?;
307 Ok(())
308}