All files / src/Projection ProjectionHttpService.ts

91.17% Statements 31/34
50% Branches 3/6
92.85% Functions 13/14
93.75% Lines 30/32

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118  35x                             35x 35x     35x                               35x 5x 5x 5x     35x 5x   4x           4x               4x       4x       1x                         35x 6x 6x 6x 6x 6x           6x   13x         6x 6x 6x       5x       5x 5x       5x        
import { BaseProjectionEvent } from '@cardano-sdk/projection';
import {
  EMPTY,
  Observable,
  OperatorFunction,
  Subscription,
  last,
  map,
  merge,
  of,
  share,
  startWith,
  switchMap,
  takeUntil,
  timer
} from 'rxjs';
import { HealthCheckResponse, Milliseconds } from '@cardano-sdk/core';
import { HttpService } from '../Http';
import { Logger } from 'ts-log';
import { ProjectionName } from './prepareTypeormProjection';
import express from 'express';
 
export interface ProjectionServiceProps<T> {
  projection$: Observable<T>;
  projectionNames: ProjectionName[];
  /** Service will report unhealthy if it didn't project a block in `healthTimeout` ms. 60_000 by default. */
  healthTimeout?: Milliseconds;
  dryRun?: boolean;
}
 
export interface ProjectionServiceDependencies {
  logger: Logger;
  router?: express.Router;
}
 
const whileSourceOpen =
  <In, Out>(op: OperatorFunction<In, Out>): OperatorFunction<In, Out> =>
  (source$) => {
    const sharedEvt$ = source$.pipe(share());
    return sharedEvt$.pipe(op, takeUntil(sharedEvt$.pipe(last(null, null))));
  };
 
const toProjectionHealth = (maxFrequency: Milliseconds) => (evt$: Observable<BaseProjectionEvent>) =>
  evt$.pipe(
    map((e): HealthCheckResponse => {
      Iif (e.tip === 'origin') {
        return {
          ok: false,
          reason: 'CardanoNode at origin'
        };
      }
      return {
        localNode: {
          ledgerTip: e.tip
        },
        ok: e.tip.blockNo === e.block.header.blockNo && e.tip.hash === e.block.header.hash,
        projectedTip: e.block.header
      };
    }),
    map((health) => of(health)),
    startWith(EMPTY),
    whileSourceOpen(
      switchMap((health$) =>
        merge(
          health$,
          // switchMap unsubscribes/cancels the timer if source emits sooner
          timer(maxFrequency).pipe(
            map((): HealthCheckResponse => ({ ok: false, reason: `Projection timeout (${maxFrequency}ms)` }))
          )
        )
      )
    )
  );
 
/**
 * Manages subscription to provided `projection$` observable by implementing `RunnableModule`.
 *
 * Implements `HttpService.healthCheck()` by considering itself health only when
 * local node tip is equal to the projected tip.
 */
export class ProjectionHttpService<T extends BaseProjectionEvent> extends HttpService {
  #projection$: Observable<T>;
  #projectionSubscription?: Subscription;
  #healthTimeout: Milliseconds;
  #health: HealthCheckResponse = { ok: false, reason: 'ProjectionHttpService not started' };
  #dryRun?: boolean;
 
  constructor(
    { projection$, projectionNames, healthTimeout = Milliseconds(180_000), dryRun }: ProjectionServiceProps<T>,
    { logger, router = express.Router() }: ProjectionServiceDependencies
  ) {
    super(
      `Projection(${projectionNames.join(',')})`,
      { healthCheck: async () => this.#health },
      router,
      __dirname,
      logger
    );
    this.#dryRun = dryRun;
    this.#projection$ = projection$;
    this.#healthTimeout = healthTimeout;
  }
 
  async shutdownImpl(): Promise<void> {
    this.#projectionSubscription?.unsubscribe();
  }
 
  async startImpl(): Promise<void> {
    Iif (this.#dryRun) return;
    this.#projectionSubscription = this.#projection$.pipe(toProjectionHealth(this.#healthTimeout)).subscribe({
      complete: () => {
        throw new Error('Projection stopped');
      },
      next: (health) => (this.#health = health)
    });
  }
}