{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A mux bearer backed by a pair of bounded STM queues instead of a real
-- transport.
module Network.Mux.Bearer.Queues
  ( QueueChannel (..)
  , queueChannelAsMuxBearer
  ) where

import qualified Data.ByteString.Lazy as BL

import           Control.Concurrent.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Class.MonadTime.SI
import           Control.Tracer

import qualified Network.Mux.Codec as Mx
import           Network.Mux.Time as Mx
import qualified Network.Mux.Timeout as Mx
import qualified Network.Mux.Trace as Mx
import           Network.Mux.Types (MuxBearer)
import qualified Network.Mux.Types as Mx

-- | The two queues a queue-based bearer operates on.  Every queue element is
-- one lazy 'BL.ByteString'.
data QueueChannel m = QueueChannel {
    -- | Queue the bearer takes incoming encoded SDUs from.
    readQueue  :: StrictTBQueue m BL.ByteString,
    -- | Queue the bearer puts outgoing encoded SDUs on.
    writeQueue :: StrictTBQueue m BL.ByteString
  }

-- | Build a 'MuxBearer' whose @read@ and @write@ operations go through the
-- queues of the given 'QueueChannel'.
--
-- The supplied SDU size is stored unchanged in the bearer's @sduSize@ field.
-- Both operations ignore the timeout function they are handed and emit trace
-- events on the supplied tracer.
--
-- Reading throws (via 'throwIO') the error returned by 'Mx.decodeMuxSDU' when
-- the first eight bytes of a queue element cannot be decoded as an SDU header.
queueChannelAsMuxBearer
  :: forall m.
     ( MonadSTM           m
     , MonadMonotonicTime m
     , MonadThrow         m
     )
  => Mx.SDUSize
  -> Tracer m Mx.MuxTrace
  -> QueueChannel m
  -> MuxBearer m
queueChannelAsMuxBearer sduSize tracer QueueChannel { writeQueue, readQueue } =
    Mx.MuxBearer {
      Mx.read    = readMux,
      Mx.write   = writeMux,
      Mx.sduSize = sduSize
    }
  where
    -- Take one element from 'readQueue' and interpret it as a single SDU:
    -- the first 8 bytes are decoded as the header, everything after them
    -- becomes the payload.  The timeout function is unused.
    readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time)
    readMux _ = do
        traceWith tracer Mx.MuxTraceRecvHeaderStart
        buf <- atomically $ readTBQueue readQueue
        let (hbuf, payload) = BL.splitAt 8 buf
        case Mx.decodeMuxSDU hbuf of
          Left  e      -> throwIO e
          Right header -> do
            traceWith tracer $ Mx.MuxTraceRecvHeaderEnd (Mx.msHeader header)
            -- The receive time is sampled only after the header decoded
            -- successfully; it is both traced and returned to the caller.
            ts <- getMonotonicTime
            traceWith tracer $
              Mx.MuxTraceRecvDeltaQObservation (Mx.msHeader header) ts
            -- NOTE(review): only the 8 header bytes are handed to the
            -- decoder, so the payload length is not compared against the
            -- header in this function.
            return (header { Mx.msBlob = payload }, ts)

    -- Stamp the SDU with the current time, encode it and put the encoded
    -- bytes on 'writeQueue' as a single element.  The timeout function is
    -- unused.  The returned time is the one sampled before encoding.
    writeMux :: Mx.TimeoutFn m -> Mx.MuxSDU -> m Time
    writeMux _ sdu = do
        ts <- getMonotonicTime
        let ts32 = Mx.timestampMicrosecondsLow32Bits ts
            sdu' = Mx.setTimestamp sdu (Mx.RemoteClockModel ts32)
            buf  = Mx.encodeMuxSDU sdu'
        traceWith tracer $ Mx.MuxTraceSendStart (Mx.msHeader sdu')
        atomically $ writeTBQueue writeQueue buf
        traceWith tracer Mx.MuxTraceSendEnd
        return ts