Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | 42x 42x 42x 137x 42x 132x 132x 132x 430x 161x 157x 143x 1x 117x 132x 132x 2x 2x | import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, PersistentDocumentTrackerSubject, tipEquals } from './util';
import { DocumentStore } from '../persistence';
import {
EMPTY,
Observable,
Subject,
auditTime,
combineLatest,
concat,
distinctUntilChanged,
exhaustMap,
filter,
finalize,
merge,
of,
startWith,
switchMap,
takeUntil,
tap,
timeout
} from 'rxjs';
import { Logger } from 'ts-log';
import { Milliseconds } from './types';
import { SyncStatus } from '../types';
/**
 * Dependencies and settings consumed by the TipTracker constructor.
 */
export interface TipTrackerProps {
/** Source of the tip; TipTracker subscribes to it each time it performs a fetch. */
provider$: Observable<Cardano.Tip>;
/** Sync status whose `isSettled$` emitting `true` schedules a tip fetch. */
syncStatus: SyncStatus;
/** Connection state; scheduled fetches are skipped while it is `ConnectionStatus.down`. */
connectionStatus$: Observable<ConnectionStatus>;
/** Controls whether the tip tracker should be polling: scheduled fetches are skipped while the latest value is `false`. */
pollController$: Observable<boolean>;
/** Document store handed to the PersistentDocumentTrackerSubject base class (presumably used to persist the tip - confirm in './util'). */
store: DocumentStore<Cardano.Tip>;
/**
 * Minimum spacing between scheduled fetches: settle/timeout triggers are passed through
 * `auditTime(minPollInterval)`, so at most one of them is emitted per interval.
 */
minPollInterval: Milliseconds;
/** A fetch is scheduled if `syncStatus.isSettled$` does not emit `true` for this long. */
maxPollInterval: Milliseconds;
/** Receives the tracker's debug messages. */
logger: Logger;
}
/**
 * Optional internals accepted as the second TipTracker constructor argument.
 */
export interface TipTrackerInternals {
/**
 * Subject used as the manual-sync trigger: each emission cancels an in-flight scheduled
 * fetch and starts a new one. The constructor creates a new Subject when this is omitted.
 */
externalTrigger$?: Subject<void>;
}
/**
 * Mirrors `trigger$`, falling back to a synthetic emission when it stays quiet.
 *
 * The source is guarded by the RxJS `timeout` operator configured with `each: interval`.
 * When that timeout fires, the returned observable emits `true` and then continues with a
 * fresh `triggerOrInterval$(trigger$, interval)`, i.e. it subscribes to `trigger$` again
 * with a new timeout.
 *
 * @param trigger$ observable whose values are forwarded as-is
 * @param interval timeout passed to `timeout({ each })`
 * @returns values of `trigger$` interleaved with `true` for every timeout
 */
const triggerOrInterval$ = <T = unknown>(trigger$: Observable<T>, interval: number): Observable<T | boolean> => {
  // Invoked by `timeout` when no value arrived in time: signal once, then start over.
  const onTimeout = (): Observable<T | boolean> => {
    const restarted$ = triggerOrInterval$(trigger$, interval);
    return concat(of(true), restarted$);
  };
  const guard = timeout({ each: interval, with: onTimeout });
  return trigger$.pipe(guard);
};
/**
 * Tracks the chain tip by subscribing to `provider$` on a schedule and on demand.
 *
 * Two fetch sources are merged and handed, together with `store`, to the
 * PersistentDocumentTrackerSubject base class:
 * - scheduled fetches, driven by `syncStatus.isSettled$`, `connectionStatus$` and `pollController$`;
 * - manual fetches, driven by the external trigger that `sync()` emits on.
 */
export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
  /** Subject that `sync()` emits on; assigned once in the constructor. */
  readonly #externalTrigger$: Subject<void>;
  readonly #logger: Logger;

  /**
   * @param props tip source, gating observables, poll intervals, store and logger
   * @param internals optional overrides; `externalTrigger$` defaults to a new Subject
   */
  constructor(
    {
      provider$,
      pollController$,
      minPollInterval,
      maxPollInterval,
      store,
      syncStatus,
      connectionStatus$,
      logger
    }: TipTrackerProps,
    { externalTrigger$ = new Subject() }: TipTrackerInternals = {}
  ) {
    super(
      merge(
        // schedule a fetch:
        // - after some delay once fully synced and online
        // - if it's not settled for maxPollInterval
        combineLatest([
          triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
            // Do not allow more than one fetch per minPollInterval.
            // The first syncStatus starts a minPollInterval timer. Only the latest emission from the minPollInterval
            // is emitted in case the syncStatus changes multiple times.
            auditTime(minPollInterval),
            // trigger fetch on start
            startWith(null)
          ),
          connectionStatus$,
          pollController$
        ]).pipe(
          tap(([, connectionStatus, poll]) => {
            logger.debug(
              connectionStatus === ConnectionStatus.down || !poll ? 'Skipping fetch tip' : 'Fetching tip...'
            );
          }),
          // Throttle syncing by interval, cancel ongoing request on external trigger
          exhaustMap(([, connectionStatus, poll]) =>
            connectionStatus === ConnectionStatus.down || !poll
              ? EMPTY
              : provider$.pipe(takeUntil(externalTrigger$.pipe(tap(() => logger.debug('Tip fetch canceled')))))
          ),
          // Only the scheduled branch is de-duplicated against the previously fetched tip.
          distinctUntilChanged(tipEquals),
          tap((tip) => logger.debug('Fetched new tip', tip))
        ),
        // Always immediately restart request on external trigger
        externalTrigger$.pipe(
          switchMap(() => provider$),
          tap((tip) => logger.debug('External trigger fetched tip', tip))
        )
      ).pipe(
        // Complete the trigger when the merged source is torn down. The destructured local is
        // used instead of `this.#externalTrigger$`: it is the same subject, but the callback no
        // longer depends on the private field having been assigned, which only happens after
        // `super()` returns.
        finalize(() => externalTrigger$.complete())
      ),
      store
    );
    this.#externalTrigger$ = externalTrigger$;
    this.#logger = logger;
  }

  /**
   * Requests an immediate tip fetch by emitting on the external trigger, which cancels an
   * in-flight scheduled fetch and restarts the manual one.
   */
  sync(): void {
    this.#logger.debug('Manual sync triggered');
    this.#externalTrigger$.next();
  }
}
|