Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Cardano, calculateStabilityWindowSlotsCount } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import {
  Observable,
  combineLatest,
  filter,
  from,
  map,
  merge,
  mergeMap,
  partition,
  scan,
  share,
  take,
  tap,
  withLatestFrom
} from 'rxjs';

import {
  FailedTx,
  Milliseconds,
  OutgoingOnChainTx,
  OutgoingTx,
  TransactionFailure,
  TransactionsTracker
} from './types';
import { WalletStores } from '../persistence';
import { isNotNil } from '@cardano-sdk/util';
import pick from 'lodash/pick.js';

/** Dependencies and options consumed by {@link createTransactionReemitter}. */
export interface TransactionReemitterProps {
  /** Transaction tracker streams: rollbacks plus the outgoing on-chain / submitting / in-flight streams. */
  transactions: Pick<TransactionsTracker, 'rollback$'> & {
    outgoing: Pick<TransactionsTracker['outgoing'], 'onChain$' | 'submitting$' | 'inFlight$'>;
  };
  /** Stores read on start-up (in-flight and volatile transactions) and written as the volatile set changes. */
  stores: Pick<WalletStores, 'inFlightTransactions' | 'volatileTransactions'>;
  /** Slot of the current tip. */
  tipSlot$: Observable<Cardano.Slot>;
  /** Genesis parameters used to derive the stability window and to convert `maxInterval` into slots. */
  genesisParameters$: Observable<
    Pick<Cardano.CompactGenesis, 'securityParameter' | 'activeSlotsCoefficient' | 'slotLength'>
  >;
  /**
   * It is possible that a transaction is rolled back before it is found on chain.
   * This option can be used to re-emit (and then attempt to re-submit) a transaction if it takes too long to show up on chain.
   */
  maxInterval: Milliseconds;
  logger: Logger;
}

/** Streams produced by {@link createTransactionReemitter}. */
export interface TransactionReemiter {
  /** Transactions that should be submitted again. */
  reemit$: Observable<OutgoingTx>;
  /** Rolled back transactions that can no longer be resubmitted. */
  failed$: Observable<FailedTx>;
}

/** Tags the origin of an event fed into the volatile transactions reducer. */
enum txSource {
  store,
  onChain,
  submitting
}

/**
 * Creates the streams that decide which outgoing transactions have to be submitted again.
 *
 * A transaction is re-emitted when:
 * - it was rolled back and is still found among the tracked volatile transactions,
 * - it was loaded from the in-flight store without a `submittedAt` value, or
 * - it is in flight and was submitted before `tip - maxInterval` (expressed in slots).
 *
 * A rolled back transaction whose `invalidHereafter` is already behind the tip is emitted on `failed$`
 * with `TransactionFailure.Timeout` instead of being re-emitted.
 *
 * @param props see {@link TransactionReemitterProps}
 * @returns `reemit$` and `failed$` observables
 */
export const createTransactionReemitter = ({
  transactions: {
    rollback$,
    outgoing: { onChain$, submitting$, inFlight$ }
  },
  stores,
  tipSlot$,
  maxInterval,
  genesisParameters$,
  logger
}: TransactionReemitterProps): TransactionReemiter => {
  // Running list of on-chain transactions that are still within the stability window.
  const volatileTransactions$ = merge(
    stores.volatileTransactions.get().pipe(
      tap((txs) => logger.debug(`Store contains ${txs.length} volatile transactions`)),
      mergeMap((txs) => from(txs)),
      map((tx) => ({ source: txSource.store, tx } as const))
    ),
    onChain$.pipe(map((tx) => ({ onChain: tx, source: txSource.onChain } as const))),
    submitting$.pipe(map((tx) => ({ source: txSource.submitting, tx } as const)))
  ).pipe(
    mergeMap((vt) =>
      genesisParameters$.pipe(
        // Pair every event with exactly one genesis parameters value. Without `take(1)` the inner
        // subscription stays open, so a later `genesisParameters$` emission would replay every past
        // event into the reducer below and append the same transactions again.
        take(1),
        map(({ securityParameter, activeSlotsCoefficient }) =>
          calculateStabilityWindowSlotsCount({ activeSlotsCoefficient, securityParameter })
        ),
        map((sw) => ({ sw, vt }))
      )
    ),
    scan((volatiles, { vt, sw: stabilityWindowSlotsCount }) => {
      switch (vt.source) {
        case txSource.store: {
          // Do not calculate stability window for old transactions coming from the store
          volatiles = [...volatiles, vt.tx];
          break;
        }
        case txSource.submitting: {
          volatiles = volatiles.filter((tx) => {
            const isResubmittedTx = tx.id === vt.tx.id;
            if (isResubmittedTx) {
              // Volatile transactions in submitting are the ones reemitted. Remove them from volatiles
              logger.debug(`Transaction ${vt.tx.id} is being resubmitted. Remove it from volatiles`);
            }
            return !isResubmittedTx;
          });
          // NOTE(review): the value returned by `set` is not used here
          stores.volatileTransactions.set(volatiles);
          break;
        }
        case txSource.onChain: {
          // Clamp at 0 so that the threshold never becomes negative early in the chain
          const oldestAcceptedSlot =
            vt.onChain.slot > stabilityWindowSlotsCount ? vt.onChain.slot - stabilityWindowSlotsCount : 0;
          // Remove transactions considered stable
          logger.debug(`Removing stable transactions (slot <= ${oldestAcceptedSlot}), from volatiles`);
          logger.debug(`Adding new volatile transaction ${vt.onChain.id}`);
          volatiles = [...volatiles.filter(({ slot }) => slot > oldestAcceptedSlot), vt.onChain];
          stores.volatileTransactions.set(volatiles);
          break;
        }
      }
      return volatiles;
    }, [] as OutgoingOnChainTx[])
  );

  const rollbacks$ = rollback$.pipe(
    // Rollbacks of transactions matched by isPhase2ValidationErrTx are not retried
    filter((tx) => !Cardano.util.isPhase2ValidationErrTx(tx)),
    withLatestFrom(volatileTransactions$),
    map(([tx, volatiles]) => {
      // Get the onChain Tx transaction to be retried
      const reemitTx = volatiles.find((txVolatile) => txVolatile.id === tx.id);
      if (!reemitTx) {
        logger.error(`Could not find onChain transaction with id ${tx.id} that was rolled back`);
      }
      return reemitTx;
    }),
    // Drops the rollbacks for which no volatile transaction was found
    filter(isNotNil),
    withLatestFrom(tipSlot$),
    map(([tx, tipSlot]) => {
      const invalidHereafter = tx.body?.validityInterval?.invalidHereafter;
      // Explicit nil check: a truthiness test would treat slot 0 as "no upper bound"
      if (isNotNil(invalidHereafter) && tipSlot > invalidHereafter) {
        const err: FailedTx = { reason: TransactionFailure.Timeout, ...tx };
        logger.error(`Rolled back transaction with id ${err.id} is no longer valid`, err.reason);
        return err;
      }
      return tx;
    }),
    // Both partitions below subscribe to this stream; share the single upstream subscription
    share()
  );

  // Values carrying a `reason` are failures, everything else is retried
  const [failed$, rollbackRetry$] = partition(rollbacks$, (v): v is FailedTx => (v as FailedTx).reason !== undefined);

  // If there are any transactions without `submittedAt` in store on load, it means that
  // wallet was shut down before transaction submission resolved.
  // Submission might have failed and could be retryable, so we should attempt to re-submit it.
  const unsubmitted$ = stores.inFlightTransactions.get().pipe(
    map((txs) => txs.filter(({ submittedAt }) => !submittedAt)),
    mergeMap((txs) => from(txs)),
    tap((tx) => logger.debug('Reemitting in-flight tx that was never submitted', tx.id))
  );

  // Slot threshold: in-flight transactions submitted before it are considered overdue.
  // `maxInterval` is in milliseconds; presumably `slotLength` is in seconds - TODO confirm.
  const reemitSubmittedBefore$ = tipSlot$.pipe(
    withLatestFrom(genesisParameters$),
    map(([tip, { slotLength }]) => tip - maxInterval / (slotLength * 1000))
  );

  const reemitUnconfirmed$ = combineLatest([reemitSubmittedBefore$, inFlight$]).pipe(
    mergeMap(([reemitSubmittedBefore, inFlight]) =>
      from(inFlight.filter(({ submittedAt }) => submittedAt && submittedAt < reemitSubmittedBefore))
    ),
    tap((tx) => logger.debug('Reemitting unconfirmed in-flight tx', tx.id, 'submitted at slot', tx.submittedAt))
  );

  return {
    failed$,
    // Only the fields listed here are kept on the re-emitted transaction
    reemit$: merge(rollbackRetry$, unsubmitted$, reemitUnconfirmed$).pipe(
      map((tx) => pick(tx, ['cbor', 'body', 'id', 'context']))
    )
  };
};