1use std::{fmt::Display, ops::Sub, sync::Arc, time::Duration};
4
5use cardano_blockchain_types::{MultiEraBlock, Network, Point, Slot};
6use cardano_chain_follower::{ChainFollower, ChainSyncConfig};
7use duration_string::DurationString;
8use event::EventTarget;
9use futures::{stream::FuturesUnordered, StreamExt};
10use rand::{Rng, SeedableRng};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info};
13
14use crate::{
15    db::index::{
16        block::{
17            index_block,
18            roll_forward::{self, PurgeCondition},
19        },
20        queries::sync_status::{
21            get::{get_sync_status, SyncStatus},
22            update::update_sync_status,
23        },
24        session::CassandraSession,
25    },
26    service::utilities::health::{
27        set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
28    },
29    settings::{chain_follower, Settings},
30};
31
/// Chain indexer events (`ChainIndexerEvent`) together with the `EventTarget` and
/// `EventListenerFn` abstractions used in this module to publish and observe them.
pub(crate) mod event;
33
/// Interval handed to `CassandraSession::wait_until_ready` whenever this module waits
/// for the Index DB to become ready.
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
36
37async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
39    let chain = cfg.chain;
40    let dl_config = cfg.dl_config.clone();
41
42    let mut cfg = ChainSyncConfig::default_for(chain);
43    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
44    info!(chain = %chain, "Starting Chain Sync Task");
45
46    if let Err(error) = cfg.run().await {
47        error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
48        Err(error)?;
49    }
50
51    Ok(())
52}
53
/// Parameters, progress and outcome of a single sync task.
///
/// The same structure is handed to a sync task as its input and returned by it as its
/// report (see `done`), and is cloned again to schedule a retry (see `retry`).
#[derive(Clone)]
struct SyncParams {
    /// Network the task follows.
    chain: Network,
    /// Point the sync range starts at.
    start: Point,
    /// Point the sync range ends at; `Point::TIP` identifies the live follower.
    end: Point,
    /// First block indexed by the task, if any block was indexed.
    first_indexed_block: Option<Point>,
    /// Whether the first indexed block was immutable.
    first_is_immutable: bool,
    /// Last block indexed by the task, if any; a retry resumes from here.
    last_indexed_block: Option<Point>,
    /// Whether the last indexed block was immutable.
    last_is_immutable: bool,
    /// Blocks synced in total, accumulated over every run of this task.
    total_blocks_synced: u64,
    /// Blocks synced by the most recent run only (reset to 0 when retrying).
    last_blocks_synced: u64,
    /// Number of times the task has been retried.
    retries: u64,
    /// Base delay to wait before the task starts; `None` means start immediately.
    backoff_delay: Option<Duration>,
    /// Outcome of the run; `None` until the task has finished.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Point of the immutable roll forward that ended a live (`Point::TIP`) follower.
    follower_roll_forward: Option<Point>,
}
84
85impl Display for SyncParams {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        if self.result.is_none() {
88            write!(f, "Sync_Params {{ ")?;
89        } else {
90            write!(f, "Sync_Result {{ ")?;
91        }
92
93        write!(f, "start: {}, end: {}", self.start, self.end)?;
94
95        if let Some(first) = self.first_indexed_block.as_ref() {
96            write!(
97                f,
98                ", first_indexed_block: {first}{}",
99                if self.first_is_immutable { ":I" } else { "" }
100            )?;
101        }
102
103        if let Some(last) = self.last_indexed_block.as_ref() {
104            write!(
105                f,
106                ", last_indexed_block: {last}{}",
107                if self.last_is_immutable { ":I" } else { "" }
108            )?;
109        }
110
111        if self.retries > 0 {
112            write!(f, ", retries: {}", self.retries)?;
113        }
114
115        if self.retries > 0 || self.result.is_some() {
116            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
117        }
118
119        if self.result.is_some() {
120            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
121        }
122
123        if let Some(backoff) = self.backoff_delay.as_ref() {
124            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
125        }
126
127        if let Some(result) = self.result.as_ref() {
128            match result {
129                Ok(()) => write!(f, ", Success")?,
130                Err(error) => write!(f, ", {error}")?,
131            };
132        }
133
134        f.write_str(" }")
135    }
136}
137
/// Multiplier giving the exclusive upper bound of the randomized backoff: the actual delay
/// is drawn from `backoff..backoff * BACKOFF_RANGE_MULTIPLIER`.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
140
141impl SyncParams {
142    fn new(chain: Network, start: Point, end: Point) -> Self {
144        Self {
145            chain,
146            start,
147            end,
148            first_indexed_block: None,
149            first_is_immutable: false,
150            last_indexed_block: None,
151            last_is_immutable: false,
152            total_blocks_synced: 0,
153            last_blocks_synced: 0,
154            retries: 0,
155            backoff_delay: None,
156            result: Arc::new(None),
157            follower_roll_forward: None,
158        }
159    }
160
161    fn retry(&self) -> Self {
163        let retry_count = self.retries.saturating_add(1);
164
165        let mut backoff = None;
166
167        if self.last_blocks_synced == 0 {
170            backoff = match retry_count {
172                1 => Some(Duration::from_secs(1)),     2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)),    };
176        }
177
178        let mut retry = self.clone();
179        retry.last_blocks_synced = 0;
180        retry.retries = retry_count;
181        retry.backoff_delay = backoff;
182        retry.result = Arc::new(None);
183        retry.follower_roll_forward = None;
184
185        retry
186    }
187
188    pub(crate) fn done(
190        &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
191        last_immutable: bool, synced: u64, result: anyhow::Result<()>,
192    ) -> Self {
193        if result.is_ok() && self.end != Point::TIP {
194            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
198        }
199        let mut done = self.clone();
200        done.first_indexed_block = first;
201        done.first_is_immutable = first_immutable;
202        done.last_indexed_block = last;
203        done.last_is_immutable = last_immutable;
204        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205        done.last_blocks_synced = synced;
206        done.result = Arc::new(Some(result));
207
208        done
209    }
210
211    fn actual_start(&self) -> Point {
213        self.last_indexed_block
214            .as_ref()
215            .unwrap_or(&self.start)
216            .clone()
217    }
218
219    async fn backoff(&self) {
225        if let Some(backoff) = self.backoff_delay {
226            let mut rng = rand::rngs::StdRng::from_entropy();
227            let actual_backoff =
228                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
229
230            tokio::time::sleep(actual_backoff).await;
231        }
232    }
233}
234
235#[allow(clippy::too_many_lines)]
238fn sync_subchain(
239    params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
240) -> tokio::task::JoinHandle<SyncParams> {
241    tokio::spawn(async move {
242        params.backoff().await;
244
245        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
247        info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
248
249        let mut first_indexed_block = params.first_indexed_block.clone();
250        let mut first_immutable = params.first_is_immutable;
251        let mut last_indexed_block = params.last_indexed_block.clone();
252        let mut last_immutable = params.last_is_immutable;
253        let mut blocks_synced = 0u64;
254
255        let mut follower =
256            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
257        while let Some(chain_update) = follower.next().await {
258            let tips = ChainFollower::get_tips(params.chain).await;
259            let immutable_slot = tips.0.slot_or_default();
260            let live_slot = tips.1.slot_or_default();
261            let event = event::ChainIndexerEvent::LiveTipSlotChanged {
262                immutable_slot,
263                live_slot,
264            };
265            if let Err(err) = event_sender.send(event) {
266                error!(error=%err, "Unable to send event.");
267            } else {
268                debug!(live_tip_slot=?live_slot, "Chain Indexer update");
269            }
270            match chain_update.kind {
271                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
272                    if params.end == Point::TIP {
274                        info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
278                        let mut result = params.done(
279                            first_indexed_block,
280                            first_immutable,
281                            last_indexed_block,
282                            last_immutable,
283                            blocks_synced,
284                            Ok(()),
285                        );
286                        result.follower_roll_forward = Some(chain_update.block_data().point());
289                        return result;
290                    }
291                },
292                cardano_chain_follower::Kind::Block => {
293                    let block = chain_update.block_data();
294
295                    if let Err(error) = index_block(block).await {
296                        let error_msg = format!("Failed to index block {}", block.point());
297                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
298                        return params.done(
299                            first_indexed_block,
300                            first_immutable,
301                            last_indexed_block,
302                            last_immutable,
303                            blocks_synced,
304                            Err(error.context(error_msg)),
305                        );
306                    }
307
308                    if chain_update.tip && !set_follower_live_first_reached_tip() {
309                        let _ = event_sender.send(event::ChainIndexerEvent::SyncLiveChainCompleted);
310                    }
311
312                    update_block_state(
313                        block,
314                        &mut first_indexed_block,
315                        &mut first_immutable,
316                        &mut last_indexed_block,
317                        &mut last_immutable,
318                        &mut blocks_synced,
319                    );
320                },
321
322                cardano_chain_follower::Kind::Rollback => {
323                    let rollback_slot = chain_update.block_data().slot();
325
326                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
327                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
328                        error!(chain=%params.chain, error=%error,
329                            "Chain follower rollback, purging volatile data task failed."
330                        );
331                    } else {
332                        #[allow(clippy::arithmetic_side_effects)]
334                        let purge_slots = params
335                            .last_indexed_block
336                            .as_ref()
337                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
339
340                        let _ = event_sender.send(event::ChainIndexerEvent::ForwardDataPurged {
341                            purge_slots: purge_slots.into(),
342                        });
343
344                        let block = chain_update.block_data();
346                        if let Err(error) = index_block(block).await {
347                            let error_msg =
348                                format!("Failed to index block after rollback {}", block.point());
349                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
350                            return params.done(
351                                first_indexed_block,
352                                first_immutable,
353                                last_indexed_block,
354                                last_immutable,
355                                blocks_synced,
356                                Err(error.context(error_msg)),
357                            );
358                        }
359
360                        update_block_state(
361                            block,
362                            &mut first_indexed_block,
363                            &mut first_immutable,
364                            &mut last_indexed_block,
365                            &mut last_immutable,
366                            &mut blocks_synced,
367                        );
368                    }
369                },
370            }
371        }
372
373        let result = params.done(
374            first_indexed_block,
375            first_immutable,
376            last_indexed_block,
377            last_immutable,
378            blocks_synced,
379            Ok(()),
380        );
381
382        info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
383
384        result
385    })
386}
387
388fn update_block_state(
390    block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
391    last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
392) {
393    *last_immutable = block.is_immutable();
394    *last_indexed_block = Some(block.point());
395
396    if first_indexed_block.is_none() {
397        *first_immutable = *last_immutable;
398        *first_indexed_block = Some(block.point());
399    }
400
401    *blocks_synced = blocks_synced.saturating_add(1);
402}
403
/// State of the primary sync task, which schedules and supervises all sync sub-tasks.
struct SyncTask {
    /// Chain follower configuration.
    cfg: chain_follower::EnvVars,

    /// Handles of the sync sub-tasks currently running.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// Number of sync sub-tasks currently running.
    current_sync_tasks: u16,

    /// Slot the next immutable follower will be started from.
    start_slot: Slot,

    /// Immutable tip slot the immutable followers are syncing towards.
    immutable_tip_slot: Slot,

    /// Live tip slot as last recorded by this task.
    live_tip_slot: Slot,

    /// Sync status recovered from the Index DB (`get_sync_status`).
    sync_status: Vec<SyncStatus>,

    /// Broadcast channel used to publish chain indexer events.
    /// NOTE(review): the receiver half is never read in this file; presumably it is held
    /// so the channel always has at least one receiver — confirm.
    event_channel: (
        broadcast::Sender<event::ChainIndexerEvent>,
        broadcast::Receiver<event::ChainIndexerEvent>,
    ),
}
434
435impl SyncTask {
436    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
438        Self {
439            cfg,
440            sync_tasks: FuturesUnordered::new(),
441            start_slot: 0.into(),
442            current_sync_tasks: 0,
443            immutable_tip_slot: 0.into(),
444            live_tip_slot: 0.into(),
445            sync_status: Vec::new(),
446            event_channel: broadcast::channel(10),
447        }
448    }
449
450    fn add_sync_task(
452        &mut self, params: SyncParams, event_sender: broadcast::Sender<event::ChainIndexerEvent>,
453    ) {
454        self.sync_tasks.push(sync_subchain(params, event_sender));
455        self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
456        debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
457        self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
458            current_sync_tasks: self.current_sync_tasks,
459        });
460    }
461
462    fn sync_task_finished(&mut self) {
464        self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
465            error!("current_sync_tasks -= 1 overflow");
466            0
467        });
468        debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
469        self.dispatch_event(event::ChainIndexerEvent::SyncTasksChanged {
470            current_sync_tasks: self.current_sync_tasks,
471        });
472    }
473
474    #[allow(clippy::too_many_lines)]
483    async fn run(&mut self) {
484        let tips = ChainFollower::get_tips(self.cfg.chain).await;
487        self.immutable_tip_slot = tips.0.slot_or_default();
488        self.live_tip_slot = tips.1.slot_or_default();
489        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
490
491        self.dispatch_event(event::ChainIndexerEvent::ImmutableTipSlotChanged {
492            immutable_slot: self.immutable_tip_slot,
493            live_slot: self.live_tip_slot,
494        });
495        self.dispatch_event(event::ChainIndexerEvent::LiveTipSlotChanged {
496            immutable_slot: self.immutable_tip_slot,
497            live_slot: self.live_tip_slot,
498        });
499
500        drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
506
507        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
508        self.sync_status = get_sync_status().await;
509        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
510
511        self.add_sync_task(
514            SyncParams::new(
515                self.cfg.chain,
516                Point::fuzzy(self.immutable_tip_slot),
517                Point::TIP,
518            ),
519            self.event_channel.0.clone(),
520        );
521        self.dispatch_event(event::ChainIndexerEvent::SyncLiveChainStarted);
522
523        self.start_immutable_followers();
524        if self.sync_tasks.len() == 1 {
527            set_follower_immutable_first_reached_tip();
528            self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainCompleted);
529        } else {
530            self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainStarted);
531        }
532
533        while let Some(completed) = self.sync_tasks.next().await {
539            self.sync_task_finished();
541
542            match completed {
543                Ok(finished) => {
544                    let tips = ChainFollower::get_tips(self.cfg.chain).await;
545                    let immutable_tip_slot = tips.0.slot_or_default();
546                    let live_tip_slot = tips.1.slot_or_default();
547                    info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
548                    if finished.end == Point::TIP {
556                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
557                            self.immutable_tip_slot = roll_forward_point.slot_or_default();
560
561                            self.dispatch_event(
562                                event::ChainIndexerEvent::ImmutableTipSlotChanged {
563                                    immutable_slot: self.immutable_tip_slot,
564                                    live_slot: self.live_tip_slot,
565                                },
566                            );
567                            info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");
568
569                            self.start_immutable_followers();
570                            self.dispatch_event(
571                                event::ChainIndexerEvent::SyncImmutableChainStarted,
572                            );
573                        } else {
574                            error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
575                        }
576
577                        self.add_sync_task(finished.retry(), self.event_channel.0.clone());
579                    } else if let Some(result) = finished.result.as_ref() {
580                        match result {
581                            Ok(()) => {
582                                info!(chain=%self.cfg.chain, report=%finished,
583                                    "The Immutable follower completed successfully.");
584
585                                finished.last_indexed_block.as_ref().inspect(|block| {
586                                    self.dispatch_event(
587                                        event::ChainIndexerEvent::IndexedSlotProgressed {
588                                            slot: block.slot_or_default(),
589                                        },
590                                    );
591                                });
592
593                                self.start_immutable_followers();
596                            },
597                            Err(error) => {
598                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
599                                        "An Immutable follower failed, restarting it.");
600                                self.add_sync_task(finished.retry(), self.event_channel.0.clone());
603                            },
604                        }
605                    } else {
606                        error!(chain=%self.cfg.chain, report=%finished,
607                        "BUG: The Immutable follower completed, but without a proper result.");
608                    }
609                },
610                Err(error) => {
611                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
612                },
613            }
614
615            if self.sync_tasks.len() == 1 {
624                set_follower_immutable_first_reached_tip();
625                self.dispatch_event(event::ChainIndexerEvent::SyncImmutableChainCompleted);
626
627                #[allow(clippy::arithmetic_side_effects)]
630                let purge_to_slot =
631                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
632                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
633                info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
634                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
635                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
636                } else {
637                    self.dispatch_event(event::ChainIndexerEvent::BackwardDataPurged);
638                }
639            }
640        }
641
642        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
643    }
644
645    fn start_immutable_followers(&mut self) {
647        if self.start_slot < self.immutable_tip_slot {
653            while self.current_sync_tasks < self.cfg.sync_tasks {
655                let end_slot = self.immutable_tip_slot.min(
656                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
657                        .into(),
658                );
659
660                if let Some((first_point, last_point)) =
661                    self.get_syncable_range(self.start_slot, end_slot)
662                {
663                    self.add_sync_task(
664                        SyncParams::new(self.cfg.chain, first_point, last_point.clone()),
665                        self.event_channel.0.clone(),
666                    );
667                }
668
669                self.start_slot = end_slot;
672
673                if end_slot == self.immutable_tip_slot {
674                    break;
675                }
676            }
677        }
678    }
679
680    fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
685        for sync_block in &self.sync_status {
686            if start >= sync_block.start_slot && start <= sync_block.end_slot {
688                if end <= sync_block.end_slot {
690                    return None;
691                }
692
693                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
700            }
701        }
702
703        let start_slot = if start == 0.into() {
704            Point::ORIGIN
705        } else {
706            Point::fuzzy(start)
707        };
708
709        Some((start_slot, Point::fuzzy(end)))
710    }
711}
712
713impl event::EventTarget<event::ChainIndexerEvent> for SyncTask {
714    fn add_event_listener(&mut self, listener: event::EventListenerFn<event::ChainIndexerEvent>) {
715        let mut rx = self.event_channel.0.subscribe();
716        tokio::spawn(async move {
717            loop {
718                match rx.recv().await {
719                    Ok(event) => (listener)(&event),
720                    Err(broadcast::error::RecvError::Lagged(lag)) => {
721                        debug!(lag = lag, "Sync tasks event listener lagged");
722                    },
723                    Err(broadcast::error::RecvError::Closed) => break,
724                }
725            }
726        });
727    }
728
729    fn dispatch_event(&self, message: event::ChainIndexerEvent) {
730        debug!(event = ?message, "Chain Indexer Event");
731        let _ = self.event_channel.0.send(message);
732    }
733}
734
735pub(crate) async fn start_followers() -> anyhow::Result<()> {
737    let cfg = Settings::follower_cfg();
738
739    cfg.log();
741
742    start_sync_for(&cfg).await?;
744
745    tokio::spawn(async move {
746        use self::event::ChainIndexerEvent as Event;
747        use crate::metrics::chain_indexer::reporter;
748
749        let api_host_names = Settings::api_host_names().join(",");
750        let service_id = Settings::service_id();
751        let network = cfg.chain.to_string();
752
753        let mut sync_task = SyncTask::new(cfg);
754
755        sync_task.add_event_listener(Box::new(move |event: &Event| {
757            if let Event::SyncLiveChainStarted = event {
758                reporter::LIVE_REACHED_TIP
759                    .with_label_values(&[&api_host_names, service_id, &network])
760                    .set(0);
761            }
762            if let Event::SyncImmutableChainStarted = event {
763                reporter::IMMUTABLE_REACHED_TIP
764                    .with_label_values(&[&api_host_names, service_id, &network])
765                    .set(0);
766            }
767            if let Event::SyncLiveChainCompleted = event {
768                reporter::LIVE_REACHED_TIP
769                    .with_label_values(&[&api_host_names, service_id, &network])
770                    .set(1);
771            }
772            if let Event::SyncImmutableChainCompleted = event {
773                reporter::IMMUTABLE_REACHED_TIP
774                    .with_label_values(&[&api_host_names, service_id, &network])
775                    .set(1);
776            }
777            if let Event::SyncTasksChanged { current_sync_tasks } = event {
778                reporter::RUNNING_INDEXER_TASKS_COUNT
779                    .with_label_values(&[&api_host_names, service_id, &network])
780                    .set(From::from(*current_sync_tasks));
781            }
782            if let Event::LiveTipSlotChanged {
783                live_slot,
784                immutable_slot,
785            } = event
786            {
787                reporter::CURRENT_LIVE_TIP_SLOT
788                    .with_label_values(&[&api_host_names, service_id, &network])
789                    .set(i64::try_from(u64::from(*live_slot)).unwrap_or(-1));
790                reporter::SLOT_TIP_DIFF
791                    .with_label_values(&[&api_host_names, service_id, &network])
792                    .set(
793                        u64::from(live_slot.sub(*immutable_slot))
794                            .try_into()
795                            .unwrap_or(-1),
796                    );
797            }
798            if let Event::ImmutableTipSlotChanged {
799                live_slot,
800                immutable_slot,
801            } = event
802            {
803                reporter::CURRENT_IMMUTABLE_TIP_SLOT
804                    .with_label_values(&[&api_host_names, service_id, &network])
805                    .set(i64::try_from(u64::from(*immutable_slot)).unwrap_or(-1));
806
807                reporter::SLOT_TIP_DIFF
808                    .with_label_values(&[&api_host_names, service_id, &network])
809                    .set(
810                        u64::from(live_slot.sub(*immutable_slot))
811                            .try_into()
812                            .unwrap_or(-1),
813                    );
814            }
815            if let Event::IndexedSlotProgressed { slot } = event {
816                reporter::HIGHEST_COMPLETE_INDEXED_SLOT
817                    .with_label_values(&[&api_host_names, service_id, &network])
818                    .set(i64::try_from(u64::from(*slot)).unwrap_or(-1));
819            }
820            if let Event::BackwardDataPurged = event {
821                reporter::TRIGGERED_BACKWARD_PURGES_COUNT
822                    .with_label_values(&[&api_host_names, service_id, &network])
823                    .inc();
824            }
825            if let Event::ForwardDataPurged { purge_slots } = event {
826                reporter::TRIGGERED_FORWARD_PURGES_COUNT
827                    .with_label_values(&[&api_host_names, service_id, &network])
828                    .inc();
829                reporter::PURGED_SLOTS
830                    .with_label_values(&[&api_host_names, service_id, &network])
831                    .set(i64::try_from(*purge_slots).unwrap_or(-1));
832            }
833        }));
834
835        sync_task.run().await;
836    });
837
838    Ok(())
839}