All files / src/services TransactionReemitter.ts

100% Statements 67/67
100% Branches 14/14
100% Functions 27/27
100% Lines 61/61

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 17842x   42x                             42x                 42x 42x                                               42x 42x 42x 42x     42x                     133x   4x 4x 7x   12x 18x     37x   36x   36x       36x     6x 6x     18x 6x 6x   3x   6x   18x 18x       12x   12x 12x 14x 12x 12x     36x       133x 6x       11x 5x 1x 1x   4x         4x 4x 1x       1x 1x   3x         133x         133x 2x 1x 1x     133x   6x   133x   8x   2x     133x     6x        
import { Cardano, calculateStabilityWindowSlotsCount } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import {
  Observable,
  combineLatest,
  filter,
  from,
  map,
  merge,
  mergeMap,
  partition,
  scan,
  share,
  tap,
  withLatestFrom
} from 'rxjs';
 
import {
  FailedTx,
  Milliseconds,
  OutgoingOnChainTx,
  OutgoingTx,
  TransactionFailure,
  TransactionsTracker
} from './types';
import { WalletStores } from '../persistence';
import { isNotNil } from '@cardano-sdk/util';
import pick from 'lodash/pick.js';
 
/** Dependencies and configuration consumed by `createTransactionReemitter`. */
export interface TransactionReemitterProps {
  /** Receives the debug and error messages produced while tracking and re-emitting transactions. */
  logger: Logger;
  /**
   * A transaction may be rolled back before it is ever found on chain.
   * When a submitted transaction has been in flight for longer than this interval,
   * it is re-emitted so that it can be submitted again.
   */
  maxInterval: Milliseconds;
  /**
   * `securityParameter` and `activeSlotsCoefficient` are used to compute the stability window;
   * `slotLength` is used to express `maxInterval` as a number of slots.
   */
  genesisParameters$: Observable<
    Pick<Cardano.CompactGenesis, 'activeSlotsCoefficient' | 'securityParameter' | 'slotLength'>
  >;
  /**
   * Slot of the chain tip. Compared against a rolled-back transaction's `invalidHereafter`
   * and used to derive the slot before which in-flight transactions are re-emitted.
   */
  tipSlot$: Observable<Cardano.Slot>;
  /**
   * `inFlightTransactions` is read to find transactions without `submittedAt`;
   * `volatileTransactions` is read for the initial volatile list and written whenever that list changes
   * because of a submitting or on-chain transaction.
   */
  stores: Pick<WalletStores, 'volatileTransactions' | 'inFlightTransactions'>;
  /** Transaction streams observed by the re-emitter. */
  transactions: {
    /**
     * - `onChain$`: each transaction is added to the volatile list.
     * - `submitting$`: a transaction with a matching id is removed from the volatile list.
     * - `inFlight$`: searched for transactions submitted before the re-emit threshold.
     */
    outgoing: Pick<TransactionsTracker['outgoing'], 'inFlight$' | 'onChain$' | 'submitting$'>;
  } & Pick<TransactionsTracker, 'rollback$'>;
}
 
/**
 * Streams produced by `createTransactionReemitter`.
 * The interface name is spelled with a single "t"; it is kept as-is because it is exported.
 */
export interface TransactionReemiter {
  /**
   * Rolled-back transactions whose `invalidHereafter` is lower than the tip slot;
   * each one carries `TransactionFailure.Timeout` as its `reason`.
   */
  failed$: Observable<FailedTx>;
  /** Transactions that should be submitted again. */
  reemit$: Observable<OutgoingTx>;
}
 
/**
 * Identifies which stream an event came from before it is folded into the volatile transactions list.
 * Values are written out explicitly and equal the ones implicit numbering would assign.
 */
enum txSource {
  /** Transaction read from the volatile transactions store. */
  store = 0,
  /** Transaction emitted by the `onChain$` stream. */
  onChain = 1,
  /** Transaction emitted by the `submitting$` stream. */
  submitting = 2
}
 
/**
 * Builds the streams that decide which outgoing transactions are emitted again for another
 * submission attempt, and which rolled-back transactions are reported as failed.
 *
 * `reemit$` merges three sources:
 * - rolled-back transactions that are found in the tracked volatile list and are not past `invalidHereafter`;
 * - in-flight transactions read from the store that have no `submittedAt`;
 * - in-flight transactions whose `submittedAt` is lower than the tip slot minus `maxInterval` expressed in slots.
 *
 * Every re-emitted value is reduced to its `cbor`, `body`, `id` and `context` properties.
 *
 * `failed$` emits rolled-back transactions whose `invalidHereafter` is lower than the latest tip slot,
 * with `reason` set to `TransactionFailure.Timeout`.
 *
 * @returns an object holding the `reemit$` and `failed$` observables
 */
export const createTransactionReemitter = ({
  transactions: {
    rollback$,
    outgoing: { onChain$, submitting$, inFlight$ }
  },
  stores,
  tipSlot$,
  maxInterval,
  genesisParameters$,
  logger
}: TransactionReemitterProps): TransactionReemiter => {
  // Running list of volatile transactions. Three event sources are tagged with `txSource`
  // and folded into a single array by the `scan` reducer below.
  const volatileTransactions$ = merge(
    stores.volatileTransactions.get().pipe(
      tap((txs) => logger.debug(`Store contains ${txs.length} volatile transactions`)),
      // Flatten the stored array so that every stored transaction becomes its own event
      mergeMap((txs) => from(txs)),
      map((tx) => ({ source: txSource.store, tx } as const))
    ),
    onChain$.pipe(map((tx) => ({ onChain: tx, source: txSource.onChain } as const))),
    submitting$.pipe(map((tx) => ({ source: txSource.submitting, tx } as const)))
  ).pipe(
    // Pair every event with the stability window computed from the genesis parameters.
    // NOTE(review): this produces one output per `genesisParameters$` emission, so if that observable
    // emits more than once the same event reaches the reducer again - confirm it emits a single value.
    mergeMap((vt) =>
      genesisParameters$.pipe(
        map(({ securityParameter, activeSlotsCoefficient }) =>
          calculateStabilityWindowSlotsCount({ activeSlotsCoefficient, securityParameter })
        ),
        map((sw) => ({ sw, vt }))
      )
    ),
    scan((volatiles, { vt, sw: stabilityWindowSlotsCount }) => {
      switch (vt.source) {
        case txSource.store: {
          // Do not calculate stability window for old transactions coming from the store.
          // They are appended as-is and the store is not written back in this branch.
          volatiles = [...volatiles, vt.tx];
          break;
        }
        case txSource.submitting: {
          volatiles = volatiles.filter((tx) => {
            const isResubmittedTx = tx.id === vt.tx.id;
            if (isResubmittedTx) {
              // Volatile transactions in submitting are the ones reemitted. Remove them from volatiles
              logger.debug(`Transaction ${vt.tx.id} is being resubmitted. Remove it from volatiles`);
            }
            return !isResubmittedTx;
          });
          // Persist the list after the removal
          stores.volatileTransactions.set(volatiles);
          break;
        }
        case txSource.onChain: {
          // Lowest slot that is still kept: the new transaction's slot minus the stability window, clamped at 0
          const oldestAcceptedSlot =
            vt.onChain.slot > stabilityWindowSlotsCount ? vt.onChain.slot - stabilityWindowSlotsCount : 0;
          // Remove transactions considered stable
          logger.debug(`Removing stable transactions (slot <= ${oldestAcceptedSlot}), from volatiles`);
          logger.debug(`Adding new volatile transaction ${vt.onChain.id}`);
          // Keep only entries newer than the threshold, then append the new on-chain transaction
          volatiles = [...volatiles.filter(({ slot }) => slot > oldestAcceptedSlot), vt.onChain];
          stores.volatileTransactions.set(volatiles);
          break;
        }
      }
      return volatiles;
    }, [] as OutgoingOnChainTx[])
  );
 
  // Rolled-back transactions resolved against the volatile list. Each emission is either
  // the transaction to retry or a `FailedTx` when its validity interval has passed.
  const rollbacks$ = rollback$.pipe(
    // Skip transactions flagged by `Cardano.util.isPhase2ValidationErrTx`
    filter((tx) => !Cardano.util.isPhase2ValidationErrTx(tx)),
    // NOTE(review): `withLatestFrom` drops a rollback that arrives before `volatileTransactions$`
    // has emitted at least once - confirm this is acceptable.
    withLatestFrom(volatileTransactions$),
    map(([tx, volatiles]) => {
      // Get the onChain Tx transaction to be retried
      const reemitTx = volatiles.find((txVolatile) => txVolatile.id === tx.id);
      if (!reemitTx) {
        logger.error(`Could not find onChain transaction with id ${tx.id} that was rolled back`);
        return;
      }
      return reemitTx!;
    }),
    // Drop the `undefined` produced when the rolled-back transaction was not found
    filter(isNotNil),
    withLatestFrom(tipSlot$),
    map(([tx, tipSlot]) => {
      const invalidHereafter = tx.body?.validityInterval?.invalidHereafter;
      // NOTE(review): the truthiness check also skips an `invalidHereafter` of 0 - presumably
      // never a real value; confirm.
      if (invalidHereafter && tipSlot > invalidHereafter) {
        const err: FailedTx = {
          reason: TransactionFailure.Timeout,
          ...tx
        };
        logger.error(`Rolled back transaction with id ${err.id} is no longer valid`, err.reason);
        return err;
      }
      return tx;
    }),
    // Multicast so that both outputs of `partition` below share one subscription to this pipeline
    share()
  );
 
  // Values carrying a `reason` are failures; everything else is retried
  const [failed$, rollbackRetry$] = partition(rollbacks$, (v): v is FailedTx => (v as FailedTx).reason !== undefined);
 
  // If there are any transactions without `submittedAt` in store on load, it means that
  // wallet was shut down before transaction submission resolved.
  // Submission might have failed and could be retryable, so we should attempt to re-submit it.
  const unsubmitted$ = stores.inFlightTransactions.get().pipe(
    map((txs) => txs.filter(({ submittedAt }) => !submittedAt)),
    mergeMap((txs) => from(txs)),
    tap((tx) => logger.debug('Reemitting in-flight tx that was never submitted', tx.id))
  );

  // Threshold recomputed on every tip: transactions submitted before it are re-emitted.
  // `maxInterval` is in milliseconds; `slotLength * 1000` presumably converts seconds to
  // milliseconds - TODO confirm the unit of `slotLength`. The result is not rounded.
  const reemitSubmittedBefore$ = tipSlot$.pipe(
    withLatestFrom(genesisParameters$),
    map(([tip, { slotLength }]) => tip - maxInterval / (slotLength * 1000))
  );
  // Re-evaluated whenever either the threshold or the in-flight list emits.
  // NOTE(review): a transaction that stays in `inFlight$` with an old `submittedAt` is emitted again
  // on each such emission - confirm that downstream handles repeated emissions.
  const reemitUnconfirmed$ = combineLatest([reemitSubmittedBefore$, inFlight$]).pipe(
    mergeMap(([reemitSubmittedBefore, inFlight]) =>
      from(inFlight.filter(({ submittedAt }) => submittedAt && submittedAt < reemitSubmittedBefore))
    ),
    tap((tx) => logger.debug('Reemitting unconfirmed in-flight tx', tx.id, 'submitted at slot', tx.submittedAt))
  );
 
  return {
    failed$,
    // Only the properties listed here are kept on re-emitted values
    reemit$: merge(rollbackRetry$, unsubmitted$, reemitUnconfirmed$).pipe(
      map((tx) => pick(tx, ['cbor', 'body', 'id', 'context']))
    )
  };
};