{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.BlockFetch.ClientRegistry
  ( -- * Registry of block fetch clients
    FetchClientRegistry (..)
  , newFetchClientRegistry
  , bracketFetchClient
  , bracketKeepAliveClient
  , bracketSyncWithFetchClient
  , setFetchClientContext
  , FetchClientPolicy (..)
  , readFetchClientsStatus
  , readFetchClientsStateVars
  , readPeerGSVs
  ) where

import           Data.Functor.Contravariant (contramap)
import           Data.Map (Map)
import qualified Data.Map as Map
import           Data.Set (Set)
import qualified Data.Set as Set

import           Control.Concurrent.Class.MonadSTM.Strict
import           Control.Exception (assert)
import           Control.Monad (unless)
import           Control.Monad.Class.MonadAsync
import           Control.Monad.Class.MonadFork (MonadFork (throwTo),
                     MonadThread (ThreadId, myThreadId))
import           Control.Monad.Class.MonadThrow
import           Control.Tracer (Tracer)

import           Ouroboros.Network.BlockFetch.ClientState
import           Ouroboros.Network.DeltaQ



-- | A registry for the threads that are executing the client side of the
-- 'BlockFetch' protocol to communicate with our peers.
--
-- The registry contains the shared variables we use to communicate with these
-- threads, both to track their status and to provide instructions.
--
-- The threads add\/remove themselves to\/from this registry when they start up
-- and shut down.
--
data FetchClientRegistry peer header block m =
     FetchClientRegistry {
       -- | The tracer and the policy constructor shared by all fetch
       -- clients. It starts out empty and is filled exactly once by
       -- 'setFetchClientContext'; 'bracketFetchClient' blocks until it is
       -- available.
       fcrCtxVar
         :: StrictTMVar
              m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                , WhetherReceivingTentativeBlocks
                    -> STM m (FetchClientPolicy header block m)
                ),
       -- | The state variables of every currently registered fetch client,
       -- keyed by peer. Entries are added and removed by
       -- 'bracketFetchClient'.
       fcrFetchRegistry
         :: StrictTVar  m (Map peer (FetchClientStateVars m header)),
       -- | The registered chain sync clients, keyed by peer. Each entry holds
       -- the sync client's thread id, its \"done\" variable (filled when the
       -- sync client unregisters) and its \"start\" variable (filled by the
       -- fetch client once it has noticed the sync client). Entries are
       -- added and removed by 'bracketSyncWithFetchClient'.
       fcrSyncRegistry
         :: StrictTVar  m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),
       -- | The 'PeerGSV' of each peer with a running keep alive client.
       -- Entries are added (as 'defaultGSV') and removed by
       -- 'bracketKeepAliveClient'; a fetch client does not start until its
       -- peer is present here.
       fcrDqRegistry
         :: StrictTVar  m (Map peer PeerGSV),
       -- | The fetch clients known to the keep alive clients, keyed by peer.
       -- Each entry holds the fetch client's thread id and a variable that
       -- the fetch client fills when it goes away.
       fcrKeepRegistry
         :: StrictTVar  m (Map peer (ThreadId m, StrictTMVar m ())),
       -- | The peers whose keep alive client is currently shutting down its
       -- fetch client. No new fetch client is started for a peer while it is
       -- a member of this set.
       fcrDying
         :: StrictTVar m (Set peer)
                         }

-- | Create a fresh 'FetchClientRegistry': the context variable is empty (it
-- is filled later by 'setFetchClientContext') and every per-peer registry,
-- as well as the set of dying peers, starts out empty.
--
newFetchClientRegistry :: MonadSTM m
                       => m (FetchClientRegistry peer header block m)
newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
                                             <*> newTVarIO Map.empty
                                             <*> newTVarIO Map.empty
                                             <*> newTVarIO Map.empty
                                             <*> newTVarIO Map.empty
                                             <*> newTVarIO Set.empty

-- | This is needed to start a block fetch client. It provides the required
-- 'FetchClientContext'. It registers and unregisters the fetch client on
-- start and end.
--
-- It also manages synchronisation with the corresponding chain sync client.
--
-- Registration blocks until all of the following hold for the given peer:
-- no older fetch client is still registered, the keep alive client is not
-- in the middle of killing a fetch client, 'setFetchClientContext' has been
-- called, and a keep alive client has registered the peer. After that it
-- waits for the chain sync client of the same peer to register and signals
-- it to start.
--
-- On exit the fetch client status is set to 'PeerFetchStatusShutdown', the
-- chain sync client is cancelled with 'AsyncCancelled', and the fetch client
-- only removes itself from the registry once the chain sync client has
-- signalled that it is done.
--
bracketFetchClient :: forall m a peer header block version.
                      (MonadSTM m, MonadFork m, MonadMask m, Ord peer)
                   => FetchClientRegistry peer header block m
                   -> version
                   -> (version -> WhetherReceivingTentativeBlocks)
                   -- ^ is pipelining enabled function
                   -> peer
                   -> (FetchClientContext header block m -> m a)
                   -> m a
bracketFetchClient (FetchClientRegistry ctxVar
                      fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry)
                   version isPipeliningEnabled peer action = do
    -- Filled when this fetch client goes away; the keep alive client waits
    -- on it after cancelling us.
    ksVar <- newEmptyTMVarIO
    bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst)
  where
    -- Register this fetch client and wait for the matching sync client.
    -- Returns the context for the client together with the sync client's
    -- thread id and \"done\" variable, which 'unregister' needs.
    register :: StrictTMVar m ()
             -> m ( FetchClientContext header block m
                  , (ThreadId m, StrictTMVar m ()) )
    register ksVar = do
      tid <- myThreadId
      ctx <- atomically $ do
        -- wait for any potential older blockfetch to finish cleanup
        fr <- readTVar fetchRegistry
        check (peer `Map.notMember` fr)

        -- don't start if keepalive is attempting to die
        dr <- readTVar dyingRegistry
        check (peer `Set.notMember` dr)

        -- blocks until setFetchClientContext is called
        (tracer, mkPolicy) <- readTMVar ctxVar

        -- wait for and register with keepAlive
        dqPeers <- readTVar dqRegistry
        check (peer `Map.member` dqPeers)
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, ksVar) m

        -- allocate the policy specific for this peer's negotiated version
        policy <- do
          let pipeliningEnabled = isPipeliningEnabled version
          mkPolicy pipeliningEnabled

        stateVars <- newFetchClientStateVars
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer stateVars m
        return FetchClientContext {
          fetchClientCtxTracer    = contramap (TraceLabelPeer peer) tracer,
          fetchClientCtxPolicy    = policy,
          fetchClientCtxStateVars = stateVars
          }
      -- Now wait for the sync client to start up.
      onException (do
        syncclient <- atomically $ do
          syncclients <- readTVar syncRegistry
          case Map.lookup peer syncclients of
               Nothing -> retry
               Just (cTid, doneVar, startVar) -> do
                 -- let the sync client proceed and mark ourselves as ready
                 putTMVar startVar ()
                 writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx)
                           (PeerFetchStatusReady Set.empty IsIdle)
                 return (cTid, doneVar)
        return (ctx, syncclient))
        (atomically $ do
           -- we've been killed before the sync client started, cleanup
           writeTVar (fetchClientStatusVar $ fetchClientCtxStateVars ctx) PeerFetchStatusShutdown
           putTMVar ksVar ()
           modifyTVar keepRegistry $ \m ->
             assert (peer `Map.member` m) $
             Map.delete peer m

           modifyTVar fetchRegistry $ \m ->
             assert (peer `Map.member` m) $
             Map.delete peer m
        )

    -- Shut down: runs with asynchronous exceptions masked uninterruptibly so
    -- that the registry is always left in a consistent state.
    unregister :: StrictTMVar m ()
               -> FetchClientContext header block m
               -> (ThreadId m, StrictTMVar m ())
               -> m ()
    unregister ksVar FetchClientContext { fetchClientCtxStateVars = stateVars }
               (tid, doneVar)  = uninterruptibleMask_ $ do
      -- Signal we are shutting down
      atomically $
        writeTVar (fetchClientStatusVar stateVars) PeerFetchStatusShutdown
      -- Kill the sync client if it is still running
      throwTo tid AsyncCancelled
      -- Wait for the sync client to terminate and finally unregister ourselves
      atomically $ do
        -- Signal to keepAlive that we're going away
        putTMVar ksVar ()
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

        -- blocks until the sync client has filled its \"done\" variable
        readTMVar doneVar
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m


-- | The block fetch and chain sync clients for each peer need to synchronise
-- their startup and shutdown. This bracket operation provides that
-- synchronisation for the chain sync client.
--
-- This must be used for the chain sync client /outside/ of its own state
-- registration and deregistration.
--
-- The action only runs after the fetch client of the same peer has noticed
-- the registration. When the action ends (normally or not) the sync client
-- signals that it is done and removes itself from the registry.
--
bracketSyncWithFetchClient :: forall m a peer header block.
                              (MonadSTM m, MonadFork m, MonadCatch m,
                               Ord peer)
                           => FetchClientRegistry peer header block m
                           -> peer
                           -> m a
                           -> m a
bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
                              _fetchRegistry syncRegistry _dqRegistry _keepRegistry _dyingRegistry) peer action = do
    -- Filled by us when we unregister; the fetch client waits on it.
    doneVar <- newEmptyTMVarIO
    -- Filled by the fetch client once it has noticed our registration.
    startVar <- newEmptyTMVarIO
    bracket_ (register doneVar startVar) (unregister doneVar) action
  where
    -- The goal here is that the block fetch client should be registered
    -- before the sync client starts running.
    --
    -- On the shutdown side, the sync client should stop before the block fetch
    -- is unregistered. This has to happen even if either client is terminated
    -- abnormally or being cancelled (which of course can happen in any order).

    register :: StrictTMVar m () -> StrictTMVar m () -> m ()
    register doneVar startVar = do
      tid <- myThreadId
      -- We first register ourselves
      atomically $ do
        -- wait for any potential older chainsync clients to finish cleanup
        sr <- readTVar syncRegistry
        check (peer `Map.notMember` sr)

        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, doneVar, startVar) m
      -- Then we wait for fetch to notice us; if we are interrupted while
      -- waiting we have to undo the registration ourselves.
      onException (atomically $ readTMVar startVar) (unregister doneVar)

    unregister :: StrictTMVar m () -> m ()
    unregister doneVar =
      atomically $ do
        putTMVar doneVar ()
        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m
-- | Bracket for running the keep alive client of a peer. It registers a
-- 'defaultGSV' entry for the peer in the 'PeerGSV' registry, runs the action
-- with that registry, and removes the entry again on exit.
--
-- If a fetch client for the peer is still registered on exit, it is
-- cancelled with 'AsyncCancelled' and the entry is only removed after that
-- fetch client has signalled that it is gone. While this is in progress the
-- peer is marked as dying so that no new fetch client starts for it.
--
bracketKeepAliveClient :: forall m a peer header block.
                              (MonadSTM m, MonadFork m, MonadMask m, Ord peer)
                       => FetchClientRegistry peer header block m
                       -> peer
                       -> ((StrictTVar  m (Map peer PeerGSV)) -> m a)
                       -> m a
bracketKeepAliveClient (FetchClientRegistry _ctxVar
                              _fetchRegistry _syncRegistry dqRegistry keepRegistry dyingRegistry) peer action = do
    bracket_ register unregister (action dqRegistry)
  where
    -- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it.
    register :: m ()
    register =
      atomically $ do
        -- Wait for previous keep alive client to cleanup
        dr <- readTVar dqRegistry
        check (peer `Map.notMember` dr)

        modifyTVar dqRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer defaultGSV m

    -- It is possible for the keepAlive client to keep running even without a fetch client, but
    -- a fetch client shouldn't run without a keepAlive client.
    unregister :: m ()
    unregister = uninterruptibleMask_ $ do
      fetchclient_m <- atomically $ do
        fetchclients <- readTVar keepRegistry
        case Map.lookup peer fetchclients of
             Nothing -> do
               -- If the fetch client is already dead we remove PeerGSV ourself directly.
               modifyTVar dqRegistry $ \m ->
                 assert (peer `Map.member` m) $
                 Map.delete peer m
               return Nothing
             Just rc -> do
               -- Prevent a new fetchclient from starting while we are killing the old one.
               modifyTVar dyingRegistry $ \s ->
                 assert (peer `Set.notMember` s) $
                 Set.insert peer s
               return $ Just rc
      case fetchclient_m of
           Nothing -> return ()
           Just (tid, doneVar) -> do
             -- Cancel the fetch client.
             throwTo tid AsyncCancelled
             atomically $ do
               -- wait for fetch client to exit.
               readTMVar doneVar
               modifyTVar dqRegistry $ \m ->
                 assert (peer `Map.member` m) $
                 Map.delete peer m
               modifyTVar dyingRegistry $ \s ->
                 assert (peer `Set.member` s) $
                 Set.delete peer s

-- | Provide the tracer and the policy constructor that fetch clients
-- started via 'bracketFetchClient' will use. Fetch clients block during
-- registration until this has been called.
--
-- This must be called at most once per registry: a second call fails with
-- an 'error'.
--
setFetchClientContext :: MonadSTM m
                      => FetchClientRegistry peer header block m
                      -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                      -> (   WhetherReceivingTentativeBlocks
                          -> STM m (FetchClientPolicy header block m)
                         )
                      -> m ()
setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
    atomically $ do
      -- tryPutTMVar reports False when the variable is already full, i.e.
      -- when the context has been set before.
      ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
      unless ok $ error "setFetchClientContext: called more than once"

-- | A read-only 'STM' action to get the current 'PeerFetchStatus' for all
-- fetch clients in the 'FetchClientRegistry'.
--
-- The registry map is read first, and then the 'fetchClientStatusVar' of
-- every entry is read within the same 'STM' action.  The result has the same
-- keys as the registry map.
--
readFetchClientsStatus :: MonadSTM m
                       => FetchClientRegistry peer header block m
                       -> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus (FetchClientRegistry _ registry _ _ _ _) =
  readTVar registry >>= traverse (readTVar . fetchClientStatusVar)

-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
-- clients in the 'FetchClientRegistry'.
--
-- This returns the current contents of the registry map as is; the
-- variables inside each 'FetchClientStateVars' are not read here.
--
readFetchClientsStateVars :: MonadSTM m
                          => FetchClientRegistry peer header block m
                          -> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _ _) =
  readTVar registry

-- | A read-only 'STM' action to get the 'PeerGSV's for all fetch
-- clients in the 'FetchClientRegistry'.
--
-- Only those entries of the 'PeerGSV' map whose peer is also a key of the
-- keep-alive registry map are returned; the values come from the 'PeerGSV'
-- map ('Map.intersection' is left-biased).
--
readPeerGSVs :: forall block header m peer.
                ( MonadSTM m, Ord peer)
             => FetchClientRegistry peer header block m
             -> STM m (Map peer PeerGSV)
readPeerGSVs (FetchClientRegistry _ _ _ dqRegistry keepRegistry _) = do
  dr <- readTVar dqRegistry
  kr <- readTVar keepRegistry
  -- The intersection gives us only the currently hot peers
  return $ Map.intersection dr kr