All files / src/services AssetsTracker.ts

100% Statements 58/58
100% Branches 15/15
100% Functions 30/30
100% Lines 51/51

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 14642x   42x                                   42x 42x 42x 42x 42x   42x 482x 128x       42x 131x 131x   42x 42x 42x         125x 228x   228x       238x               128x                             42x 490x 127x   42x       131x 131x 131x 131x 131x   125x   125x           125x             127x       127x 230x 230x     127x   18x 18x 18x 1x             127x   127x         245x 354x         245x 145x   141x 275x 141x        
import { Asset, Cardano } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import {
  Observable,
  buffer,
  concat,
  connect,
  debounceTime,
  distinctUntilChanged,
  filter,
  map,
  of,
  share,
  switchMap,
  take,
  tap
} from 'rxjs';
import { RetryBackoffConfig } from 'backoff-rxjs';
import { TrackedAssetProvider } from './ProviderTracker';
import { TransactionsTracker } from './types';
import { coldObservableProvider, concatAndCombineLatest } from '@cardano-sdk/util-rxjs';
import { deepEquals, isNotNil } from '@cardano-sdk/util';
import { newTransactions$ } from './TransactionsTracker';
import chunk from 'lodash/chunk.js';
import uniq from 'lodash/uniq.js';
 
const isAssetInfoComplete = (assetInfo: Asset.AssetInfo): boolean =>
  assetInfo.nftMetadata !== undefined && assetInfo.tokenMetadata !== undefined;
const isEveryAssetInfoComplete = (assetInfos: Asset.AssetInfo[]): boolean => assetInfos.every(isAssetInfoComplete);
 
/** Buffers the source Observable values emitted at the same time (within 1 ms) */
const bufferTick =
  <T>() =>
  (source$: Observable<T>) =>
    source$.pipe(connect((shared$) => shared$.pipe(buffer(shared$.pipe(debounceTime(1))))));
 
const ASSET_INFO_FETCH_CHUNK_SIZE = 100;
export const createAssetService =
  (
    assetProvider: TrackedAssetProvider,
    retryBackoffConfig: RetryBackoffConfig,
    onFatalError?: (value: unknown) => void
  ) =>
  (assetIds: Cardano.AssetId[]) =>
    concatAndCombineLatest(
      chunk(assetIds, ASSET_INFO_FETCH_CHUNK_SIZE).map((assetIdsChunk) =>
        coldObservableProvider({
          onFatalError,
          pollUntil: isEveryAssetInfoComplete,
          provider: () =>
            assetProvider.getAssets({
              assetIds: assetIdsChunk,
              extraData: { nftMetadata: true, tokenMetadata: true }
            }),
          retryBackoffConfig,
          trigger$: of(true) // fetch only once
        })
      )
    ).pipe(map((arr) => arr.flat())); // concat the chunk results
export type AssetService = ReturnType<typeof createAssetService>;
 
export interface AssetsTrackerProps {
  transactionsTracker: TransactionsTracker;
  assetProvider: TrackedAssetProvider;
  retryBackoffConfig: RetryBackoffConfig;
  logger: Logger;
  onFatalError?: (value: unknown) => void;
}
 
interface AssetsTrackerInternals {
  assetService?: AssetService;
}
 
const uniqueAssetIds = ({ body: { outputs } }: Cardano.OnChainTx) =>
  outputs.flatMap(({ value: { assets } }) => (assets ? [...assets.keys()] : []));
const flatUniqueAssetIds = (txes: Cardano.OnChainTx[]) => uniq(txes.flatMap(uniqueAssetIds));
 
export const createAssetsTracker = (
  { assetProvider, transactionsTracker: { history$ }, retryBackoffConfig, logger, onFatalError }: AssetsTrackerProps,
  { assetService = createAssetService(assetProvider, retryBackoffConfig, onFatalError) }: AssetsTrackerInternals = {}
) =>
  new Observable<Map<Cardano.AssetId, Asset.AssetInfo>>((subscriber) => {
    let fetchedAssetInfoMap = new Map<Cardano.AssetId, Asset.AssetInfo>();
    const allAssetIds = new Set<Cardano.AssetId>();
    const sharedHistory$ = history$.pipe(share());
    return concat(
      sharedHistory$.pipe(
        map((historyTxs) => uniq(historyTxs.flatMap(uniqueAssetIds))),
        tap((assetIds) =>
          logger.debug(
            assetIds.length > 0
              ? `Historical total assets: ${assetIds.length}`
              : 'Setting assetProvider stats as initialized'
          )
        ),
        tap((assetIds) => assetIds.length === 0 && assetProvider.setStatInitialized(assetProvider.stats.getAsset$)),
        take(1)
      ),
      newTransactions$(sharedHistory$).pipe(
        bufferTick(),
        map(flatUniqueAssetIds),
        map((assetIds) => {
          const newAssetIds = assetIds.filter((assetId) => !allAssetIds.has(assetId));
          // re-fetch all asset infos that either
          // - weren't fetched yet
          // - were fetched with incomplete metadata
          const assetIdsToRefetch = [...allAssetIds.values()].filter((assetId) => {
            const assetInfo = fetchedAssetInfoMap.get(assetId);
            return !assetInfo || !isAssetInfoComplete(assetInfo);
          });
          // When we see a CIP-68 reference NFT, it means metadata for a user NFT that we own might have changed
          const assetsWithCip68MetadataUpdates = assetIds
            .map((assetId) => {
              const assetName = Cardano.AssetId.getAssetName(assetId);
              const decoded = Asset.AssetNameLabel.decode(assetName);
              if (decoded?.label === Asset.AssetNameLabelNum.ReferenceNFT) {
                return Cardano.AssetId.fromParts(
                  Cardano.AssetId.getPolicyId(assetId),
                  Asset.AssetNameLabel.encode(decoded.content, Asset.AssetNameLabelNum.UserNFT)
                );
              }
            })
            .filter(isNotNil);
          return uniq([...newAssetIds, ...assetIdsToRefetch, ...assetsWithCip68MetadataUpdates]);
        }),
        filter((assetIds) => assetIds.length > 0)
      )
    )
      .pipe(
        tap((assetIds) => {
          for (const assetId of assetIds) {
            allAssetIds.add(assetId);
          }
        }),
        // Restart inner observable if there are new assets to be fetched,
        // otherwise the whole pipe will hang waiting for all assetInfos to resolve
        switchMap((assetIdsToFetch) => (assetIdsToFetch.length > 0 ? assetService(assetIdsToFetch) : of([]))),
        map((fetchedAssetInfos) => [...[...fetchedAssetInfoMap.values()].filter(isNotNil), ...fetchedAssetInfos]),
        distinctUntilChanged(deepEquals), // It optimizes to not process duplicate emissions of the assets
        tap((assetInfos) => logger.debug(`Got metadata for ${assetInfos.length} assets`)),
        map((assetInfos) => new Map(assetInfos.map((assetInfo) => [assetInfo.assetId, assetInfo]))),
        tap((v) => (fetchedAssetInfoMap = v))
      )
      .subscribe(subscriber);
  });