Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { BaseProjectionEvent } from '@cardano-sdk/projection';
import {
  EMPTY,
  Observable,
  OperatorFunction,
  Subscription,
  last,
  map,
  merge,
  of,
  share,
  startWith,
  switchMap,
  takeUntil,
  timer
} from 'rxjs';
import { HealthCheckResponse, Milliseconds } from '@cardano-sdk/core';
import { HttpService } from '../Http';
import { Logger } from 'ts-log';
import { ProjectionName } from './prepareTypeormProjection';
import express from 'express';

/** Configuration accepted by {@link ProjectionHttpService}. */
export interface ProjectionServiceProps<T> {
  /** Stream of projection events; the service subscribes to it on start. */
  projection$: Observable<T>;
  /** Names of the projections; joined with ',' to build the service name. */
  projectionNames: ProjectionName[];
  /**
   * Service will report unhealthy if it didn't project a block in `healthTimeout` ms.
   * 180_000 by default.
   */
  healthTimeout?: Milliseconds;
  /** When truthy, `startImpl` returns without subscribing to `projection$`. */
  dryRun?: boolean;
}

/** External collaborators injected into {@link ProjectionHttpService}. */
export interface ProjectionServiceDependencies {
  logger: Logger;
  /** Defaults to a fresh `express.Router()` when omitted. */
  router?: express.Router;
}

/**
 * Applies `op` to the source, but completes the result as soon as the source itself completes,
 * even if `op` still has live inner subscriptions (e.g. a pending timer).
 *
 * The source is shared so that it is subscribed once for both `op` and the completion notifier.
 * `last(null, null)` supplies a default value so that an empty source still notifies
 * instead of raising `EmptyError`.
 *
 * NOTE(review): `takeUntil` subscribes to its notifier before the piped source, so a value that
 * the source emits synchronously on subscription (such as the `startWith(EMPTY)` seed used by
 * `toProjectionHealth`) may reach only the notifier branch of the shared source. Confirm that
 * the initial timeout is armed as intended.
 */
const whileSourceOpen =
  <In, Out>(op: OperatorFunction<In, Out>): OperatorFunction<In, Out> =>
  (source$) => {
    const sharedEvt$ = source$.pipe(share());
    return sharedEvt$.pipe(op, takeUntil(sharedEvt$.pipe(last(null, null))));
  };

/**
 * Builds an operator that converts projection events into `HealthCheckResponse` values.
 *
 * - An event whose tip is `'origin'` is reported as unhealthy ('CardanoNode at origin').
 * - Any other event is healthy only when the ledger tip's `blockNo` and `hash` both equal
 *   those of the projected block header.
 * - If no new event arrives within `maxFrequency` ms, a 'Projection timeout' response is emitted.
 *
 * @param maxFrequency maximum time (ms) allowed between events before reporting a timeout
 */
const toProjectionHealth = (maxFrequency: Milliseconds) => (evt$: Observable<BaseProjectionEvent>) =>
  evt$.pipe(
    map((e): HealthCheckResponse => {
      if (e.tip === 'origin') {
        return { ok: false, reason: 'CardanoNode at origin' };
      }
      return {
        localNode: { ledgerTip: e.tip },
        ok: e.tip.blockNo === e.block.header.blockNo && e.tip.hash === e.block.header.hash,
        projectedTip: e.block.header
      };
    }),
    // Wrap every response in an observable so it can be merged with a timer below
    map((health) => of(health)),
    // Seed with an empty observable so that the timeout applies before the first event too
    startWith(EMPTY),
    whileSourceOpen(
      switchMap((health$) =>
        merge(
          health$,
          // switchMap unsubscribes/cancels the timer if source emits sooner
          timer(maxFrequency).pipe(
            map((): HealthCheckResponse => ({ ok: false, reason: `Projection timeout (${maxFrequency}ms)` }))
          )
        )
      )
    )
  );

/**
 * Manages subscription to provided `projection$` observable by implementing `RunnableModule`.
 *
 * Implements `HttpService.healthCheck()` by considering itself healthy only when
 * local node tip is equal to the projected tip.
 */
export class ProjectionHttpService<T extends BaseProjectionEvent> extends HttpService {
  readonly #projection$: Observable<T>;
  #projectionSubscription?: Subscription;
  readonly #healthTimeout: Milliseconds;
  /** Latest known health; returned as-is by the health check. */
  #health: HealthCheckResponse = { ok: false, reason: 'ProjectionHttpService not started' };
  readonly #dryRun?: boolean;

  /**
   * @param props projection stream, projection names, and optional `healthTimeout` / `dryRun`
   * @param dependencies logger and optional express router
   */
  constructor(
    { projection$, projectionNames, healthTimeout = Milliseconds(180_000), dryRun }: ProjectionServiceProps<T>,
    { logger, router = express.Router() }: ProjectionServiceDependencies
  ) {
    super(
      `Projection(${projectionNames.join(',')})`,
      { healthCheck: async () => this.#health },
      router,
      __dirname,
      logger
    );
    this.#dryRun = dryRun;
    this.#projection$ = projection$;
    this.#healthTimeout = healthTimeout;
  }

  /** Unsubscribes from the projection, if a subscription exists. */
  async shutdownImpl(): Promise<void> {
    this.#projectionSubscription?.unsubscribe();
  }

  /**
   * Subscribes to the projection and keeps `#health` up to date with every emitted response.
   * Does nothing in dry-run mode.
   */
  async startImpl(): Promise<void> {
    if (this.#dryRun) return;
    this.#projectionSubscription = this.#projection$.pipe(toProjectionHealth(this.#healthTimeout)).subscribe({
      // Completion of the projection is treated as a failure: the `complete` callback throws
      complete: () => {
        throw new Error('Projection stopped');
      },
      next: (health) => {
        this.#health = health;
      }
    });
  }
}