{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Mux.Bearer.AttenuatedChannel
  ( AttenuatedChannel (..)
  , Size
  , SuccessOrFailure (..)
  , Attenuation (..)
  , newConnectedAttenuatedChannelPair
  , attenuationChannelAsMuxBearer
    -- * Trace
  , AttenuatedChannelTrace (..)
    -- * Utils
  , resourceVanishedIOError
  ) where

import           Prelude hiding (read)

import           Control.Monad (when)
import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadTimer
import           Control.Tracer (Tracer, traceWith)

import           GHC.IO.Exception

import qualified Data.ByteString.Lazy as BL
import           Data.Int (Int64)

import           Network.Mux.Codec
import           Network.Mux.Time
import           Network.Mux.Timeout
import           Network.Mux.Trace
import           Network.Mux.Types


-- | Message frames passed through the network.
--
-- * 'MsgClose' is the close marker: reading it closes the read side of a
--   'QueueChannel', passing it to 'writeQueueChannel' closes the write side.
-- * 'MsgBytes' carries a payload.
--
data Message =
      MsgClose
    | MsgBytes BL.ByteString
  deriving Eq


--
-- QueueChannel
--

-- | 'QueueChannel' is the low level bearer used by the simulated snocket.
--
-- The read and write queues can be closed independently.  The read queue is
-- closed once 'MsgClose' is read from it; dually, the write queue is closed
-- once 'MsgClose' is written.
--
data QueueChannel m = QueueChannel {
    -- | Queue this end reads from; 'Nothing' once the read side is closed.
    qcRead  :: StrictTVar m (Maybe (TQueue m Message)),
    -- | Queue this end writes to; 'Nothing' once the write side is closed.
    qcWrite :: StrictTVar m (Maybe (TQueue m Message))
  }

--
-- QueueChannel API
--

-- | Read the next 'Message' from the read side of a 'QueueChannel'.
--
-- The whole operation is a single STM transaction:
--
-- * if the read side is already closed ('qcRead' holds 'Nothing') a
--   'ResourceVanished' 'IOError' is thrown;
-- * if the message is 'MsgClose' the read side is marked as closed before
--   the message is returned;
-- * any other message is returned unchanged.
--
readQueueChannel :: ( MonadSTM        m
                    , MonadThrow (STM m)
                    )
                 => QueueChannel m -> m Message
readQueueChannel QueueChannel { qcRead } =
    atomically $ do
      -- 'traverse' only runs 'readTQueue' when the queue is still present.
      mmsg <- readTVar qcRead >>= traverse readTQueue
      case mmsg of
        Nothing       -> throwSTM (resourceVanishedIOError
                                     "AttenuatedChannel.readQueueChannel"
                                     "channel vanished")
        Just MsgClose -> do
          writeTVar qcRead Nothing
          return MsgClose
        Just msg      -> return msg


-- | Write a 'Message' to the write side of a 'QueueChannel'.
--
-- Returns 'False' if the write side was already closed; in that case the
-- read side of this end is closed as well.  Returns 'True' otherwise.
--
-- 'MsgClose' is never put on the queue: it only marks the local write side
-- as closed.  Any other message is enqueued.
--
writeQueueChannel :: MonadSTM m
                  => QueueChannel m -> Message -> m Bool
writeQueueChannel QueueChannel { qcWrite, qcRead } msg =
    atomically $ do
      mq <- readTVar qcWrite
      -- Match SO_LINGER set with 0 interval, by not writing MsgClose
      -- and closing this end without any waiting for any ack. It is
      -- simulating a lost message, so if the receiver does not get the packet,
      -- it will get an error the next time it tries to send a packet, closing
      -- its end.
      case mq of
        Nothing -> do
          -- write side already closed: close the read side too
          writeTVar qcRead Nothing
          return False
        Just q  -> do
          case msg of
            MsgClose -> writeTVar qcWrite Nothing
            _        -> writeTQueue q msg
          return True


-- | Create two connected 'QueueChannel's sharing a pair of queues: the queue
-- the first channel reads from is the queue the second one writes to, and
-- vice versa.
--
-- Each channel gets its own 'StrictTVar's around the shared queues, so
-- closing one side of one channel does not change the other channel's
-- variables.  All queues and variables are labelled.
--
newConnectedQueueChannelPair :: ( MonadSTM         m
                                , MonadLabelledSTM m
                                )
                             => STM m ( QueueChannel m
                                      , QueueChannel m )
newConnectedQueueChannelPair = do
    -- named from the point of view of the first channel
    readQ  <- newTQueue
    writeQ <- newTQueue
    labelTQueue readQ  "qc-queue-read"
    labelTQueue writeQ "qc-queue-write"

    q  <- QueueChannel <$> newTVar (Just readQ)
                       <*> newTVar (Just writeQ)
    labelTVar (qcRead q)  "qc-read"
    labelTVar (qcWrite q) "qc-write"

    -- the second channel uses the same queues with the roles swapped
    q' <- QueueChannel <$> newTVar (Just writeQ)
                       <*> newTVar (Just readQ)
    labelTVar (qcRead q')  "qc-read'"
    labelTVar (qcWrite q') "qc-write'"

    return (q, q')


--
-- AttenuatedChannel
--


-- | An 'AttenuatedChannel' supports:
--
-- - attenuation applied after reading a message from 'QueueChannel';
-- - an abortive close which simulates the @SO_LINGER@ TCP option with the
--   interval set to 0: only the local end is closed, without waiting for the
--   remote end.  The read side is closed as soon as an internal 'MsgClose'
--   is received; the write side has to be closed explicitly with 'acClose'.
--
data AttenuatedChannel m = AttenuatedChannel {
    -- | Read the next chunk of bytes.
    acRead  :: m BL.ByteString,
    -- | Write a chunk of bytes.
    acWrite :: BL.ByteString -> m (),
    -- | Close the local end of the channel.
    acClose :: m ()
  }



-- | Outcome of the read attenuation function for a received message:
-- on 'Success' the bytes are delivered, on 'Failure' the read throws.
--
data SuccessOrFailure
  = Success
  | Failure

-- | Message size handed to 'aReadAttenuation'; for a data message it is the
-- length of the received lazy 'BL.ByteString'.
--
type Size = Int64

-- | Attenuation of a channel.
--
data Attenuation = Attenuation {
    -- | Read attenuation.  Given the monotonic time at which a message was
    -- taken from the queue and the size of that message, returns the delay
    -- to apply before the read completes and whether the read succeeds.
    aReadAttenuation  :: Time -> Size -> ( DiffTime,
                                           SuccessOrFailure ),
    -- | Write attenuation.  With @'Just' limit@, writes are counted starting
    -- from 1 and every write whose count is greater than or equal to @limit@
    -- throws.  With 'Nothing' the number of writes is not limited.
    aWriteAttenuation :: Maybe Int
  }


-- | Make an 'AttenuatedChannel' from a 'QueueChannel'.
--
-- * 'acRead' reads a message, applies the delay computed by
--   'aReadAttenuation' and then either returns the bytes or throws.
-- * 'acWrite' counts writes and enforces 'aWriteAttenuation' before handing
--   the bytes to the 'QueueChannel'.
-- * 'acClose' closes the local write side and traces the outcome.
--
newAttenuatedChannel :: forall m.
                        ( MonadSTM        m
                        , MonadTime       m
                        , MonadTimer      m
                        , MonadThrow      m
                        , MonadThrow (STM m)
                        )
                     => Tracer m AttenuatedChannelTrace
                     -> Attenuation
                     -> QueueChannel m
                     -> STM m (AttenuatedChannel m)
newAttenuatedChannel tr Attenuation { aReadAttenuation,
                                      aWriteAttenuation } qc = do
    writeCounterVar <- newTVar 0
    return AttenuatedChannel { acRead
                             , acWrite = acWrite writeCounterVar
                             , acClose
                             }
  where
    -- Throws a 'ResourceVanished' 'IOError' when the read side is already
    -- closed (from 'readQueueChannel') or when attenuation reports 'Failure',
    -- and 'MuxError' 'MuxBearerClosed' when 'MsgClose' is read.
    acRead :: m BL.ByteString
    acRead = do
      msg <- readQueueChannel qc
      t <- getMonotonicTime
      case msg of
        -- match the 'Bearer.Snocket' behaviour and throw 'MuxError'
        -- when null byte is received from the network.
        MsgClose ->
          -- the close marker is attenuated as a message of size 1; only the
          -- delay is used, the success / failure component is ignored
          case aReadAttenuation t 1 of
            ( d, _       ) -> do
              traceWith tr AttChannRemoteClose
              threadDelay d
              throwIO (MuxError MuxBearerClosed
                                "closed when reading data")
        MsgBytes bs ->
          case aReadAttenuation t (BL.length bs) of
            ( d, Success ) -> do
              threadDelay d
              return bs

            ( d, Failure ) -> do
              threadDelay d
              throwIO (resourceVanishedIOError
                        "AttenuatedChannel.read"
                        "read attenuation")

    -- Throws a 'ResourceVanished' 'IOError' when the write limit is reached
    -- or when the write side of the 'QueueChannel' is already closed.
    acWrite :: StrictTVar m Int
            -> BL.ByteString
            -> m ()
    acWrite writeCounterVar bs = do
      -- bump the counter and read the new value in one transaction
      wCount <- atomically $ do
        modifyTVar writeCounterVar succ
        readTVar writeCounterVar
      case aWriteAttenuation of
        Just limit  | wCount >= limit
                   -> throwIO $
                        resourceVanishedIOError
                          "AttenuatedChannel.write"
                          "write limit reached (write attenuation)"
        _          -> return ()

      sent <- writeQueueChannel qc (MsgBytes bs)
      when (not sent) $
        throwIO (resourceVanishedIOError "AttenuatedChannel.write" "")

    -- acClose simulates SO_LINGER TCP option with interval set to 0.
    --
    -- It is assumed that the MsgClose is lost, where in this case
    -- we only close the local end. When the remote end gets
    -- used it will be closed.
    --
    acClose :: m ()
    acClose = do
      -- 'writeQueueChannel' does not enqueue 'MsgClose', it only marks the
      -- local write side as closed; 'sent' is 'False' when the write side
      -- was already closed.
      sent <- writeQueueChannel qc MsgClose
      traceWith tr (AttChannLocalClose sent)


-- | Create a pair of connected 'AttenuatedChannel's.
--
-- The first tracer and 'Attenuation' apply to the first channel of the
-- result, the second ones to the second channel.  Both channels are built on
-- top of a fresh pair from 'newConnectedQueueChannelPair'.
--
newConnectedAttenuatedChannelPair
    :: forall m.
       ( MonadLabelledSTM m
       , MonadTime        m
       , MonadTimer       m
       , MonadThrow       m
       , MonadThrow  (STM m)
       )
    => Tracer m AttenuatedChannelTrace
    -> Tracer m AttenuatedChannelTrace
    -> Attenuation
    -> Attenuation
    -> STM m (AttenuatedChannel m, AttenuatedChannel m)
newConnectedAttenuatedChannelPair tr tr' attenuation attenuation' = do
    (c, c') <- newConnectedQueueChannelPair
    b  <- newAttenuatedChannel tr  attenuation  c
    b' <- newAttenuatedChannel tr' attenuation' c'
    return (b, b')

-- | Wrap an 'AttenuatedChannel' as a 'MuxBearer'.
--
-- Arguments, in order: the SDU size stored in the bearer, the timeout
-- applied to each read, the mux tracer and the underlying channel.
--
-- Reading throws 'MuxError' 'MuxSDUReadTimeout' when the read does not
-- finish within the timeout, and rethrows any header decoding error.
--
attenuationChannelAsMuxBearer :: forall m.
                                 ( MonadThrow         m
                                 , MonadMonotonicTime m
                                 )
                              => SDUSize
                              -> DiffTime
                              -> Tracer m MuxTrace
                              -> AttenuatedChannel m
                              -> MuxBearer m
attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan =
    MuxBearer {
      read    = readMux,
      write   = writeMux,
      sduSize
    }
  where
    -- Read one message from the channel and decode it as a single SDU.
    readMux :: TimeoutFn m -> m (MuxSDU, Time)
    readMux timeoutFn = do
      traceWith muxTracer MuxTraceRecvHeaderStart
      mbuf <- timeoutFn sduTimeout $ acRead chan
      case mbuf of
        Nothing -> do
          traceWith muxTracer MuxTraceSDUReadTimeoutException
          throwIO (MuxError MuxSDUReadTimeout "Mux SDU Timeout")

        Just buf -> do
          -- the first 8 bytes are decoded as the SDU header, the remainder
          -- of the message becomes the SDU payload
          let (hbuf, payload) = BL.splitAt 8 buf
          case decodeMuxSDU hbuf of
            Left  e      -> throwIO e
            Right muxsdu -> do
              let header = msHeader muxsdu
              traceWith muxTracer $ MuxTraceRecvHeaderEnd header
              ts <- getMonotonicTime
              traceWith muxTracer $ MuxTraceRecvDeltaQObservation header ts
              return (muxsdu {msBlob = payload}, ts)

    -- Timestamp, encode and send one SDU as a single message.  The timeout
    -- function is ignored.  Returns the time taken just before encoding.
    writeMux :: TimeoutFn m -> MuxSDU -> m Time
    writeMux _ sdu = do
      ts <- getMonotonicTime
      let ts32 = timestampMicrosecondsLow32Bits ts
          sdu' = setTimestamp sdu (RemoteClockModel ts32)
          buf  = encodeMuxSDU sdu'
      traceWith muxTracer $ MuxTraceSendStart (msHeader sdu')
      acWrite chan buf

      traceWith muxTracer MuxTraceSendEnd
      return ts

--
-- Trace
--

-- | Trace events emitted by an 'AttenuatedChannel'.
--
data AttenuatedChannelTrace =
    -- | The local end was closed.  The flag is the result of writing
    -- 'MsgClose' to the underlying 'QueueChannel': 'False' means the write
    -- side was already closed.
    AttChannLocalClose Bool
    -- | 'MsgClose' was read from the underlying 'QueueChannel'.
  | AttChannRemoteClose
  deriving Show

--
-- Utils
--

-- | Build an 'IOError' of type 'ResourceVanished'.
--
-- The first argument is stored as the error location, the second as its
-- description.  Handle, errno and file name are left unset.
--
resourceVanishedIOError :: String -> String -> IOError
resourceVanishedIOError location description = IOError
  { ioe_handle      = Nothing
  , ioe_type        = ResourceVanished
  , ioe_location    = location
  , ioe_description = description
  , ioe_errno       = Nothing
  , ioe_filename    = Nothing
  }