All files / src/services TipTracker.ts

90.47% Statements 19/21
100% Branches 6/6
83.33% Functions 10/12
90% Lines 18/20

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97  42x   42x                                                                         42x 137x   42x 132x 132x           132x           277x                     156x       153x         140x       1x     120x     132x 132x       2x 2x      
import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, PersistentDocumentTrackerSubject, tipEquals } from './util';
import { DocumentStore } from '../persistence';
import {
  EMPTY,
  Observable,
  Subject,
  auditTime,
  combineLatest,
  concat,
  distinctUntilChanged,
  exhaustMap,
  filter,
  finalize,
  merge,
  of,
  startWith,
  switchMap,
  takeUntil,
  tap,
  timeout
} from 'rxjs';
import { Logger } from 'ts-log';
import { Milliseconds } from './types';
import { SyncStatus } from '../types';
/** Dependencies and polling settings consumed by the TipTracker constructor. */
export interface TipTrackerProps {
  /** Subscribed to each time a tip fetch is performed (scheduled or externally triggered). */
  provider$: Observable<Cardano.Tip>;
  /** Only `isSettled$` is read by the tracker: an emission of `true` schedules a fetch. */
  syncStatus: SyncStatus;
  /** Latest connection status; scheduled fetches are skipped while it is `ConnectionStatus.down`. */
  connectionStatus$: Observable<ConnectionStatus>;
  /** Handed to PersistentDocumentTrackerSubject together with the tip source. */
  store: DocumentStore<Cardano.Tip>;
  /**
   * Audit window applied to the fetch triggers: once a trigger arrives, only the latest
   * one is forwarded when this interval elapses, so scheduled fetches happen at most once per interval.
   */
  minPollInterval: Milliseconds;
  /** If `isSettled$` does not emit `true` for this long, a fetch is scheduled anyway. */
  maxPollInterval: Milliseconds;
  /** Receives debug messages about fetches, skipped fetches and cancellations. */
  logger: Logger;
}
 
/** Optional overrides for TipTracker internals. */
export interface TipTrackerInternals {
  /**
   * Subject whose emissions cancel an in-flight scheduled fetch and start a new one immediately.
   * When omitted, the constructor creates its own. The tracker completes this subject
   * when its tip source is finalized.
   */
  externalTrigger$?: Subject<void>;
}
 
/**
 * Mirrors `trigger$`, but never stays silent for longer than `interval`.
 *
 * Whenever `interval` milliseconds pass without an emission from `trigger$`,
 * the returned observable emits `true` and then subscribes to `trigger$` again
 * with a fresh timer.
 *
 * @param trigger$ source of "real" trigger emissions
 * @param interval maximum silence, in milliseconds, before `true` is emitted
 * @returns the values of `trigger$` interleaved with `true` markers for each elapsed interval
 */
const triggerOrInterval$ = <T = unknown>(trigger$: Observable<T>, interval: number): Observable<T | boolean> => {
  // Fallback used once the timer fires: emit the marker, then start over.
  const emitAndRestart = () => concat(of(true), triggerOrInterval$(trigger$, interval));
  return trigger$.pipe(timeout({ each: interval, with: emitAndRestart }));
};
 
/**
 * Tracks the ledger tip by merging two fetch sources and handing the result,
 * together with `store`, to PersistentDocumentTrackerSubject:
 *
 * - scheduled fetches: one on start, then whenever `syncStatus.isSettled$` emits `true`
 *   or `maxPollInterval` elapses without that happening, audited by `minPollInterval`
 *   and skipped while the connection is down;
 * - manual fetches: one per emission of the external trigger (see {@link TipTracker.sync}),
 *   which also cancels a scheduled fetch that is still in flight.
 */
export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
  // Assigned exactly once in the constructor; no placeholder Subject is allocated up front.
  readonly #externalTrigger$: Subject<void>;
  readonly #logger: Logger;

  /**
   * @param props providers, polling intervals, store and logger (see TipTrackerProps)
   * @param internals optional overrides; `externalTrigger$` defaults to a new Subject
   */
  constructor(
    { provider$, minPollInterval, maxPollInterval, store, syncStatus, connectionStatus$, logger }: TipTrackerProps,
    { externalTrigger$ = new Subject() }: TipTrackerInternals = {}
  ) {
    super(
      merge(
        // schedule a fetch:
        // - after some delay once fully synced and online
        // - if it's not settled for maxPollInterval
        combineLatest([
          triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
            // Do not allow more than one fetch per minPollInterval.
            // The first syncStatus starts a minPollInterval timer. Only the latest emission from the minPollInterval
            // is emitted in case the syncStatus changes multiple times.
            auditTime(minPollInterval),
            // trigger fetch on start
            startWith(null)
          ),
          connectionStatus$
        ]).pipe(
          tap(([, connectionStatus]) => {
            logger.debug(connectionStatus === ConnectionStatus.down ? 'Skipping fetch tip' : 'Fetching tip...');
          }),
          // Throttle syncing by interval: exhaustMap ignores new triggers while a fetch is in flight.
          // An external trigger cancels the ongoing request.
          exhaustMap(([, connectionStatus]) =>
            connectionStatus === ConnectionStatus.down
              ? EMPTY
              : provider$.pipe(takeUntil(externalTrigger$.pipe(tap(() => logger.debug('Tip fetch canceled')))))
          ),
          // Suppress consecutive tips that are equal according to tipEquals.
          distinctUntilChanged(tipEquals),
          tap((tip) => logger.debug('Fetched new tip', tip))
        ),
        // Always immediately restart request on external trigger
        externalTrigger$.pipe(
          switchMap(() => provider$),
          tap((tip) => logger.debug('External trigger fetched tip', tip))
        )
        // Complete the trigger through the local binding rather than `this`: this closure is
        // created before super() returns, so it must not depend on `this` being initialized
        // (or on the field assignment below having run) by the time the source is finalized.
      ).pipe(finalize(() => externalTrigger$.complete())),
      store
    );
    this.#externalTrigger$ = externalTrigger$;
    this.#logger = logger;
  }

  /**
   * Requests an immediate tip fetch by emitting on the external trigger.
   * Any scheduled fetch still in flight is canceled in favour of the new request.
   */
  sync(): void {
    this.#logger.debug('Manual sync triggered');
    this.#externalTrigger$.next();
  }
}