{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTSyntax #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Mux
(
newMux
, Mux
, MuxMode (..)
, HasInitiator
, HasResponder
, MiniProtocolBundle (..)
, MiniProtocolInfo (..)
, MiniProtocolNum (..)
, MiniProtocolDirection (..)
, MiniProtocolLimits (..)
, runMux
, runMiniProtocol
, StartOnDemandOrEagerly (..)
, stopMux
, MuxBearer
, MakeBearer (..)
, SDUSize (..)
, miniProtocolStateMap
, muxStopped
, MuxError (..)
, MuxErrorType (..)
, traceMuxBearerState
, MuxBearerState (..)
, MuxTrace (..)
, WithMuxBearer (..)
) where
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Map (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (isNothing)
import Data.Monoid.Synchronisation (FirstToFinish (..))
import Control.Applicative
import Control.Concurrent.Class.MonadSTM.Strict
import qualified Control.Concurrent.JobPool as JobPool
import Control.Exception (SomeAsyncException (..))
import Control.Monad
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTimer.SI hiding (timeout)
import Control.Tracer
import Network.Mux.Bearer
import Network.Mux.Channel
import Network.Mux.Egress as Egress
import Network.Mux.Ingress as Ingress
import Network.Mux.Timeout
import Network.Mux.Trace
import Network.Mux.Types
data Mux (mode :: MuxMode) m =
Mux {
forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: !(Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m)),
forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: !(StrictTQueue m (ControlCmd mode m)),
forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
}
miniProtocolStateMap :: MonadSTM m
=> Mux mode m
-> Map (MiniProtocolNum, MiniProtocolDir)
(STM m MiniProtocolStatus)
miniProtocolStateMap :: forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
miniProtocolStateMap = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols
muxStopped :: MonadSTM m => Mux mode m -> STM m (Maybe SomeException)
muxStopped :: forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
Mux mode m -> STM m (Maybe SomeException)
muxStopped Mux { StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus } =
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \MuxStatus
status -> case MuxStatus
status of
MuxStatus
MuxReady -> forall (m :: * -> *) a. MonadSTM m => STM m a
retry
MuxFailed SomeException
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just SomeException
err)
MuxStatus
MuxStopping -> forall (m :: * -> *) a. MonadSTM m => STM m a
retry
MuxStatus
MuxStopped -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
data MuxStatus
= MuxReady
| MuxFailed SomeException
| MuxStopping
| MuxStopped
newMux :: MonadSTM m => MiniProtocolBundle mode -> m (Mux mode m)
newMux :: forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
MiniProtocolBundle mode -> m (Mux mode m)
newMux (MiniProtocolBundle [MiniProtocolInfo mode]
ptcls) = do
Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols <- forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls
StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue <- forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall (m :: * -> *) a. MonadSTM m => STM m (StrictTQueue m a)
newTQueue
StrictTVar m MuxStatus
muxStatus <- forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO MuxStatus
MuxReady
forall (m :: * -> *) a. Monad m => a -> m a
return Mux {
Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols,
StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue,
StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus
}
mkMiniProtocolStateMap :: MonadSTM m
=> [MiniProtocolInfo mode]
-> m (Map (MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m))
mkMiniProtocolStateMap :: forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls =
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence
[ do MiniProtocolState mode m
state <- forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
ptcl
forall (m :: * -> *) a. Monad m => a -> m a
return ((MiniProtocolNum
miniProtocolNum, forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir), MiniProtocolState mode m
state)
| ptcl :: MiniProtocolInfo mode
ptcl@MiniProtocolInfo {MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir} <- [MiniProtocolInfo mode]
ptcls ]
mkMiniProtocolState :: MonadSTM m
=> MiniProtocolInfo mode
-> m (MiniProtocolState mode m)
mkMiniProtocolState :: forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
miniProtocolInfo = do
StrictTVar m ByteString
miniProtocolIngressQueue <- forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar <- forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO MiniProtocolStatus
StatusIdle
forall (m :: * -> *) a. Monad m => a -> m a
return MiniProtocolState {
MiniProtocolInfo mode
miniProtocolInfo :: MiniProtocolInfo mode
miniProtocolInfo :: MiniProtocolInfo mode
miniProtocolInfo,
StrictTVar m ByteString
miniProtocolIngressQueue :: StrictTVar m ByteString
miniProtocolIngressQueue :: StrictTVar m ByteString
miniProtocolIngressQueue,
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
stopMux :: MonadSTM m => Mux mode m -> m ()
stopMux :: forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
Mux mode m -> m ()
stopMux Mux{StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue} =
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadSTM m =>
StrictTQueue m a -> a -> STM m ()
writeTQueue StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue forall (mode :: MuxMode) (m :: * -> *). ControlCmd mode m
CmdShutdown
data MuxGroup = MuxJob
| MiniProtocolJob
deriving (MuxGroup -> MuxGroup -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MuxGroup -> MuxGroup -> Bool
$c/= :: MuxGroup -> MuxGroup -> Bool
== :: MuxGroup -> MuxGroup -> Bool
$c== :: MuxGroup -> MuxGroup -> Bool
Eq, Eq MuxGroup
MuxGroup -> MuxGroup -> Bool
MuxGroup -> MuxGroup -> Ordering
MuxGroup -> MuxGroup -> MuxGroup
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: MuxGroup -> MuxGroup -> MuxGroup
$cmin :: MuxGroup -> MuxGroup -> MuxGroup
max :: MuxGroup -> MuxGroup -> MuxGroup
$cmax :: MuxGroup -> MuxGroup -> MuxGroup
>= :: MuxGroup -> MuxGroup -> Bool
$c>= :: MuxGroup -> MuxGroup -> Bool
> :: MuxGroup -> MuxGroup -> Bool
$c> :: MuxGroup -> MuxGroup -> Bool
<= :: MuxGroup -> MuxGroup -> Bool
$c<= :: MuxGroup -> MuxGroup -> Bool
< :: MuxGroup -> MuxGroup -> Bool
$c< :: MuxGroup -> MuxGroup -> Bool
compare :: MuxGroup -> MuxGroup -> Ordering
$ccompare :: MuxGroup -> MuxGroup -> Ordering
Ord)
runMux :: forall m mode.
( MonadAsync m
, MonadFork m
, MonadLabelledSTM m
, Alternative (STM m)
, MonadThrow (STM m)
, MonadTimer m
, MonadMask m
)
=> Tracer m MuxTrace
-> Mux mode m
-> MuxBearer m
-> m ()
runMux :: forall (m :: * -> *) (mode :: MuxMode).
(MonadAsync m, MonadFork m, MonadLabelledSTM m,
Alternative (STM m), MonadThrow (STM m), MonadTimer m,
MonadMask m) =>
Tracer m MuxTrace -> Mux mode m -> MuxBearer m -> m ()
runMux Tracer m MuxTrace
tracer Mux {Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue, StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus} MuxBearer m
bearer = do
StrictTBQueue m (TranslocationServiceRequest m)
egressQueue <- forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadSTM m =>
Natural -> STM m (StrictTBQueue m a)
newTBQueue Natural
100
forall (m :: * -> *) a.
MonadLabelledSTM m =>
StrictTBQueue m a -> String -> m ()
labelTBQueueIO StrictTBQueue m (TranslocationServiceRequest m)
egressQueue String
"mux-eq"
forall group (m :: * -> *) a b.
(MonadAsync m, MonadThrow m, MonadLabelledSTM m) =>
(JobPool group m a -> m b) -> m b
JobPool.withJobPool
(\JobPool MuxGroup m MuxJobResult
jobpool -> do
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool (StrictTBQueue m (TranslocationServiceRequest m)
-> Job MuxGroup m MuxJobResult
muxerJob StrictTBQueue m (TranslocationServiceRequest m)
egressQueue)
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool Job MuxGroup m MuxJobResult
demuxerJob
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Mature)
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
forall (mode :: MuxMode) (m :: * -> *).
(MonadAsync m, MonadMask m, Alternative (STM m),
MonadThrow (STM m)) =>
Tracer m MuxTrace
-> TimeoutFn m
-> JobPool MuxGroup m MuxJobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor Tracer m MuxTrace
tracer
TimeoutFn m
timeout
JobPool MuxGroup m MuxJobResult
jobpool
StrictTBQueue m (TranslocationServiceRequest m)
egressQueue
StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue
StrictTVar m MuxStatus
muxStatus
)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeAsyncException e
e) -> do
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (SomeException -> MuxStatus
MuxFailed forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException e
e)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO e
e
where
muxerJob :: StrictTBQueue m (TranslocationServiceRequest m)
-> Job MuxGroup m MuxJobResult
muxerJob StrictTBQueue m (TranslocationServiceRequest m)
egressQueue =
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job (forall (m :: * -> *) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m) =>
EgressQueue m -> MuxBearer m -> m void
muxer StrictTBQueue m (TranslocationServiceRequest m)
egressQueue MuxBearer m
bearer)
(forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> MuxJobResult
MuxerException)
MuxGroup
MuxJob
String
"muxer"
demuxerJob :: Job MuxGroup m MuxJobResult
demuxerJob =
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job (forall (m :: * -> *) (mode :: MuxMode) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
MonadTimer m) =>
[MiniProtocolState mode m] -> MuxBearer m -> m void
demuxer (forall k a. Map k a -> [a]
Map.elems Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols) MuxBearer m
bearer)
(forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> MuxJobResult
DemuxerException)
MuxGroup
MuxJob
String
"demuxer"
miniProtocolJob
:: forall mode m.
( MonadSTM m
, MonadThread m
, MonadThrow (STM m)
)
=> Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> JobPool.Job MuxGroup m MuxJobResult
miniProtocolJob :: forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
miniProtocolJob Tracer m MuxTrace
tracer EgressQueue m
egressQueue
MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo =
MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
},
IngressQueue m
miniProtocolIngressQueue :: IngressQueue m
miniProtocolIngressQueue :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue,
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
(MiniProtocolAction Channel m -> m (a, Maybe ByteString)
protocolAction StrictTMVar m (Either SomeException a)
completionVar) =
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job m MuxJobResult
jobAction
SomeException -> m MuxJobResult
jobHandler
MuxGroup
MiniProtocolJob
(forall a. Show a => a -> String
show MiniProtocolNum
miniProtocolNum forall a. [a] -> [a] -> [a]
++ String
"." forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show MiniProtocolDir
miniProtocolDirEnum)
where
jobAction :: m MuxJobResult
jobAction = do
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread (case MiniProtocolNum
miniProtocolNum of
MiniProtocolNum Word16
a -> String
"prtcl-" forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show Word16
a)
IngressQueue m
w <- forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
let chan :: Channel m
chan = forall (m :: * -> *).
MonadSTM m =>
Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel Tracer m MuxTrace
tracer EgressQueue m
egressQueue (forall (m :: * -> *). StrictTVar m ByteString -> Wanton m
Wanton IngressQueue m
w)
MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum
IngressQueue m
miniProtocolIngressQueue
(a
result, Maybe ByteString
remainder) <- Channel m -> m (a, Maybe ByteString)
protocolAction Channel m
chan
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceTerminating MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
w forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BL.null
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusIdle
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (forall a b. b -> Either a b
Right a
result)
forall (m :: * -> *) a. MonadSTM m => STM m a -> STM m a -> STM m a
`orElse` forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum -> MuxRuntimeError
MuxBlockedOnCompletionVar MiniProtocolNum
miniProtocolNum)
case Maybe ByteString
remainder of
Just ByteString
trailing ->
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar IngressQueue m
miniProtocolIngressQueue (ByteString -> ByteString -> ByteString
BL.append ByteString
trailing)
Maybe ByteString
Nothing ->
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> MuxJobResult
MiniProtocolShutdown MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)
jobHandler :: SomeException -> m MuxJobResult
jobHandler :: SomeException -> m MuxJobResult
jobHandler SomeException
e = do
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (forall a b. a -> Either a b
Left SomeException
e)
forall (m :: * -> *) a. MonadSTM m => STM m a -> STM m a -> STM m a
`orElse`
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum -> MuxRuntimeError
MuxBlockedOnCompletionVar MiniProtocolNum
miniProtocolNum)
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> SomeException -> MuxJobResult
MiniProtocolException MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum SomeException
e)
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir
data ControlCmd mode m =
CmdStartProtocolThread
!StartOnDemandOrEagerly
!(MiniProtocolState mode m)
!(MiniProtocolAction m)
| CmdShutdown
data StartOnDemandOrEagerly = StartOnDemand | StartEagerly
deriving StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
Eq
data MiniProtocolAction m where
MiniProtocolAction :: (Channel m -> m (a, Maybe BL.ByteString))
-> StrictTMVar m (Either SomeException a)
-> MiniProtocolAction m
type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir)
newtype MonitorCtx m mode = MonitorCtx {
forall (m :: * -> *) (mode :: MuxMode).
MonitorCtx m mode
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: (Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m))
}
monitor :: forall mode m.
( MonadAsync m
, MonadMask m
, Alternative (STM m)
, MonadThrow (STM m)
)
=> Tracer m MuxTrace
-> TimeoutFn m
-> JobPool.JobPool MuxGroup m MuxJobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor :: forall (mode :: MuxMode) (m :: * -> *).
(MonadAsync m, MonadMask m, Alternative (STM m),
MonadThrow (STM m)) =>
Tracer m MuxTrace
-> TimeoutFn m
-> JobPool MuxGroup m MuxJobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor Tracer m MuxTrace
tracer TimeoutFn m
timeout JobPool MuxGroup m MuxJobResult
jobpool EgressQueue m
egressQueue StrictTQueue m (ControlCmd mode m)
cmdQueue StrictTVar m MuxStatus
muxStatus =
MonitorCtx m mode -> m ()
go (forall (m :: * -> *) (mode :: MuxMode).
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
MonitorCtx forall k a. Map k a
Map.empty)
where
go :: MonitorCtx m mode -> m ()
go :: MonitorCtx m mode -> m ()
go !monitorCtx :: MonitorCtx m mode
monitorCtx@MonitorCtx { Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: forall (m :: * -> *) (mode :: MuxMode).
MonitorCtx m mode
-> Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols } = do
MonitorEvent mode m
result <- forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. FirstToFinish m a -> m a
runFirstToFinish forall a b. (a -> b) -> a -> b
$
(forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish forall a b. (a -> b) -> a -> b
$ forall (mode :: MuxMode) (m :: * -> *).
MuxJobResult -> MonitorEvent mode m
EventJobResult forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) group a.
MonadSTM m =>
JobPool group m a -> STM m a
JobPool.waitForJob JobPool MuxGroup m MuxJobResult
jobpool)
forall a. Semigroup a => a -> a -> a
<> (forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish forall a b. (a -> b) -> a -> b
$ forall (mode :: MuxMode) (m :: * -> *).
ControlCmd mode m -> MonitorEvent mode m
EventControlCmd forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. MonadSTM m => StrictTQueue m a -> STM m a
readTQueue StrictTQueue m (ControlCmd mode m)
cmdQueue)
forall a. Semigroup a => a -> a -> a
<> forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap
(\(MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction) ->
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish forall a b. (a -> b) -> a -> b
$ do
IngressQueue m -> STM m ()
checkNonEmptyQueue (forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue MiniProtocolState mode m
ptclState)
forall (m :: * -> *) a. Monad m => a -> m a
return (forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
EventStartOnDemand MiniProtocolState mode m
ptclState MiniProtocolAction m
ptclAction)
)
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols
case MonitorEvent mode m
result of
EventJobResult (MiniProtocolShutdown MiniProtocolNum
pnum MiniProtocolDir
pmode) -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceCleanExit MiniProtocolNum
pnum MiniProtocolDir
pmode)
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx
EventJobResult (MiniProtocolException MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e) -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> SomeException -> MuxTrace
MuxTraceExceptionExit MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventJobResult (MuxerException SomeException
e) -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
EventJobResult (DemuxerException SomeException
e) -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
Bool
r <- forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
Int
size <- forall (m :: * -> *) group a.
(MonadSTM m, Eq group) =>
JobPool group m a -> group -> STM m Int
JobPool.readGroupSize JobPool MuxGroup m MuxJobResult
jobpool MuxGroup
MiniProtocolJob
case Int
size of
Int
0 | Just (MuxError MuxErrorType
MuxBearerClosed String
_) <- forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
-> forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopped
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Int
_ -> forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (SomeException -> MuxStatus
MuxFailed SomeException
e)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
r (forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e)
EventControlCmd (CmdStartProtocolThread
StartOnDemandOrEagerly
StartEagerly
ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
}
}
MiniProtocolAction m
ptclAction) -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartEagerly MiniProtocolNum
miniProtocolNum
(forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool forall a b. (a -> b) -> a -> b
$
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
miniProtocolJob
Tracer m MuxTrace
tracer
EgressQueue m
egressQueue
MiniProtocolState mode m
ptclState
MiniProtocolAction m
ptclAction
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx
EventControlCmd (CmdStartProtocolThread
StartOnDemandOrEagerly
StartOnDemand
ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
}
}
MiniProtocolAction m
ptclAction) -> do
let monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols :: Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols =
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert (MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState)
(MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction)
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols
}
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartedOnDemand MiniProtocolNum
miniProtocolNum
(forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'
EventControlCmd ControlCmd mode m
CmdShutdown -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer MuxTrace
MuxTraceStopping
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopping
forall (m :: * -> *) group a.
(MonadAsync m, Eq group) =>
JobPool group m a -> group -> m ()
JobPool.cancelGroup JobPool MuxGroup m MuxJobResult
jobpool MuxGroup
MiniProtocolJob
Maybe ()
_ <- TimeoutFn m
timeout DiffTime
2 forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
MonadSTM m =>
StrictTBQueue m a -> STM m (Maybe a)
tryPeekTBQueue EgressQueue m
egressQueue
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Maybe a -> Bool
isNothing
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopped
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer MuxTrace
MuxTraceStopped
EventStartOnDemand ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
},
StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
}
MiniProtocolAction m
ptclAction -> do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartOnDemand MiniProtocolNum
miniProtocolNum
(forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusRunning
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool forall a b. (a -> b) -> a -> b
$
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
miniProtocolJob
Tracer m MuxTrace
tracer
EgressQueue m
egressQueue
MiniProtocolState mode m
ptclState
MiniProtocolAction m
ptclAction
let ptclKey :: (MiniProtocolNum, MiniProtocolDir)
ptclKey = MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState
monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols :: Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols =
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (MiniProtocolNum, MiniProtocolDir)
ptclKey
Map
(MiniProtocolNum, MiniProtocolDir)
(MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols
}
MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'
checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue IngressQueue m
q = do
ByteString
buf <- forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> Bool
not (ByteString -> Bool
BL.null ByteString
buf))
protocolKey :: MiniProtocolState mode m -> MiniProtocolKey
protocolKey :: MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState {
miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
}
} =
(MiniProtocolNum
miniProtocolNum, forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir)
data MonitorEvent mode m =
EventJobResult MuxJobResult
| EventControlCmd (ControlCmd mode m)
| EventStartOnDemand (MiniProtocolState mode m)
(MiniProtocolAction m)
data MuxJobResult =
MiniProtocolShutdown MiniProtocolNum MiniProtocolDir
| MiniProtocolException MiniProtocolNum MiniProtocolDir SomeException
| MuxerException SomeException
| DemuxerException SomeException
muxChannel
:: forall m.
( MonadSTM m
)
=> Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel :: forall (m :: * -> *).
MonadSTM m =>
Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel Tracer m MuxTrace
tracer EgressQueue m
egressQueue want :: Wanton m
want@(Wanton StrictTVar m ByteString
w) MiniProtocolNum
mc MiniProtocolDir
md StrictTVar m ByteString
q =
Channel { ByteString -> m ()
send :: ByteString -> m ()
send :: ByteString -> m ()
send, m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv}
where
perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize = Int64
0x3ffff
send :: BL.ByteString -> m ()
send :: ByteString -> m ()
send ByteString
encoding = do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> MuxTrace
MuxTraceChannelSendStart MiniProtocolNum
mc (forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
encoding)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
ByteString
buf <- forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ByteString
w
if ByteString -> Int64
BL.length ByteString
buf forall a. Ord a => a -> a -> Bool
< Int64
perMiniProtocolBufferSize
then do
let wasEmpty :: Bool
wasEmpty = ByteString -> Bool
BL.null ByteString
buf
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m ByteString
w (ByteString -> ByteString -> ByteString
BL.append ByteString
buf ByteString
encoding)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
wasEmpty forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
MonadSTM m =>
StrictTBQueue m a -> a -> STM m ()
writeTBQueue EgressQueue m
egressQueue (forall (m :: * -> *).
MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
TLSRDemand MiniProtocolNum
mc MiniProtocolDir
md Wanton m
want)
else forall (m :: * -> *) a. MonadSTM m => STM m a
retry
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> MuxTrace
MuxTraceChannelSendEnd MiniProtocolNum
mc
recv :: m (Maybe BL.ByteString)
recv :: m (Maybe ByteString)
recv = do
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> MuxTrace
MuxTraceChannelRecvStart MiniProtocolNum
mc
ByteString
blob <- forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
ByteString
blob <- forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m ByteString
q
if ByteString
blob forall a. Eq a => a -> a -> Bool
== ByteString
BL.empty
then forall (m :: * -> *) a. MonadSTM m => STM m a
retry
else forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m ByteString
q ByteString
BL.empty forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
blob
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> MuxTrace
MuxTraceChannelRecvEnd MiniProtocolNum
mc (forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
blob)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ByteString
blob
traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m ()
traceMuxBearerState :: forall (m :: * -> *). Tracer m MuxTrace -> MuxBearerState -> m ()
traceMuxBearerState Tracer m MuxTrace
tracer MuxBearerState
state =
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
state)
runMiniProtocol :: forall mode m a.
( Alternative (STM m)
, MonadSTM m
, MonadThrow m
, MonadThrow (STM m)
)
=> Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (Channel m -> m (a, Maybe BL.ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol :: forall (mode :: MuxMode) (m :: * -> *) a.
(Alternative (STM m), MonadSTM m, MonadThrow m,
MonadThrow (STM m)) =>
Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (Channel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol Mux { Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
(MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue , StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus}
MiniProtocolNum
ptclNum MiniProtocolDirection mode
ptclDir StartOnDemandOrEagerly
startMode Channel m -> m (a, Maybe ByteString)
protocolAction
| Just ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState{StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar}
<- forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (MiniProtocolNum
ptclNum, MiniProtocolDir
ptclDir') Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols
= forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically forall a b. (a -> b) -> a -> b
$ do
MuxStatus
st <- forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus
case MuxStatus
st of
MuxStatus
MuxStopping -> forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown forall a. Maybe a
Nothing) String
"mux stopping")
MuxStatus
MuxStopped -> forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown forall a. Maybe a
Nothing) String
"mux stopped")
MuxStatus
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
MiniProtocolStatus
status <- forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MiniProtocolStatus
status forall a. Eq a => a -> a -> Bool
== MiniProtocolStatus
StatusIdle) forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum
-> MiniProtocolDir -> MiniProtocolStatus -> MuxRuntimeError
ProtocolAlreadyRunning MiniProtocolNum
ptclNum MiniProtocolDir
ptclDir' MiniProtocolStatus
status)
let !status' :: MiniProtocolStatus
status' = case StartOnDemandOrEagerly
startMode of
StartOnDemandOrEagerly
StartOnDemand -> MiniProtocolStatus
StatusStartOnDemand
StartOnDemandOrEagerly
StartEagerly -> MiniProtocolStatus
StatusRunning
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
status'
StrictTMVar m (Either SomeException a)
completionVar <- forall (m :: * -> *) a. MonadSTM m => STM m (StrictTMVar m a)
newEmptyTMVar
forall (m :: * -> *) a.
MonadSTM m =>
StrictTQueue m a -> a -> STM m ()
writeTQueue StrictTQueue m (ControlCmd mode m)
muxControlCmdQueue forall a b. (a -> b) -> a -> b
$
forall (mode :: MuxMode) (m :: * -> *).
StartOnDemandOrEagerly
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> ControlCmd mode m
CmdStartProtocolThread
StartOnDemandOrEagerly
startMode
MiniProtocolState mode m
ptclState
(forall (m :: * -> *) a.
(Channel m -> m (a, Maybe ByteString))
-> StrictTMVar m (Either SomeException a) -> MiniProtocolAction m
MiniProtocolAction Channel m -> m (a, Maybe ByteString)
protocolAction StrictTMVar m (Either SomeException a)
completionVar)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar
| Bool
otherwise
= forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (MiniProtocolNum -> MiniProtocolDir -> MuxRuntimeError
UnknownProtocolInternalError MiniProtocolNum
ptclNum MiniProtocolDir
ptclDir')
where
ptclDir' :: MiniProtocolDir
ptclDir' = forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
ptclDir
completionAction :: StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar = do
MuxStatus
st <- forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus
case MuxStatus
st of
MuxStatus
MuxReady -> forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
MuxStatus
MuxStopping -> forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException (MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown forall a. Maybe a
Nothing) String
"Mux stopping"))
MuxStatus
MuxStopped -> forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException (MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown forall a. Maybe a
Nothing) String
"Mux stopped"))
MuxFailed SomeException
e -> forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall e. Exception e => e -> SomeException
toException forall a b. (a -> b) -> a -> b
$
case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just e' :: MuxError
e'@MuxError { MuxErrorType
errorType :: MuxError -> MuxErrorType
errorType :: MuxErrorType
errorType } ->
MuxError
e' { errorType :: MuxErrorType
errorType = Maybe MuxErrorType -> MuxErrorType
MuxShutdown (forall a. a -> Maybe a
Just MuxErrorType
errorType) }
Maybe MuxError
Nothing ->
MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown forall a. Maybe a
Nothing) (forall a. Show a => a -> String
show SomeException
e))