All files / src/services TipTracker.ts

90.47% Statements 19/21
100% Branches 10/10
83.33% Functions 10/12
90% Lines 18/20

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111  42x   42x                                                                             42x 135x   42x 130x 130x                             130x           420x                       159x           155x         141x       1x     116x     130x 130x       2x 2x      
import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, PersistentDocumentTrackerSubject, tipEquals } from './util';
import { DocumentStore } from '../persistence';
import {
  EMPTY,
  Observable,
  Subject,
  auditTime,
  combineLatest,
  concat,
  distinctUntilChanged,
  exhaustMap,
  filter,
  finalize,
  merge,
  of,
  startWith,
  switchMap,
  takeUntil,
  tap,
  timeout
} from 'rxjs';
import { Logger } from 'ts-log';
import { Milliseconds } from './types';
import { SyncStatus } from '../types';
/** Dependencies and settings read by the {@link TipTracker} constructor. */
export interface TipTrackerProps {
  /** Subscribed to whenever a tip fetch is started (by polling or by an external trigger). */
  provider$: Observable<Cardano.Tip>;
  /** Only the `true` emissions of its `isSettled$` stream are used, as the trigger that schedules a poll. */
  syncStatus: SyncStatus;
  /** No poll fetch is started while the latest value is `ConnectionStatus.down`. */
  connectionStatus$: Observable<ConnectionStatus>;
  /** control whether tip tracker should be polling */
  pollController$: Observable<boolean>;
  /** Passed through, unchanged, to the `PersistentDocumentTrackerSubject` base class. */
  store: DocumentStore<Cardano.Tip>;
  /** Audit window applied to poll triggers: at most one poll trigger is let through per window. */
  minPollInterval: Milliseconds;
  /** When `syncStatus.isSettled$` has not emitted `true` for this long, a poll is triggered anyway. */
  maxPollInterval: Milliseconds;
  /** Receives debug messages about fetch decisions and fetched tips. */
  logger: Logger;
}
 
/** Optional second constructor argument of {@link TipTracker}, allowing internal collaborators to be supplied. */
export interface TipTrackerInternals {
  /**
   * Subject whose emissions cancel an in-flight poll fetch and start a fresh fetch.
   * The constructor creates a new `Subject` when this is omitted.
   */
  externalTrigger$?: Subject<void>;
}
 
/**
 * Mirrors `trigger$`, but never stays silent for longer than `interval`.
 *
 * Whenever `trigger$` fails to emit within `interval` milliseconds (measured from subscription
 * and then from each emission), `true` is emitted and `trigger$` is subscribed to again with the
 * same rule applied.
 *
 * @param trigger$ source whose emissions are forwarded as-is
 * @param interval maximum allowed gap between emissions, in milliseconds
 * @returns the values of `trigger$` interleaved with `true` for every elapsed interval
 */
const triggerOrInterval$ = <T = unknown>(trigger$: Observable<T>, interval: number): Observable<T | boolean> => {
  // Fallback used once the timeout elapses: emit the marker value, then start over.
  const emitThenResubscribe = () => concat(of(true), triggerOrInterval$(trigger$, interval));
  return trigger$.pipe(timeout({ each: interval, with: emitThenResubscribe }));
};
 
/**
 * Tracker subject that keeps the latest `Cardano.Tip`, fed by two merged sources:
 *
 * - a polling pipeline, driven by `syncStatus.isSettled$`, `connectionStatus$` and `pollController$`;
 * - an external trigger (see {@link TipTracker.sync}), which forces an immediate fetch.
 *
 * The merged stream and `store` are handed to the `PersistentDocumentTrackerSubject` base class.
 */
export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
  readonly #externalTrigger$: Subject<void>;
  readonly #logger: Logger;

  /**
   * @param props providers, control streams, poll intervals, store and logger (see `TipTrackerProps`)
   * @param internals optional overrides; `externalTrigger$` defaults to a new `Subject`
   */
  constructor(
    {
      provider$,
      pollController$,
      minPollInterval,
      maxPollInterval,
      store,
      syncStatus,
      connectionStatus$,
      logger
    }: TipTrackerProps,
    { externalTrigger$ = new Subject<void>() }: TipTrackerInternals = {}
  ) {
    super(
      merge(
        // schedule a fetch:
        // - after some delay once fully synced and online
        // - if it's not settled for maxPollInterval
        combineLatest([
          triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
            // Do not allow more than one fetch per minPollInterval.
            // The first syncStatus starts a minPollInterval timer. Only the latest emission from the minPollInterval
            // is emitted in case the syncStatus changes multiple times.
            auditTime(minPollInterval),
            // trigger fetch on start
            startWith(null)
          ),
          connectionStatus$,
          pollController$
        ]).pipe(
          // Logged for every combined emission, before exhaustMap decides whether a fetch actually starts
          tap(([, connectionStatus, poll]) => {
            logger.debug(
              connectionStatus === ConnectionStatus.down || !poll ? 'Skipping fetch tip' : 'Fetching tip...'
            );
          }),
          // Throttle syncing by interval, cancel ongoing request on external trigger
          exhaustMap(([, connectionStatus, poll]) =>
            connectionStatus === ConnectionStatus.down || !poll
              ? EMPTY
              : provider$.pipe(takeUntil(externalTrigger$.pipe(tap(() => logger.debug('Tip fetch canceled')))))
          ),
          // Suppress consecutive tips that `tipEquals` considers equal
          distinctUntilChanged(tipEquals),
          tap((tip) => logger.debug('Fetched new tip', tip))
        ),
        // Always immediately restart request on external trigger
        externalTrigger$.pipe(
          switchMap(() => provider$),
          tap((tip) => logger.debug('External trigger fetched tip', tip))
        )
      ).pipe(
        // Complete the trigger subject when the merged stream is torn down.
        // The closure variable is used instead of `this.#externalTrigger$`: this expression is built
        // before `super()` returns, and `this` (and its private fields) must not be touched until then.
        finalize(() => externalTrigger$.complete())
      ),
      store
    );
    this.#externalTrigger$ = externalTrigger$;
    this.#logger = logger;
  }

  /**
   * Requests an immediate tip fetch by emitting on the external trigger, which cancels an
   * in-flight poll fetch and starts a new request.
   */
  sync() {
    this.#logger.debug('Manual sync triggered');
    this.#externalTrigger$.next();
  }
}