{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.BlockFetch.ClientRegistry
(
FetchClientRegistry (..)
, newFetchClientRegistry
, bracketFetchClient
, bracketKeepAliveClient
, bracketSyncWithFetchClient
, setFetchClientContext
, FetchClientPolicy (..)
, readFetchClientsStatus
, readFetchClientsStateVars
, readPeerGSVs
) where
import           Data.Functor.Contravariant (contramap)
import           Data.Map (Map)
import qualified Data.Map as Map
import           Data.Set (Set)
import qualified Data.Set as Set

import           Control.Concurrent.Class.MonadSTM.Strict
import           Control.Exception (assert)
import           Control.Monad (unless)
import           Control.Monad.Class.MonadAsync
import           Control.Monad.Class.MonadFork (MonadFork (throwTo),
                     MonadThread (ThreadId, myThreadId))
import           Control.Monad.Class.MonadThrow
import           Control.Tracer (Tracer)

import           Ouroboros.Network.BlockFetch.ClientState
import           Ouroboros.Network.DeltaQ


-- | Shared, per-peer state through which the brackets in this module
-- ('bracketFetchClient', 'bracketSyncWithFetchClient' and
-- 'bracketKeepAliveClient') coordinate start-up and shutdown.
--
data FetchClientRegistry peer header block m =
     FetchClientRegistry {
       -- | Tracer and policy constructor shared by all fetch clients.
       -- Created empty by 'newFetchClientRegistry' and filled exactly once
       -- by 'setFetchClientContext'; 'bracketFetchClient' blocks on it.
       fcrCtxVar
         :: StrictTMVar
              m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                , WhetherReceivingTentativeBlocks
                    -> STM m (FetchClientPolicy header block m)
                ),

       -- | State variables of the currently registered fetch clients.
       fcrFetchRegistry
         :: StrictTVar m (Map peer (FetchClientStateVars m header)),

       -- | Registered sync clients: the sync client's thread id, a \"done\"
       -- variable filled when the sync client unregisters, and a \"start\"
       -- variable filled by the fetch client to release the sync client.
       fcrSyncRegistry
         :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),

       -- | One 'PeerGSV' per peer that has a registered keep-alive client.
       fcrDqRegistry
         :: StrictTVar m (Map peer PeerGSV),

       -- | Registered fetch clients as seen by the keep-alive client: the
       -- fetch client's thread id and a variable the fetch client fills
       -- once it has shut down.
       fcrKeepRegistry
         :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),

       -- | Peers whose keep-alive client is currently tearing down a fetch
       -- client; new fetch clients for these peers wait until the peer is
       -- removed from this set.
       fcrDying
         :: StrictTVar m (Set peer)
     }

-- | Create a registry with an unset fetch client context and no registered
-- peers in any of the per-peer registries.
--
newFetchClientRegistry :: MonadSTM m
                       => m (FetchClientRegistry peer header block m)
newFetchClientRegistry =
    FetchClientRegistry
      <$> newEmptyTMVarIO       -- fcrCtxVar: set later by setFetchClientContext
      <*> newTVarIO Map.empty   -- fcrFetchRegistry
      <*> newTVarIO Map.empty   -- fcrSyncRegistry
      <*> newTVarIO Map.empty   -- fcrDqRegistry
      <*> newTVarIO Map.empty   -- fcrKeepRegistry
      <*> newTVarIO Set.empty   -- fcrDying

-- | Run a block fetch client for @peer@, handing it a freshly registered
-- 'FetchClientContext'.
--
-- Registration blocks until all of the following hold:
--
-- * no other fetch client is registered for @peer@;
-- * @peer@ is not in the dying set maintained by 'bracketKeepAliveClient';
-- * 'setFetchClientContext' has been called;
-- * @peer@ has an entry in the 'PeerGSV' registry (inserted by
--   'bracketKeepAliveClient').
--
-- It then waits for the sync client of the same peer to register (see
-- 'bracketSyncWithFetchClient') and releases it by filling its start
-- variable.
--
-- On exit the peer's fetch status is set to 'PeerFetchStatusShutdown', the
-- sync client thread is sent 'AsyncCancelled', and the peer is removed from
-- the keep-alive and fetch registries once the sync client has signalled
-- that it is done.
--
bracketFetchClient :: forall m a peer header block version.
                      (MonadSTM m, MonadFork m, MonadMask m, Ord peer)
                   => FetchClientRegistry peer header block m
                   -> version
                   -> (version -> WhetherReceivingTentativeBlocks)
                   -- ^ applied to @version@ to obtain the argument for the
                   -- policy constructor
                   -> peer
                   -> (FetchClientContext header block m -> m a)
                   -> m a
bracketFetchClient FetchClientRegistry {
                     fcrCtxVar        = ctxVar,
                     fcrFetchRegistry = fetchRegistry,
                     fcrSyncRegistry  = syncRegistry,
                     fcrDqRegistry    = dqRegistry,
                     fcrKeepRegistry  = keepRegistry,
                     fcrDying         = dyingRegistry
                   }
                   version isPipeliningEnabled peer action = do
    ksVar <- newEmptyTMVarIO
    bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst)
  where
    -- Register this fetch client and wait for the matching sync client.
    -- Returns the context for the action together with the sync client's
    -- thread id and \"done\" variable, which 'unregister' needs.
    register :: StrictTMVar m ()
             -> m ( FetchClientContext header block m
                  , (ThreadId m, StrictTMVar m ()) )
    register ksVar = do
      tid <- myThreadId
      ctx <- atomically $ do
        -- wait until no fetch client is registered for this peer
        fr <- readTVar fetchRegistry
        check (peer `Map.notMember` fr)

        -- wait while the keep-alive client has this peer marked as dying
        dr <- readTVar dyingRegistry
        check (peer `Set.notMember` dr)

        -- blocks until setFetchClientContext has filled the context
        (tracer, mkPolicy) <- readTMVar ctxVar

        -- wait until the peer has a PeerGSV entry, then make this thread
        -- visible to the keep-alive client
        dqPeers <- readTVar dqRegistry
        check (peer `Map.member` dqPeers)
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, ksVar) m

        -- build the policy for this peer's version
        policy <- mkPolicy (isPipeliningEnabled version)

        stateVars <- newFetchClientStateVars
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer stateVars m

        return FetchClientContext {
                 fetchClientCtxTracer    = contramap (TraceLabelPeer peer) tracer,
                 fetchClientCtxPolicy    = policy,
                 fetchClientCtxStateVars = stateVars
               }

      let statusVar = fetchClientStatusVar (fetchClientCtxStateVars ctx)

      onException
        (do
          -- wait for the sync client to register, release it, and mark
          -- this peer as ready
          syncclient <- atomically $ do
            syncclients <- readTVar syncRegistry
            case Map.lookup peer syncclients of
              Nothing -> retry
              Just (cTid, doneVar, startVar) -> do
                putTMVar startVar ()
                writeTVar statusVar (PeerFetchStatusReady Set.empty IsIdle)
                return (cTid, doneVar)
          return (ctx, syncclient))

        -- an exception arrived while waiting for the sync client: undo the
        -- registration performed above
        (atomically $ do
          writeTVar statusVar PeerFetchStatusShutdown
          putTMVar ksVar ()
          modifyTVar keepRegistry $ \m ->
            assert (peer `Map.member` m) $
            Map.delete peer m
          modifyTVar fetchRegistry $ \m ->
            assert (peer `Map.member` m) $
            Map.delete peer m)

    -- Shut down: publish the shutdown status, cancel the sync client, and
    -- remove the peer from the registries once the sync client is done.
    unregister :: StrictTMVar m ()
               -> FetchClientContext header block m
               -> (ThreadId m, StrictTMVar m ())
               -> m ()
    unregister ksVar
               (FetchClientContext { fetchClientCtxStateVars = stateVars })
               (tid, doneVar) = uninterruptibleMask_ $ do
      atomically $
        writeTVar (fetchClientStatusVar stateVars) PeerFetchStatusShutdown

      -- cancel the sync client thread
      throwTo tid AsyncCancelled

      atomically $ do
        -- signal the keep-alive client that this fetch client is gone
        putTMVar ksVar ()
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

        -- blocks until the sync client has filled its \"done\" variable
        readTMVar doneVar
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

-- | Run a sync client action for @peer@ in step with the peer's fetch
-- client.
--
-- Registration waits until no other sync client is registered for @peer@,
-- records this thread in the sync registry, and then blocks until the
-- start variable is filled (which 'bracketFetchClient' does once it has
-- registered). On exit the \"done\" variable is filled and the peer is
-- removed from the sync registry.
--
bracketSyncWithFetchClient :: forall m a peer header block.
                              (MonadSTM m, MonadFork m, MonadCatch m,
                               Ord peer)
                           => FetchClientRegistry peer header block m
                           -> peer
                           -> m a
                           -> m a
bracketSyncWithFetchClient FetchClientRegistry { fcrSyncRegistry = syncRegistry }
                           peer action = do
    doneVar  <- newEmptyTMVarIO
    startVar <- newEmptyTMVarIO
    bracket_ (register doneVar startVar) (unregister doneVar) action
  where
    register :: StrictTMVar m () -> StrictTMVar m () -> m ()
    register doneVar startVar = do
      tid <- myThreadId
      atomically $ do
        -- wait until no sync client is registered for this peer
        sr <- readTVar syncRegistry
        check (peer `Map.notMember` sr)
        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, doneVar, startVar) m

      -- Wait for the start signal. The registry entry already exists, so
      -- an exception while waiting has to unregister explicitly.
      onException (atomically $ readTMVar startVar) (unregister doneVar)

    unregister :: StrictTMVar m () -> m ()
    unregister doneVar =
      atomically $ do
        putTMVar doneVar ()
        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

-- | Run a keep-alive client action for @peer@, passing it the shared
-- 'PeerGSV' registry.
--
-- Registration waits until @peer@ has no 'PeerGSV' entry and then inserts
-- 'defaultGSV' for it. On exit, if a fetch client is still registered for
-- @peer@, the peer is marked as dying, the fetch client thread is sent
-- 'AsyncCancelled', and the 'PeerGSV' entry is removed only after the fetch
-- client has signalled that it has shut down; otherwise the entry is
-- removed immediately.
--
bracketKeepAliveClient :: forall m a peer header block.
                          (MonadSTM m, MonadFork m, MonadMask m, Ord peer)
                       => FetchClientRegistry peer header block m
                       -> peer
                       -> (StrictTVar m (Map peer PeerGSV) -> m a)
                       -> m a
bracketKeepAliveClient FetchClientRegistry {
                         fcrDqRegistry   = dqRegistry,
                         fcrKeepRegistry = keepRegistry,
                         fcrDying        = dyingRegistry
                       }
                       peer action =
    bracket_ register unregister (action dqRegistry)
  where
    register :: m ()
    register =
      atomically $ do
        -- wait until the peer has no PeerGSV entry
        dr <- readTVar dqRegistry
        check (peer `Map.notMember` dr)
        modifyTVar dqRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer defaultGSV m

    unregister :: m ()
    unregister = uninterruptibleMask_ $ do
      fetchclient_m <- atomically $ do
        fetchclients <- readTVar keepRegistry
        case Map.lookup peer fetchclients of
          -- no fetch client registered: drop the PeerGSV entry right away
          Nothing -> do
            modifyTVar dqRegistry $ \m ->
              assert (peer `Map.member` m) $
              Map.delete peer m
            return Nothing

          -- a fetch client is registered: mark the peer as dying so that
          -- bracketFetchClient does not register a new one meanwhile
          Just rc -> do
            modifyTVar dyingRegistry $ \s ->
              assert (peer `Set.notMember` s) $
              Set.insert peer s
            return $ Just rc

      case fetchclient_m of
        Nothing -> return ()
        Just (tid, doneVar) -> do
          -- cancel the fetch client and wait for it to shut down
          throwTo tid AsyncCancelled
          atomically $ do
            -- blocks until the fetch client has filled its variable
            readTMVar doneVar
            modifyTVar dqRegistry $ \m ->
              assert (peer `Map.member` m) $
              Map.delete peer m
            modifyTVar dyingRegistry $ \s ->
              assert (peer `Set.member` s) $
              Set.delete peer s

-- | Supply the tracer and policy constructor used by all fetch clients of
-- this registry. 'bracketFetchClient' blocks until this has been done.
--
-- Must be called at most once per registry: if the context is already set,
-- this calls 'error'.
--
setFetchClientContext :: MonadSTM m
                      => FetchClientRegistry peer header block m
                      -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                      -> (   WhetherReceivingTentativeBlocks
                          -> STM m (FetchClientPolicy header block m)
                         )
                      -> m ()
setFetchClientContext FetchClientRegistry { fcrCtxVar = ctxVar } tracer mkPolicy =
    atomically $ do
      -- tryPutTMVar yields False when the variable is already full
      ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
      unless ok $ error "setFetchClientContext: called more than once"

-- | Read the current 'PeerFetchStatus' of every registered fetch client,
-- keyed by peer. All status variables are read within the calling STM
-- transaction.
--
readFetchClientsStatus :: MonadSTM m
                       => FetchClientRegistry peer header block m
                       -> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus FetchClientRegistry { fcrFetchRegistry = registry } =
    readTVar registry >>= traverse (readTVar . fetchClientStatusVar)

-- | Read the map of state variables of all registered fetch clients. Only
-- the registry map itself is read; the variables inside each
-- 'FetchClientStateVars' are returned unread.
--
readFetchClientsStateVars :: MonadSTM m
                          => FetchClientRegistry peer header block m
                          -> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars FetchClientRegistry { fcrFetchRegistry = registry } =
    readTVar registry

-- | Read the 'PeerGSV's of those peers that also have an entry in the
-- keep registry, i.e. peers for which 'bracketFetchClient' has registered
-- a fetch client.
--
readPeerGSVs :: forall block header m peer.
                (MonadSTM m, Ord peer)
             => FetchClientRegistry peer header block m
             -> STM m (Map peer PeerGSV)
readPeerGSVs FetchClientRegistry {
               fcrDqRegistry   = dqRegistry,
               fcrKeepRegistry = keepRegistry
             } = do
    dr <- readTVar dqRegistry
    kr <- readTVar keepRegistry
    -- keep the PeerGSV values, restricted to keys present in both maps
    return $ Map.intersection dr kr