All files / src/Program/services blockfrost.ts

6.77% Statements 4/59
0% Branches 0/8
0% Functions 0/10
7.27% Lines 4/55

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 13235x 35x                         35x                                     35x                                                                                                                                                                                                    
import { BlockfrostService, BlockfrostServiceConfig, BlockfrostServiceDependencies } from '../../Blockfrost';
import { HttpServer, getListen } from '../../Http';
import { Pool } from 'pg';
 
/**
 * Configuration of the {@link BlockfrostWorker}: the wrapped service's
 * configuration plus the worker-specific options below.
 */
export type BlockfrostWorkerConfig = BlockfrostServiceConfig & {
  /** URL passed to `getListen` to derive the HTTP server's `listen` option. */
  apiUrl: URL;
  /** When `true` (and not in dry run), `CREATE SCHEMA IF NOT EXISTS blockfrost` is issued during initialization. */
  createSchema: boolean;
  /** When `true` (and not in dry run), `DROP SCHEMA IF EXISTS blockfrost CASCADE` is issued during initialization. */
  dropSchema: boolean;
  /** When `true`, no schema/table statements are sent to the database and each run only logs and waits briefly instead of refreshing the cache. */
  dryRun: boolean;
  /** Target time between the starts of two consecutive runs; multiplied by 60_000 to obtain milliseconds, i.e. expressed in minutes. */
  scanInterval: number;
};
 
/**
 * Dependencies of the {@link BlockfrostWorker}; the same object the wrapped
 * `BlockfrostService` takes. The worker itself reads `db` and `logger` from it.
 */
export type BlockfrostWorkerDependencies = BlockfrostServiceDependencies;
 
/**
 * SQL statements executed one by one, in array order, while the worker
 * initializes (skipped in dry run): first the `blockfrost.pool_metric` table,
 * then a unique index on its `pool_hash_id` column. Both statements use
 * `IF NOT EXISTS`.
 */
const schema = [
  `
CREATE TABLE IF NOT EXISTS blockfrost.pool_metric (
  pool_hash_id integer NOT NULL,
  last_reward_epoch smallint NOT NULL,
  cache_time double precision NOT NULL,
  blocks_created integer NOT NULL,
  delegators integer NOT NULL,
  active_stake numeric NOT NULL,
  live_stake numeric NOT NULL,
  live_pledge numeric NOT NULL,
  saturation double precision NOT NULL,
  reward_address varchar NOT NULL,
  extra varchar NOT NULL,
  status varchar NOT NULL
)`,
  'CREATE UNIQUE INDEX IF NOT EXISTS "blockfrost.pool_metric_id" ON blockfrost.pool_metric (pool_hash_id)'
];
 
/**
 * HTTP server exposing a `BlockfrostService` that, once started, repeatedly
 * refreshes the service cache every `scanInterval` minutes.
 *
 * Lifecycle:
 * - initialize: optionally drops / creates the `blockfrost` schema, then
 *   creates the tables and indexes listed in `schema` (all skipped in dry run);
 * - start: kicks off the first run; each run schedules the next one;
 * - shutdown: cancels the pending timer and prevents an in-flight run from
 *   scheduling a new one.
 */
export class BlockfrostWorker extends HttpServer {
  #blockfrostService: BlockfrostService;
  #createSchema: boolean;
  #db: Pool;
  #dropSchema: boolean;
  #dryRun: boolean;
  #scanInterval: number;
  /** Set on shutdown so that a run finishing afterwards does not re-schedule itself. */
  #stopped: boolean;
  /** Handle of the timer waiting for the next run; `undefined` while a run is in progress. */
  #timeOut?: NodeJS.Timeout;

  /**
   * @param cfg worker options; also forwarded as-is to the `BlockfrostService` constructor
   * @param deps dependencies; also forwarded as-is to the `BlockfrostService` constructor
   */
  constructor(cfg: BlockfrostWorkerConfig, deps: BlockfrostWorkerDependencies) {
    const { apiUrl, createSchema, dropSchema, dryRun, scanInterval } = cfg;
    const { db, logger } = deps;
    const blockfrostService = new BlockfrostService(cfg, deps);

    super(
      { listen: getListen(apiUrl), name: 'blockfrost-worker' },
      { logger, runnableDependencies: [], services: [blockfrostService] }
    );

    this.#blockfrostService = blockfrostService;
    this.#createSchema = createSchema;
    this.#db = db;
    this.#dropSchema = dropSchema;
    this.#dryRun = dryRun;
    this.#scanInterval = scanInterval;
    this.#stopped = false;
  }

  /**
   * Initializes the HTTP server, then prepares the database unless in dry run:
   * drop schema (if requested), create schema (if requested), and finally
   * create the tables and indexes, one statement at a time.
   */
  protected async initializeImpl() {
    await super.initializeImpl();

    if (!this.#dryRun) {
      if (this.#dropSchema) {
        this.logger.info('Going to drop the schema');
        await this.#db.query('DROP SCHEMA IF EXISTS blockfrost CASCADE');
        this.logger.info('Schema dropped');
      }

      if (this.#createSchema) {
        this.logger.info('Going to create the schema');
        await this.#db.query('CREATE SCHEMA IF NOT EXISTS blockfrost');
        this.logger.info('Schema created');
      }

      this.logger.info('Going to create tables and indexes');
      // Sequential on purpose: the index statement depends on the table created before it.
      for (const obj of schema) await this.#db.query(obj);
      this.logger.info('Tables and indexes created');
    }
  }

  /** Starts the HTTP server and launches the first run without waiting for it to complete. */
  protected async startImpl() {
    await super.startImpl();

    this.#stopped = false;
    this.run();
  }

  /** Stops the run loop (pending timer and any future re-scheduling), then shuts the HTTP server down. */
  protected async shutdownImpl() {
    // A run may be in flight with no timer armed; the flag stops it from arming one once it ends.
    this.#stopped = true;

    if (this.#timeOut) {
      clearTimeout(this.#timeOut);
      this.#timeOut = undefined;
    }

    await super.shutdownImpl();
  }

  /** Stand-in for a cache refresh used in dry run: logs and resolves after 100 ms. */
  private dryRun(): Promise<void> {
    this.logger.info('Dry run');

    return new Promise<void>((resolve) => setTimeout(resolve, 100));
  }

  /**
   * Performs one run (cache refresh, or the dry-run stand-in) and schedules the
   * next one so that runs start `scanInterval` minutes apart. A failing refresh
   * is logged and does not break the loop.
   */
  private async main() {
    const start = Date.now();

    this.#timeOut = undefined;
    this.logger.info('Starting new run');

    try {
      await (this.#dryRun ? this.dryRun() : this.#blockfrostService.refreshCache());
    } catch (error) {
      this.logger.error('Process failed with', error);
    }

    // Shutdown was requested while this run was in progress: do not start or schedule another one.
    if (this.#stopped) return;

    // Time left until the next run is due, measured from the start of this run.
    const restart = this.#scanInterval * 60_000 - (Date.now() - start);

    if (restart <= 0) {
      this.logger.info('Restarting immediately due to scanInterval expired in previous run');
      this.run();
    } else {
      this.logger.info(`Sleeping for ${restart} milliseconds to start next run`);
      this.#timeOut = setTimeout(() => this.run(), restart);
    }
  }

  /**
   * Fire-and-forget wrapper around `main`. Refresh errors are already handled
   * inside `main`, so a rejection here is unexpected and terminates the process.
   */
  private run() {
    this.main().catch((error) => {
      this.logger.error('Error while run', error);
      // eslint-disable-next-line unicorn/no-process-exit
      process.exit(1);
    });
  }
}