All files / src/PgBoss stakePoolMetricsHandler.ts

50% Statements 22/44
50% Branches 5/10
40% Functions 2/5
48.57% Lines 17/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99  37x           37x   37x   37x 37x 37x                   37x 5x 5x 5x                           5x 5x     1x           37x                                             37x 2x             37x                                  
import { Cardano, StakePoolProvider } from '@cardano-sdk/core';
import {
  CurrentPoolMetricsEntity,
  STAKE_POOL_METRICS_UPDATE,
  StakePoolEntity,
  StakePoolMetricsUpdateJob
} from '@cardano-sdk/projection-typeorm';
import { DataSource, LessThan } from 'typeorm';
import { Logger } from 'ts-log';
import { ServiceNames } from '../Program/programs/types';
import { WorkerHandlerFactory } from './types';
import { isErrorWithConstraint } from './util';
import { missingProviderUrlOption } from '../Program/options';
import { stakePoolHttpProvider } from '@cardano-sdk/cardano-services-client';
 
/** Everything needed to fetch the metrics of one stake pool and persist them. */
interface RefreshPoolMetricsOptions {
  /** TypeORM data source; its `CurrentPoolMetricsEntity` repository receives the metrics. */
  dataSource: DataSource;
  /** Id of the stake pool: used as the provider query filter and as the id of the stored row. */
  id: Cardano.PoolId;
  /** Receives the info / warn / error messages emitted while refreshing. */
  logger: Logger;
  /** Stake pool provider queried (via `queryStakePools`) for the pool's current data. */
  provider: StakePoolProvider;
  /** Slot stored alongside the metrics — presumably the slot the job was scheduled for; confirm with caller. */
  slot: Cardano.Slot;
}
 
/**
 * Writes the given metrics of a stake pool to the `CurrentPoolMetricsEntity` repository, upserting on the
 * `stakePool` relation.
 *
 * A failure whose `constraint` is `FK_current_pool_metrics_stake_pool_id` is ignored on purpose: it means the
 * referenced stake pool row is no longer there (its registration was rolled back), so there is nothing to
 * attach the metrics to. Every other error is rethrown to the caller.
 *
 * @param options the refresh options plus the `metrics` to persist
 */
export const savePoolMetrics = async (options: RefreshPoolMetricsOptions & { metrics: Cardano.StakePoolMetrics }) => {
  const { dataSource, id, metrics, slot } = options;
  const metricsRepository = dataSource.getRepository(CurrentPoolMetricsEntity);
  const { blocksCreated, delegators, livePledge, saturation, size, stake } = metrics;

  // Flatten the nested provider metrics into the column layout of the entity
  const row = {
    activeSize: size.active,
    activeStake: stake.active,
    id,
    liveDelegators: delegators,
    livePledge,
    liveSaturation: saturation,
    liveSize: size.live,
    liveStake: stake.live,
    mintedBlocks: blocksCreated,
    slot,
    stakePool: { id }
  };

  try {
    await metricsRepository.upsert(row, ['stakePool']);
  } catch (error) {
    // A violated foreign key towards the stake pool means the pool was rolled back: nothing to do
    const stakePoolIsGone =
      isErrorWithConstraint(error) && error.constraint === 'FK_current_pool_metrics_stake_pool_id';

    if (!stakePoolIsGone) throw error;
  }
};
 
/**
 * Fetches the current data of one stake pool from the provider and, when it carries metrics, stores them
 * through `savePoolMetrics`.
 *
 * This is best effort: an error raised while querying the provider or saving the metrics is logged with
 * `logger.error` and swallowed, so one failing pool does not reject the returned promise. When the provider
 * reports no result, or the result has no metrics, a warning is logged and nothing is saved.
 *
 * @param options pool id, provider and logger, plus the data source and slot forwarded to `savePoolMetrics`
 */
export const refreshPoolMetrics = async (options: RefreshPoolMetricsOptions) => {
  const { id, logger, provider } = options;

  logger.info(`Refreshing metrics for stake pool ${id}`);

  try {
    // Only the pool with this exact id is wanted, hence a single-item page
    const { pageResults, totalResultCount } = await provider.queryStakePools({
      filters: { identifier: { values: [{ id }] } },
      pagination: { limit: 1, startAt: 0 }
    });

    if (totalResultCount === 0) return logger.warn(`No data fetched for stake pool ${id}`);

    const { metrics } = pageResults[0];

    if (!metrics) return logger.warn(`No metrics found for stake pool ${id}`);

    await savePoolMetrics({ ...options, metrics });
  } catch (error) {
    logger.error(`Error while refreshing metrics for stake pool ${id}`, error);
  }
};
 
/**
 * Loads the stake pools (only their `id` is selected) whose metrics should be refreshed.
 *
 * When `outdatedSlot` is falsy (omitted, or `0`) every stake pool is returned. Otherwise the query is
 * restricted with the two OR-ed conditions below.
 *
 * @param dataSource data source providing the `StakePoolEntity` repository
 * @param outdatedSlot pools whose stored metrics slot is lower than this one are selected
 * @returns the matching `StakePoolEntity` records
 */
export const getPoolIdsToUpdate = async (dataSource: DataSource, outdatedSlot?: Cardano.Slot) => {
  const stakePools = dataSource.getRepository(StakePoolEntity);

  if (!outdatedSlot) return await stakePools.find({ select: { id: true } });

  // NOTE(review): `{ metrics: undefined }` looks intended to match pools without any metrics yet, but TypeORM
  // may simply skip `undefined` conditions — confirm against the TypeORM find options documentation.
  return await stakePools.find({
    select: { id: true },
    where: [{ metrics: { slot: LessThan(outdatedSlot) } }, { metrics: undefined }]
  });
};
 
/**
 * Builds the worker handler for stake pool metrics update jobs.
 *
 * The factory creates an HTTP stake pool provider from `stakePoolProviderUrl`; the returned handler selects
 * the pools to update (all of them, or only the outdated ones when the job carries `outdatedSlot`) and
 * refreshes their metrics one after the other.
 *
 * @param options worker options; `dataSource`, `logger` and `stakePoolProviderUrl` are read
 * @returns the async handler processing a `StakePoolMetricsUpdateJob`
 * @throws the error built by `missingProviderUrlOption` when `stakePoolProviderUrl` is not set
 */
export const stakePoolMetricsHandlerFactory: WorkerHandlerFactory = (options) => {
  const { dataSource, logger, stakePoolProviderUrl } = options;

  if (!stakePoolProviderUrl) throw missingProviderUrlOption(STAKE_POOL_METRICS_UPDATE, ServiceNames.StakePool);

  const provider = stakePoolHttpProvider({ baseUrl: stakePoolProviderUrl, logger });

  return async (data: StakePoolMetricsUpdateJob) => {
    const { slot, outdatedSlot } = data;

    logger.info(
      `Starting stake pools metrics job for slot ${slot}, updating ${outdatedSlot ? 'only outdated' : 'all'}`
    );
    const pools = await getPoolIdsToUpdate(dataSource, outdatedSlot);

    // Pools are refreshed sequentially: each request is awaited before the next one is issued
    for (const { id } of pools) await refreshPoolMetrics({ dataSource, id: id!, logger, provider, slot });
  };
};