All files / src/services AddressTracker.ts

90.9% Statements 30/33
33.33% Branches 1/3
100% Functions 14/14
88.46% Lines 23/26

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96    43x                               43x 43x               43x   242x 242x   123x 123x             123x 121x 121x   119x               2x       121x 121x       5x 8x         5x     5x                 123x   5x 5x   13x                      
import { GroupedAddress } from '@cardano-sdk/key-management';
import { Logger } from 'ts-log';
import {
  Observable,
  Subject,
  defaultIfEmpty,
  distinctUntilChanged,
  filter,
  map,
  merge,
  mergeMap,
  of,
  shareReplay,
  switchMap,
  take,
  tap
} from 'rxjs';
import { WalletStores } from '../persistence';
import { groupedAddressesEquals } from './util';
import { sortAddresses } from './util/sortAddresses';
 
export type AddressTrackerDependencies = {
  store: WalletStores['addresses'];
  addressDiscovery$: Observable<GroupedAddress[]>;
  logger: Logger;
};
 
export const createAddressTracker = ({ addressDiscovery$, store, logger }: AddressTrackerDependencies) => {
  // eslint-disable-next-line unicorn/consistent-function-scoping
  const storeAddresses = () => (addresses$: Observable<GroupedAddress[]>) =>
    addresses$.pipe(switchMap((addresses) => store.set(addresses).pipe(map(() => addresses))));
 
  const newAddresses$ = new Subject<GroupedAddress[]>();
  const addresses$ = store
    .get()
    .pipe(
      defaultIfEmpty([]),
      mergeMap(
        // derive addresses if none available
        (addresses) => {
          if (addresses.length === 0) {
            logger.debug('No addresses available; initiating address discovery process');
            return addressDiscovery$.pipe(
              tap((derivedAddresses) => {
                Iif (derivedAddresses.length === 0) {
                  throw new Error('Address discovery derived 0 addresses');
                }
              }),
              storeAddresses()
            );
          }
 
          return of(addresses);
        }
      ),
      switchMap((addresses) => {
        const addressCache = [...addresses];
        return merge(
          of(addresses),
          newAddresses$.pipe(
            map((newAddresses) => {
              for (const newAddress of newAddresses) {
                Iif (addressCache.some((addr) => addr.address === newAddress.address)) {
                  logger.warn('Address already exists', newAddress.address);
                  continue;
                }
 
                addressCache.push(newAddress);
              }
 
              return [...addressCache];
            }),
            storeAddresses()
          )
        ).pipe(distinctUntilChanged(groupedAddressesEquals));
      })
    )
    .pipe(shareReplay(1));
 
  return {
    addAddresses: (newAddresses: GroupedAddress[]) => {
      newAddresses$.next(newAddresses);
      return addresses$.pipe(
        filter((addresses) =>
          newAddresses.every(({ address: newAddress }) => addresses.some(({ address }) => address === newAddress))
        ),
        take(1)
      );
    },
    addresses$: addresses$.pipe(map(sortAddresses)),
    shutdown: newAddresses$.complete.bind(newAddresses$)
  };
};
 
export type AddressTracker = ReturnType<typeof createAddressTracker>;