Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, PersistentDocumentTrackerSubject, tipEquals } from './util';
import { DocumentStore } from '../persistence';
import {
  EMPTY,
  Observable,
  Subject,
  auditTime,
  combineLatest,
  concat,
  distinctUntilChanged,
  exhaustMap,
  filter,
  finalize,
  merge,
  of,
  startWith,
  switchMap,
  takeUntil,
  tap,
  timeout
} from 'rxjs';
import { Logger } from 'ts-log';
import { Milliseconds } from './types';
import { SyncStatus } from '../types';

/** Dependencies and tuning parameters of {@link TipTracker}. */
export interface TipTrackerProps {
  /** Subscribed to once per fetch; expected to emit the current tip. */
  provider$: Observable<Cardano.Tip>;
  /** Only `isSettled$` is used here: every `true` emission schedules a fetch. */
  syncStatus: SyncStatus;
  /** While the latest value is `ConnectionStatus.down`, scheduled fetches are skipped. */
  connectionStatus$: Observable<ConnectionStatus>;
  /** Passed through unchanged to `PersistentDocumentTrackerSubject`. */
  store: DocumentStore<Cardano.Tip>;
  /**
   * Audit window applied to the scheduled-fetch trigger: once a trigger arrives, only the latest
   * trigger seen within this window is forwarded, so at most one scheduled fetch is requested per window.
   * Does not apply to the initial fetch, to connection status changes, or to manual `sync()` calls.
   */
  minPollInterval: Milliseconds;
  /** If `syncStatus.isSettled$` emits no `true` for this long, a fetch is scheduled anyway. */
  maxPollInterval: Milliseconds;
  logger: Logger;
}

/** Optional overrides for {@link TipTracker} internals. */
export interface TipTrackerInternals {
  /**
   * Subject whose emissions force an immediate fetch. Defaults to a new `Subject`.
   * It is completed when the tracker's source observable is finalized, even if supplied by the caller.
   */
  externalTrigger$?: Subject<void>;
}

/**
 * Mirrors `trigger$`, and additionally emits `true` whenever `trigger$` stays silent for `interval` ms.
 * After each such timeout it re-subscribes to `trigger$` with a fresh timeout, so the pattern repeats
 * indefinitely.
 *
 * @param trigger$ source of "real" triggers
 * @param interval maximum silence (ms) tolerated before a synthetic `true` is emitted
 */
const triggerOrInterval$ = <T = unknown>(trigger$: Observable<T>, interval: number): Observable<T | boolean> =>
  trigger$.pipe(timeout({ each: interval, with: () => concat(of(true), triggerOrInterval$(trigger$, interval)) }));

/**
 * Builds the tip fetch stream and hands it, together with `store`, to `PersistentDocumentTrackerSubject`.
 * (How the base class persists/replays values is not visible in this file.)
 *
 * The stream merges two sources:
 * - scheduled fetches, driven by `syncStatus.isSettled$`, `maxPollInterval` and `connectionStatus$`;
 * - manual fetches, driven by {@link TipTracker.sync}.
 */
export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
  // Both fields are assigned in the constructor right after `super()` returns.
  readonly #externalTrigger$: Subject<void>;
  readonly #logger: Logger;

  /**
   * @param props providers, store and poll intervals; see {@link TipTrackerProps}
   * @param internals optional overrides; see {@link TipTrackerInternals}
   */
  constructor(
    { provider$, minPollInterval, maxPollInterval, store, syncStatus, connectionStatus$, logger }: TipTrackerProps,
    { externalTrigger$ = new Subject() }: TipTrackerInternals = {}
  ) {
    super(
      merge(
        // schedule a fetch:
        // - after some delay once fully synced and online
        // - if it's not settled for maxPollInterval
        combineLatest([
          triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
            // Do not allow more than one fetch per minPollInterval.
            // The first syncStatus starts a minPollInterval timer. Only the latest emission from the minPollInterval
            // is emitted in case the syncStatus changes multiple times.
            auditTime(minPollInterval),
            // trigger fetch on start
            startWith(null)
          ),
          // combineLatest also re-emits on every connection status change, which requests a fetch as well
          connectionStatus$
        ]).pipe(
          // Logged for every trigger, including ones that exhaustMap below ignores while a fetch is in flight
          tap(([, connectionStatus]) => {
            logger.debug(connectionStatus === ConnectionStatus.down ? 'Skipping fetch tip' : 'Fetching tip...');
          }),
          // Throttle syncing by interval, cancel ongoing request on external trigger
          exhaustMap(([, connectionStatus]) =>
            connectionStatus === ConnectionStatus.down
              ? EMPTY
              : provider$.pipe(takeUntil(externalTrigger$.pipe(tap(() => logger.debug('Tip fetch canceled')))))
          ),
          // Only the scheduled branch is de-duplicated; manual fetch results are forwarded as-is
          distinctUntilChanged(tipEquals),
          tap((tip) => logger.debug('Fetched new tip', tip))
        ),
        // Always immediately restart request on external trigger
        externalTrigger$.pipe(
          switchMap(() => provider$),
          tap((tip) => logger.debug('External trigger fetched tip', tip))
        )
      ).pipe(
        // Complete the trigger subject via the closure variable: private fields of `this` are not
        // installed until `super()` has returned, so the callback must not depend on them.
        finalize(() => externalTrigger$.complete())
      ),
      store
    );
    this.#externalTrigger$ = externalTrigger$;
    this.#logger = logger;
  }

  /** Requests an immediate tip fetch, cancelling any scheduled fetch that is currently in flight. */
  sync(): void {
    this.#logger.debug('Manual sync triggered');
    this.#externalTrigger$.next();
  }
}