All files / src/services/DelegationTracker RewardsHistory.ts

100% Statements 47/47
100% Branches 4/4
100% Functions 21/21
100% Lines 39/39

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 11842x 42x     42x         42x 42x 42x 42x   42x   126x   462x 115x   42x 42x 126x         124x 103x     103x             21x 21x         42x 131x   138x   134x       138x       42x                 132x   131x   131x     5x   130x   130x     130x               135x 135x 20x 20x               115x           115x               115x      
import { BigIntMath, isNotNil } from '@cardano-sdk/util';
import { Cardano, Reward, getCertificatesByType } from '@cardano-sdk/core';
import { KeyValueStore } from '../../persistence';
import { Logger } from 'ts-log';
import { Observable, concat, distinctUntilChanged, map, of, switchMap, tap } from 'rxjs';
import { RetryBackoffConfig } from 'backoff-rxjs';
import { RewardsHistory } from '../types';
import { TrackedRewardsProvider } from '../ProviderTracker';
import { TxWithEpoch } from './types';
import { coldObservableProvider } from '@cardano-sdk/util-rxjs';
import first from 'lodash/first.js';
import flatten from 'lodash/flatten.js';
import sortBy from 'lodash/sortBy.js';
 
const DELEGATION_EPOCHS_AHEAD_COUNT = 2;
 
export const calcFirstDelegationEpoch = (epoch: Cardano.EpochNo): number => epoch + DELEGATION_EPOCHS_AHEAD_COUNT;
 
const sumRewards = (arrayOfRewards: Reward[]) => BigIntMath.sum(arrayOfRewards.map(({ rewards }) => rewards));
const avgReward = (arrayOfRewards: Reward[]) => sumRewards(arrayOfRewards) / BigInt(arrayOfRewards.length);
 
export const createRewardsHistoryProvider =
  (rewardsProvider: TrackedRewardsProvider, retryBackoffConfig: RetryBackoffConfig) =>
  (
    rewardAccounts: Cardano.RewardAccount[],
    lowerBound: Cardano.EpochNo | null,
    onFatalError?: (value: unknown) => void
  ): Observable<Map<Cardano.RewardAccount, Reward[]>> => {
    if (lowerBound) {
      return coldObservableProvider({
        onFatalError,
        provider: () =>
          rewardsProvider.rewardsHistory({
            epochs: { lowerBound },
            rewardAccounts
          }),
        retryBackoffConfig
      });
    }
    rewardsProvider.setStatInitialized(rewardsProvider.stats.rewardsHistory$);
    return of(new Map());
  };
 
export type RewardsHistoryProvider = ReturnType<typeof createRewardsHistoryProvider>;
 
const firstDelegationEpoch$ = (transactions$: Observable<TxWithEpoch[]>, rewardAccounts: Cardano.RewardAccount[]) =>
  transactions$.pipe(
    map((transactions) =>
      first(
        transactions.filter(
          ({ tx }) => getCertificatesByType(tx, rewardAccounts, Cardano.StakeDelegationCertificateTypes).length > 0
        )
      )
    ),
    map((tx) => (isNotNil(tx) ? calcFirstDelegationEpoch(tx.epoch) : null)),
    distinctUntilChanged()
  );
 
export const createRewardsHistoryTracker = (
  transactions$: Observable<TxWithEpoch[]>,
  rewardAccounts$: Observable<Cardano.RewardAccount[]>,
  rewardsHistoryProvider: RewardsHistoryProvider,
  rewardsHistoryStore: KeyValueStore<Cardano.RewardAccount, Reward[]>,
  logger: Logger,
  onFatalError?: (value: unknown) => void
  // eslint-disable-next-line max-params
): Observable<RewardsHistory> =>
  rewardAccounts$
    .pipe(
      tap((rewardsAccounts) => logger.debug(`Fetching rewards for ${rewardsAccounts.length} accounts`)),
      switchMap((rewardAccounts) =>
        concat(
          rewardsHistoryStore
            .getValues(rewardAccounts)
            .pipe(map((rewards) => new Map(rewardAccounts.map((rewardAccount, i) => [rewardAccount, rewards[i]])))),
          firstDelegationEpoch$(transactions$, rewardAccounts).pipe(
            tap((firstEpoch) => logger.debug(`Fetching history rewards since epoch ${firstEpoch}`)),
            switchMap((firstEpoch) =>
              rewardsHistoryProvider(rewardAccounts, Cardano.EpochNo(firstEpoch!), onFatalError)
            ),
            tap((allRewards) =>
              rewardsHistoryStore.setAll([...allRewards.entries()].map(([key, value]) => ({ key, value })))
            )
          )
        )
      )
    )
    .pipe(
      map((rewardsByAccount) => {
        const all = sortBy(flatten([...rewardsByAccount.values()]), 'epoch');
        if (all.length === 0) {
          logger.debug('No rewards found');
          return {
            all: [],
            avgReward: null,
            lastReward: null,
            lifetimeRewards: 0n
          } as RewardsHistory;
        }
 
        const rewardsHistory: RewardsHistory = {
          all,
          avgReward: avgReward(all),
          lastReward: all[all.length - 1],
          lifetimeRewards: sumRewards(all)
        };
        logger.debug(
          `Rewards between epochs ${rewardsHistory.all[0].epoch} and ${
            rewardsHistory.all[rewardsHistory.all.length - 1].epoch
          }`,
          `average:${rewardsHistory.avgReward}`,
          `lastRewards:${rewardsHistory.lastReward}`,
          `lifetimeRewards:${rewardsHistory.lifetimeRewards}`
        );
        return rewardsHistory;
      })
    );