All files / src/services TipTracker.ts

90.47% Statements 19/21
100% Branches 10/10
83.33% Functions 10/12
90% Lines 18/20

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111  42x   42x                                                                             42x 134x   42x 129x 129x                             129x           416x                       158x           154x         140x       1x     115x     129x 129x       2x 2x      
import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, PersistentDocumentTrackerSubject, tipEquals } from './util';
import { DocumentStore } from '../persistence';
import {
  EMPTY,
  Observable,
  Subject,
  auditTime,
  combineLatest,
  concat,
  distinctUntilChanged,
  exhaustMap,
  filter,
  finalize,
  merge,
  of,
  startWith,
  switchMap,
  takeUntil,
  tap,
  timeout
} from 'rxjs';
import { Logger } from 'ts-log';
import { Milliseconds } from './types';
import { SyncStatus } from '../types';
export interface TipTrackerProps {
  provider$: Observable<Cardano.Tip>;
  syncStatus: SyncStatus;
  connectionStatus$: Observable<ConnectionStatus>;
  /** control whether tip tracker should be polling */
  pollController$: Observable<boolean>;
  store: DocumentStore<Cardano.Tip>;
  /** Once */
  minPollInterval: Milliseconds;
  maxPollInterval: Milliseconds;
  logger: Logger;
}
 
export interface TipTrackerInternals {
  externalTrigger$?: Subject<void>;
}
 
const triggerOrInterval$ = <T = unknown>(trigger$: Observable<T>, interval: number): Observable<T | boolean> =>
  trigger$.pipe(timeout({ each: interval, with: () => concat(of(true), triggerOrInterval$(trigger$, interval)) }));
 
export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
  #externalTrigger$ = new Subject<void>();
  #logger: Logger;
 
  constructor(
    {
      provider$,
      pollController$,
      minPollInterval,
      maxPollInterval,
      store,
      syncStatus,
      connectionStatus$,
      logger
    }: TipTrackerProps,
    { externalTrigger$ = new Subject() }: TipTrackerInternals = {}
  ) {
    super(
      merge(
        // schedule a fetch:
        // - after some delay once fully synced and online
        // - if it's not settled for maxPollInterval
        combineLatest([
          triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
            // Do not allow more than one fetch per minPollInterval.
            // The first syncStatus starts a minPollInterval timer. Only the latest emission from the minPollInterval
            // is emitted in case the syncStatus changes multiple times.
            auditTime(minPollInterval),
            // trigger fetch on start
            startWith(null)
          ),
          connectionStatus$,
          pollController$
        ]).pipe(
          tap(([, connectionStatus, poll]) => {
            logger.debug(
              connectionStatus === ConnectionStatus.down || !poll ? 'Skipping fetch tip' : 'Fetching tip...'
            );
          }),
          // Throttle syncing by interval, cancel ongoing request on external trigger
          exhaustMap(([, connectionStatus, poll]) =>
            connectionStatus === ConnectionStatus.down || !poll
              ? EMPTY
              : provider$.pipe(takeUntil(externalTrigger$.pipe(tap(() => logger.debug('Tip fetch canceled')))))
          ),
          distinctUntilChanged(tipEquals),
          tap((tip) => logger.debug('Fetched new tip', tip))
        ),
        // Always immediately restart request on external trigger
        externalTrigger$.pipe(
          switchMap(() => provider$),
          tap((tip) => logger.debug('External trigger fetched tip', tip))
        )
      ).pipe(finalize(() => this.#externalTrigger$.complete())),
      store
    );
    this.#externalTrigger$ = externalTrigger$;
    this.#logger = logger;
  }
 
  sync() {
    this.#logger.debug('Manual sync triggered');
    this.#externalTrigger$.next();
  }
}