1use std::{fmt::Display, ops::Sub, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{MultiEraBlock, Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use event::EventTarget;
9use futures::{stream::FuturesUnordered, StreamExt};
10use rand::{Rng, SeedableRng};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info};
13
14use crate::{
15 db::index::{
16 block::{
17 index_block,
18 roll_forward::{self, PurgeCondition},
19 },
20 queries::sync_status::{
21 get::{get_sync_status, SyncStatus},
22 update::update_sync_status,
23 },
24 session::CassandraSession,
25 },
26 service::utilities::health::{
27 set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
28 },
29 settings::{chain_follower, Settings},
30};
31
32pub(crate) mod event;
33
34pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
36
37async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
39 let chain = cfg.chain;
40 let dl_config = cfg.dl_config.clone();
41
42 let mut cfg = ChainSyncConfig::default_for(chain);
43 cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
44 info!(chain = %chain, "Starting Chain Sync Task");
45
46 if let Err(error) = cfg.run().await {
47 error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
48 Err(error)?;
49 }
50
51 Ok(())
52}
53
54#[derive(Clone)]
56struct SyncParams {
57 chain: Network,
59 start: Point,
61 end: Point,
63 first_indexed_block: Option<Point>,
65 first_is_immutable: bool,
67 last_indexed_block: Option<Point>,
69 last_is_immutable: bool,
71 total_blocks_synced: u64,
73 last_blocks_synced: u64,
75 retries: u64,
77 backoff_delay: Option<Duration>,
79 result: Arc<Option<anyhow::Result<()>>>,
81 follower_roll_forward: Option<Point>,
83}
84
85impl Display for SyncParams {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 if self.result.is_none() {
88 write!(f, "Sync_Params {{ ")?;
89 } else {
90 write!(f, "Sync_Result {{ ")?;
91 }
92
93 write!(f, "start: {}, end: {}", self.start, self.end)?;
94
95 if let Some(first) = self.first_indexed_block.as_ref() {
96 write!(
97 f,
98 ", first_indexed_block: {first}{}",
99 if self.first_is_immutable { ":I" } else { "" }
100 )?;
101 }
102
103 if let Some(last) = self.last_indexed_block.as_ref() {
104 write!(
105 f,
106 ", last_indexed_block: {last}{}",
107 if self.last_is_immutable { ":I" } else { "" }
108 )?;
109 }
110
111 if self.retries > 0 {
112 write!(f, ", retries: {}", self.retries)?;
113 }
114
115 if self.retries > 0 || self.result.is_some() {
116 write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
117 }
118
119 if self.result.is_some() {
120 write!(f, ", last_sync: {}", self.last_blocks_synced)?;
121 }
122
123 if let Some(backoff) = self.backoff_delay.as_ref() {
124 write!(f, ", backoff: {}", DurationString::from(*backoff))?;
125 }
126
127 if let Some(result) = self.result.as_ref() {
128 match result {
129 Ok(()) => write!(f, ", Success")?,
130 Err(error) => write!(f, ", {error}")?,
131 };
132 }
133
134 f.write_str(" }")
135 }
136}
137
138const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
140
141impl SyncParams {
142 fn new(chain: Network, start: Point, end: Point) -> Self {
144 Self {
145 chain,
146 start,
147 end,
148 first_indexed_block: None,
149 first_is_immutable: false,
150 last_indexed_block: None,
151 last_is_immutable: false,
152 total_blocks_synced: 0,
153 last_blocks_synced: 0,
154 retries: 0,
155 backoff_delay: None,
156 result: Arc::new(None),
157 follower_roll_forward: None,
158 }
159 }
160
161 fn retry(&self) -> Self {
163 let retry_count = self.retries.saturating_add(1);
164
165 let mut backoff = None;
166
167 if self.last_blocks_synced == 0 {
170 backoff = match retry_count {
172 1 => Some(Duration::from_secs(1)), 2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)), };
176 }
177
178 let mut retry = self.clone();
179 retry.last_blocks_synced = 0;
180 retry.retries = retry_count;
181 retry.backoff_delay = backoff;
182 retry.result = Arc::new(None);
183 retry.follower_roll_forward = None;
184
185 retry
186 }
187
188 pub(crate) fn done(
190 &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
191 last_immutable: bool, synced: u64, result: anyhow::Result<()>,
192 ) -> Self {
193 if result.is_ok() && self.end != Point::TIP {
194 update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
198 }
199 let mut done = self.clone();
200 done.first_indexed_block = first;
201 done.first_is_immutable = first_immutable;
202 done.last_indexed_block = last;
203 done.last_is_immutable = last_immutable;
204 done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205 done.last_blocks_synced = synced;
206 done.result = Arc::new(Some(result));
207
208 done
209 }
210
211 fn actual_start(&self) -> Point {
213 self.last_indexed_block
214 .as_ref()
215 .unwrap_or(&self.start)
216 .clone()
217 }
218
219 async fn backoff(&self) {
225 if let Some(backoff) = self.backoff_delay {
226 let mut rng = rand::rngs::StdRng::from_entropy();
227 let actual_backoff =
228 rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
229
230 tokio::time::sleep(actual_backoff).await;
231 }
232 }
233}
234
235#[allow(clippy::too_many_lines)]
238fn sync_subchain(
239 params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
240) -> tokio::task::JoinHandle<SyncParams> {
241 tokio::spawn(async move {
242 params.backoff().await;
244
245 drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
247 info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
248
249 let mut first_indexed_block = params.first_indexed_block.clone();
250 let mut first_immutable = params.first_is_immutable;
251 let mut last_indexed_block = params.last_indexed_block.clone();
252 let mut last_immutable = params.last_is_immutable;
253 let mut blocks_synced = 0u64;
254
255 let mut follower =
256 ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
257 while let Some(chain_update) = follower.next().await {
258 let tips = ChainFollower::get_tips(params.chain).await;
259 let immutable_slot = tips.0.slot_or_default();
260 let live_slot = tips.1.slot_or_default();
261 let event = event::ChainIndexerEvent::LiveTipSlotChanged {
262 immutable_slot,
263 live_slot,
264 };
265 if let Err(err) = event_sender.send(event) {
266 error!(error=%err, "Unable to send event.");
267 } else {
268 debug!(live_tip_slot=?live_slot, "Chain Indexer update");
269 }
270 match chain_update.kind {
271 cardano_chain_follower::Kind::ImmutableBlockRollForward => {
272 if params.end == Point::TIP {
274 info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
278 let mut result = params.done(
279 first_indexed_block,
280 first_immutable,
281 last_indexed_block,
282 last_immutable,
283 blocks_synced,
284 Ok(()),
285 );
286 result.follower_roll_forward = Some(chain_update.block_data().point());
289 return result;
290 }
291 },
292 cardano_chain_follower::Kind::Block => {
293 let block = chain_update.block_data();
294
295 if let Err(error) = index_block(block).await {
296 let error_msg = format!("Failed to index block {}", block.point());
297 error!(chain=%params.chain, error=%error, params=%params, error_msg);
298 return params.done(
299 first_indexed_block,
300 first_immutable,
301 last_indexed_block,
302 last_immutable,
303 blocks_synced,
304 Err(error.context(error_msg)),
305 );
306 }
307
308 if chain_update.tip && !set_follower_live_first_reached_tip() {
309 let _ = event_sender.send(event::ChainIndexerEvent::SyncLiveChainCompleted);
310 }
311
312 update_block_state(
313 block,
314 &mut first_indexed_block,
315 &mut first_immutable,
316 &mut last_indexed_block,
317 &mut last_immutable,
318 &mut blocks_synced,
319 );
320 },
321
322 cardano_chain_follower::Kind::Rollback => {
323 let rollback_slot = chain_update.block_data().slot();
325
326 let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
327 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
328 error!(chain=%params.chain, error=%error,
329 "Chain follower rollback, purging volatile data task failed."
330 );
331 } else {
332 #[allow(clippy::arithmetic_side_effects)]
334 let purge_slots = params
335 .last_indexed_block
336 .as_ref()
337 .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
339
340 let _ = event_sender.send(event::ChainIndexerEvent::ForwardDataPurged {
341 purge_slots: purge_slots.into(),
342 });
343
344 let block = chain_update.block_data();
346 if let Err(error) = index_block(block).await {
347 let error_msg =
348 format!("Failed to index block after rollback {}", block.point());
349 error!(chain=%params.chain, error=%error, params=%params, error_msg);
350 return params.done(
351 first_indexed_block,
352 first_immutable,
353 last_indexed_block,
354 last_immutable,
355 blocks_synced,
356 Err(error.context(error_msg)),
357 );
358 }
359
360 update_block_state(
361 block,
362 &mut first_indexed_block,
363 &mut first_immutable,
364 &mut last_indexed_block,
365 &mut last_immutable,
366 &mut blocks_synced,
367 );
368 }
369 },
370 }
371 }
372
373 let result = params.done(
374 first_indexed_block,
375 first_immutable,
376 last_indexed_block,
377 last_immutable,
378 blocks_synced,
379 Ok(()),
380 );
381
382 info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
383
384 result
385 })
386}
387
388fn update_block_state(
390 block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
391 last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
392) {
393 *last_immutable = block.is_immutable();
394 *last_indexed_block = Some(block.point());
395
396 if first_indexed_block.is_none() {
397 *first_immutable = *last_immutable;
398 *first_indexed_block = Some(block.point());
399 }
400
401 *blocks_synced = blocks_synced.saturating_add(1);
402}
403
404struct SyncTask {
407 cfg: chain_follower::EnvVars,
409
410 sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,
412
413 current_sync_tasks: u16,
415
416 start_slot: Slot,
418
419 immutable_tip_slot: Slot,
421
422 live_tip_slot: Slot,
424
425 sync_status: Vec<SyncStatus>,
427
428 event_channel: (
430 broadcast::Sender<event::ChainIndexerEvent>,
431 broadcast::Receiver<event::ChainIndexerEvent>,
432 ),
433}
434
435impl SyncTask {
436 fn new(cfg: chain_follower::EnvVars) -> SyncTask {
438 Self {
439 cfg,
440 sync_tasks: FuturesUnordered::new(),
441 start_slot: 0.into(),
442 current_sync_tasks: 0,
443 immutable_tip_slot: 0.into(),
444 live_tip_slot: 0.into(),
445 sync_status: Vec::new(),
446 event_channel: broadcast::channel(10),
447 }
448 }
449
450 fn add_sync_task(
452 &mut self, params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
453 ) {
454 self.sync_tasks.push(sync_subchain(params, event_sender));
455 self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
456 debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
457 self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
458 current_sync_tasks: self.current_sync_tasks,
459 });
460 }
461
462 fn sync_task_finished(&mut self) {
464 self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
465 error!("current_sync_tasks -= 1 overflow");
466 0
467 });
468 debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
469 self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
470 current_sync_tasks: self.current_sync_tasks,
471 });
472 }
473
474 #[allow(clippy::too_many_lines)]
483 async fn run(&mut self) {
484 let tips = ChainFollower::get_tips(self.cfg.chain).await;
487 self.immutable_tip_slot = tips.0.slot_or_default();
488 self.live_tip_slot = tips.1.slot_or_default();
489 info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
490
491 self.dispatch_event(event::ChainIndexerEvent::ImmutableTipSlotChanged {
492 immutable_slot: self.immutable_tip_slot,
493 live_slot: self.live_tip_slot,
494 });
495 self.dispatch_event(event::ChainIndexerEvent::LiveTipSlotChanged {
496 immutable_slot: self.immutable_tip_slot,
497 live_slot: self.live_tip_slot,
498 });
499
500 drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
506
507 info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
508 self.sync_status = get_sync_status().await;
509 debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
510
511 self.add_sync_task(
514 SyncParams::new(
515 self.cfg.chain,
516 Point::fuzzy(self.immutable_tip_slot),
517 Point::TIP,
518 ),
519 self.event_channel.0.clone(),
520 );
521 self.dispatch_event(event::ChainIndexerEvent::SyncLiveChainStarted);
522
523 self.start_immutable_followers();
524 if self.sync_tasks.len() == 1 {
527 set_follower_immutable_first_reached_tip();
528 self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainCompleted);
529 } else {
530 self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainStarted);
531 }
532
533 while let Some(completed) = self.sync_tasks.next().await {
539 self.sync_task_finished();
541
542 match completed {
543 Ok(finished) => {
544 let tips = ChainFollower::get_tips(self.cfg.chain).await;
545 let immutable_tip_slot = tips.0.slot_or_default();
546 let live_tip_slot = tips.1.slot_or_default();
547 info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
548 if finished.end == Point::TIP {
556 if let Some(ref roll_forward_point) = finished.follower_roll_forward {
557 self.immutable_tip_slot = roll_forward_point.slot_or_default();
560
561 self.dispatch_event(
562 event::ChainIndexerEvent::ImmutableTipSlotChanged {
563 immutable_slot: self.immutable_tip_slot,
564 live_slot: self.live_tip_slot,
565 },
566 );
567 info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");
568
569 self.start_immutable_followers();
570 self.dispatch_event(
571 event::ChainIndexerEvent::SyncImmutableChainStarted,
572 );
573 } else {
574 error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
575 }
576
577 self.add_sync_task(finished.retry(), self.event_channel.0.clone());
579 } else if let Some(result) = finished.result.as_ref() {
580 match result {
581 Ok(()) => {
582 info!(chain=%self.cfg.chain, report=%finished,
583 "The Immutable follower completed successfully.");
584
585 finished.last_indexed_block.as_ref().inspect(|block| {
586 self.dispatch_event(
587 event::ChainIndexerEvent::IndexedSlotProgressed {
588 slot: block.slot_or_default(),
589 },
590 );
591 });
592
593 self.start_immutable_followers();
596 },
597 Err(error) => {
598 error!(chain=%self.cfg.chain, report=%finished, error=%error,
599 "An Immutable follower failed, restarting it.");
600 self.add_sync_task(finished.retry(), self.event_channel.0.clone());
603 },
604 }
605 } else {
606 error!(chain=%self.cfg.chain, report=%finished,
607 "BUG: The Immutable follower completed, but without a proper result.");
608 }
609 },
610 Err(error) => {
611 error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point.");
612 },
613 }
614
615 if self.sync_tasks.len() == 1 {
624 set_follower_immutable_first_reached_tip();
625 self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainCompleted);
626
627 #[allow(clippy::arithmetic_side_effects)]
630 let purge_to_slot =
631 self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
632 let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
633 info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
634 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
635 error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
636 } else {
637 self.dispatch_event(event::ChainIndexerEvent::BackwardDataPurged);
638 }
639 }
640 }
641
642 error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
643 }
644
645 fn start_immutable_followers(&mut self) {
647 if self.start_slot < self.immutable_tip_slot {
653 while self.current_sync_tasks < self.cfg.sync_tasks {
655 let end_slot = self.immutable_tip_slot.min(
656 (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
657 .into(),
658 );
659
660 if let Some((first_point, last_point)) =
661 self.get_syncable_range(self.start_slot, end_slot)
662 {
663 self.add_sync_task(
664 SyncParams::new(self.cfg.chain, first_point, last_point.clone()),
665 self.event_channel.0.clone(),
666 );
667 }
668
669 self.start_slot = end_slot;
672
673 if end_slot == self.immutable_tip_slot {
674 break;
675 }
676 }
677 }
678 }
679
680 fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
685 for sync_block in &self.sync_status {
686 if start >= sync_block.start_slot && start <= sync_block.end_slot {
688 if end <= sync_block.end_slot {
690 return None;
691 }
692
693 return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
700 }
701 }
702
703 let start_slot = if start == 0.into() {
704 Point::ORIGIN
705 } else {
706 Point::fuzzy(start)
707 };
708
709 Some((start_slot, Point::fuzzy(end)))
710 }
711}
712
713impl event::EventTarget<event::ChainIndexerEvent> for SyncTask {
714 fn add_event_listener(&mut self, listener: event::EventListenerFn<event::ChainIndexerEvent>) {
715 let mut rx = self.event_channel.0.subscribe();
716 tokio::spawn(async move {
717 loop {
718 match rx.recv().await {
719 Ok(event) => (listener)(&event),
720 Err(broadcast::error::RecvError::Lagged(lag)) => {
721 debug!(lag = lag, "Sync tasks event listener lagged");
722 },
723 Err(broadcast::error::RecvError::Closed) => break,
724 }
725 }
726 });
727 }
728
729 fn dispatch_event(&self, message: event::ChainIndexerEvent) {
730 debug!(event = ?message, "Chain Indexer Event");
731 let _ = self.event_channel.0.send(message);
732 }
733}
734
735pub(crate) async fn start_followers() -> anyhow::Result<()> {
737 let cfg = Settings::follower_cfg();
738
739 cfg.log();
741
742 start_sync_for(&cfg).await?;
744
745 tokio::spawn(async move {
746 use self::event::ChainIndexerEvent as Event;
747 use crate::metrics::chain_indexer::reporter;
748
749 let api_host_names = Settings::api_host_names().join(",");
750 let service_id = Settings::service_id();
751 let network = cfg.chain.to_string();
752
753 let mut sync_task = SyncTask::new(cfg);
754
755 sync_task.add_event_listener(Box::new(move |event: &Event| {
757 if let Event::SyncLiveChainStarted = event {
758 reporter::LIVE_REACHED_TIP
759 .with_label_values(&[&api_host_names, service_id, &network])
760 .set(0);
761 }
762 if let Event::SyncImmutableChainStarted = event {
763 reporter::IMMUTABLE_REACHED_TIP
764 .with_label_values(&[&api_host_names, service_id, &network])
765 .set(0);
766 }
767 if let Event::SyncLiveChainCompleted = event {
768 reporter::LIVE_REACHED_TIP
769 .with_label_values(&[&api_host_names, service_id, &network])
770 .set(1);
771 }
772 if let Event::SyncImmutableChainCompleted = event {
773 reporter::IMMUTABLE_REACHED_TIP
774 .with_label_values(&[&api_host_names, service_id, &network])
775 .set(1);
776 }
777 if let Event::SyncTasksChanged { current_sync_tasks } = event {
778 reporter::RUNNING_INDEXER_TASKS_COUNT
779 .with_label_values(&[&api_host_names, service_id, &network])
780 .set(From::from(*current_sync_tasks));
781 }
782 if let Event::LiveTipSlotChanged {
783 live_slot,
784 immutable_slot,
785 } = event
786 {
787 reporter::CURRENT_LIVE_TIP_SLOT
788 .with_label_values(&[&api_host_names, service_id, &network])
789 .set(i64::try_from(u64::from(*live_slot)).unwrap_or(-1));
790 reporter::SLOT_TIP_DIFF
791 .with_label_values(&[&api_host_names, service_id, &network])
792 .set(
793 u64::from(live_slot.sub(*immutable_slot))
794 .try_into()
795 .unwrap_or(-1),
796 );
797 }
798 if let Event::ImmutableTipSlotChanged {
799 live_slot,
800 immutable_slot,
801 } = event
802 {
803 reporter::CURRENT_IMMUTABLE_TIP_SLOT
804 .with_label_values(&[&api_host_names, service_id, &network])
805 .set(i64::try_from(u64::from(*immutable_slot)).unwrap_or(-1));
806
807 reporter::SLOT_TIP_DIFF
808 .with_label_values(&[&api_host_names, service_id, &network])
809 .set(
810 u64::from(live_slot.sub(*immutable_slot))
811 .try_into()
812 .unwrap_or(-1),
813 );
814 }
815 if let Event::IndexedSlotProgressed { slot } = event {
816 reporter::HIGHEST_COMPLETE_INDEXED_SLOT
817 .with_label_values(&[&api_host_names, service_id, &network])
818 .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
819 }
820 if let Event::BackwardDataPurged = event {
821 reporter::TRIGGERED_BACKWARD_PURGES_COUNT
822 .with_label_values(&[&api_host_names, service_id, &network])
823 .inc();
824 }
825 if let Event::ForwardDataPurged { purge_slots } = event {
826 reporter::TRIGGERED_FORWARD_PURGES_COUNT
827 .with_label_values(&[&api_host_names, service_id, &network])
828 .inc();
829 reporter::PURGED_SLOTS
830 .with_label_values(&[&api_host_names, service_id, &network])
831 .set(i64::try_from(*purge_slots).unwrap_or(-1));
832 }
833 }));
834
835 sync_task.run().await;
836 });
837
838 Ok(())
839}