Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | 35x 35x 35x 35x 35x 5x 5x 5x 35x 5x 4x 4x 4x 4x 1x 35x 6x 6x 6x 6x 6x 6x 13x 6x 6x 6x 5x 5x 5x 5x | import { BaseProjectionEvent } from '@cardano-sdk/projection';
import {
EMPTY,
Observable,
OperatorFunction,
Subscription,
last,
map,
merge,
of,
share,
startWith,
switchMap,
takeUntil,
timer
} from 'rxjs';
import { HealthCheckResponse, Milliseconds } from '@cardano-sdk/core';
import { HttpService } from '../Http';
import { Logger } from 'ts-log';
import { ProjectionName } from './prepareTypeormProjection';
import express from 'express';
/** Configuration accepted by the `ProjectionHttpService` constructor. */
export interface ProjectionServiceProps<T> {
/** Projection stream; the service subscribes to it on start unless `dryRun` is set. */
projection$: Observable<T>;
/** Names of the projections; joined with ',' to build the service name `Projection(...)`. */
projectionNames: ProjectionName[];
/** Service will report unhealthy if it didn't project a block in `healthTimeout` ms. 180_000 by default. */
healthTimeout?: Milliseconds;
/** When truthy, starting the service returns early without subscribing to `projection$`. */
dryRun?: boolean;
}
/** Dependencies accepted by the `ProjectionHttpService` constructor. */
export interface ProjectionServiceDependencies {
/** Logger forwarded to the `HttpService` base class. */
logger: Logger;
/** Router forwarded to the `HttpService` base class; a new `express.Router()` is created when omitted. */
router?: express.Router;
}
/**
 * Wraps an operator so that its output is cut off as soon as the source completes.
 *
 * The source is shared between two consumers: the wrapped operator `op` and a
 * notifier built with `last(null, null)`, which emits once when the source completes
 * (falling back to `null` when the source never emitted). That notification
 * terminates the output via `takeUntil`, even if `op` still has inner work pending.
 *
 * @param op operator to apply to the source for as long as the source is open
 * @returns an operator with the same input/output types as `op`
 */
function whileSourceOpen<In, Out>(op: OperatorFunction<In, Out>): OperatorFunction<In, Out> {
  return (source$) => {
    // Multicast so that `op` and the completion notifier observe the same subscription
    const shared$ = source$.pipe(share());
    // Emits exactly once, when the shared source completes
    const sourceClosed$ = shared$.pipe(last(null, null));
    return shared$.pipe(op, takeUntil(sourceClosed$));
  };
}
/**
 * Creates an operator that turns projection events into health check responses.
 *
 * For every event:
 * - a tip of `'origin'` is reported as unhealthy ('CardanoNode at origin');
 * - otherwise the response is `ok` only when the local node tip and the projected
 *   block header have the same `blockNo` and `hash`.
 *
 * Additionally, if no event arrives within `maxFrequency` ms (counted from subscription
 * or from the previous event), an unhealthy 'Projection timeout' response is emitted.
 *
 * @param maxFrequency maximum time, in milliseconds, allowed between projection events
 * @returns a function mapping an event observable to an observable of `HealthCheckResponse`
 */
const toProjectionHealth = (maxFrequency: Milliseconds) => (evt$: Observable<BaseProjectionEvent>) =>
  evt$.pipe(
    map((e): HealthCheckResponse => {
      if (e.tip === 'origin') {
        return {
          ok: false,
          reason: 'CardanoNode at origin'
        };
      }
      return {
        localNode: {
          ledgerTip: e.tip
        },
        ok: e.tip.blockNo === e.block.header.blockNo && e.tip.hash === e.block.header.hash,
        projectedTip: e.block.header
      };
    }),
    // Wrap each response in an observable so it can be merged with a timeout timer below
    map((health) => of(health)),
    // Seed with an empty observable so the timeout timer is armed before the first event
    startWith(EMPTY),
    whileSourceOpen(
      switchMap((health$) =>
        merge(
          health$,
          // switchMap unsubscribes/cancels the timer if source emits sooner
          timer(maxFrequency).pipe(
            map((): HealthCheckResponse => ({ ok: false, reason: `Projection timeout (${maxFrequency}ms)` }))
          )
        )
      )
    )
  );
/**
 * Manages subscription to provided `projection$` observable by implementing `RunnableModule`.
 *
 * Implements `HttpService.healthCheck()` by considering itself healthy only when
 * local node tip is equal to the projected tip. It reports unhealthy before it is started,
 * while the node tip is at origin, and when no event was projected within `healthTimeout` ms.
 */
export class ProjectionHttpService<T extends BaseProjectionEvent> extends HttpService {
  #projection$: Observable<T>;
  #projectionSubscription?: Subscription;
  #healthTimeout: Milliseconds;
  /** Latest known health; returned as-is by the health check. */
  #health: HealthCheckResponse = { ok: false, reason: 'ProjectionHttpService not started' };
  #dryRun?: boolean;

  /**
   * @param props projection stream and settings; `healthTimeout` defaults to 180_000 ms
   * @param dependencies logger and an optional router (a new `express.Router()` by default)
   */
  constructor(
    { projection$, projectionNames, healthTimeout = Milliseconds(180_000), dryRun }: ProjectionServiceProps<T>,
    { logger, router = express.Router() }: ProjectionServiceDependencies
  ) {
    super(
      `Projection(${projectionNames.join(',')})`,
      { healthCheck: async () => this.#health },
      router,
      __dirname,
      logger
    );
    this.#dryRun = dryRun;
    this.#projection$ = projection$;
    this.#healthTimeout = healthTimeout;
  }

  /** Stops the projection by unsubscribing from it, if a subscription exists. */
  async shutdownImpl(): Promise<void> {
    this.#projectionSubscription?.unsubscribe();
  }

  /**
   * Subscribes to the projection and keeps the health status up to date.
   * Does nothing in dry-run mode, in which case the health stays at its initial 'not started' value.
   */
  async startImpl(): Promise<void> {
    if (this.#dryRun) return;
    this.#projectionSubscription = this.#projection$.pipe(toProjectionHealth(this.#healthTimeout)).subscribe({
      complete: () => {
        // The projection is not expected to complete; treat completion as a failure
        throw new Error('Projection stopped');
      },
      next: (health) => (this.#health = health)
    });
  }
}
|