{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeInType #-}
{-# LANGUAGE EmptyCase #-}

-- | Actions for running 'Peer's with a 'Driver'
--
module Network.TypedProtocol.Driver (

  -- * Introduction
  -- $intro

  -- * Driver interface
  Driver(..),
  SomeMessage(..),

  -- * Normal peers
  runPeerWithDriver,

  -- * Pipelined peers
  runPipelinedPeerWithDriver,
  ) where

import Data.Void (Void)

import Network.TypedProtocol.Core
import Network.TypedProtocol.Pipelined

import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork


-- $intro
--
-- A 'Peer' is a particular implementation of an agent that engages in a
-- typed protocol. To actually run one we need a source and sink for the typed
-- protocol messages. These are provided by a 'Channel' and a 'Codec'. The
-- 'Channel' represents one end of an untyped duplex message transport, and
-- the 'Codec' handles conversion between the typed protocol messages and
-- the untyped channel.
--
-- So given the 'Peer' and a compatible 'Codec' and 'Channel' we can run the
-- peer in some appropriate monad. The peer and codec have to agree on
-- the same protocol and role in that protocol. The codec and channel have to
-- agree on the same untyped medium, e.g. text or bytes. All three have to
-- agree on the same monad in which they will run.
--
-- This module provides drivers for normal and pipelined peers. There is
-- very little policy involved here so typically it should be possible to
-- use these drivers, and customise things by adjusting the peer, or codec
-- or channel.
--
-- It is of course possible to write custom drivers and the code for these ones
-- may provide a useful starting point. The 'runDecoder' function may be a
-- helpful utility for use in custom drivers.
--


--
-- Driver interface
--

-- | The operations a peer runner needs in order to exchange typed protocol
-- messages, bundled as a record of functions.
--
-- * @ps@ is the protocol (the kind of its states),
-- * @dstate@ is a piece of driver state that is threaded through successive
--   'recvMessage' calls,
-- * @m@ is the monad the driver runs in.
--
data Driver ps dstate m =
        Driver {
          -- | Send a single message. The 'PeerHasAgency' token is evidence
          -- of who has agency in the state @st@ the message is sent from.
          sendMessage :: forall (pr :: PeerRole) (st :: ps) (st' :: ps).
                         PeerHasAgency pr st
                      -> Message ps st st'
                      -> m ()

          -- | Receive a single message in state @st@, given the current
          -- driver state. Returns the message (with its target state hidden
          -- by 'SomeMessage') together with the updated driver state.
        , recvMessage :: forall (pr :: PeerRole) (st :: ps).
                         PeerHasAgency pr st
                      -> dstate
                      -> m (SomeMessage st, dstate)

          -- | The driver state to start from. The pipelined receiver thread
          -- uses it as its initial state.
        , startDState :: dstate
        }

-- | When decoding a 'Message' we only know the expected \"from\" state. We
-- cannot know the \"to\" state as this depends on the message we decode. To
-- resolve this we use the 'SomeMessage' wrapper which uses an existential
-- type to hide the \"to\" state.
--
-- Pattern matching on the 'SomeMessage' constructor brings the hidden
-- \"to\" state back into scope for the continuation that consumes the
-- message.
--
data SomeMessage (st :: ps) where
     SomeMessage :: Message ps st st' -> SomeMessage st


--
-- Running normal non-pipelined peers
--

-- | Run a peer with the given driver.
--
-- This runs the peer to completion (if the protocol allows for termination).
--
-- The @dstate@ argument is the initial driver state. It is handed to every
-- 'recvMessage' call, replaced by the state that call returns, and the final
-- value is returned alongside the peer's result.
--
runPeerWithDriver
  :: forall ps (st :: ps) pr dstate m a.
     Monad m
  => Driver ps dstate m
  -> Peer ps pr st m a
  -> dstate
  -> m (a, dstate)
runPeerWithDriver Driver{sendMessage, recvMessage} =
    -- The public argument order is peer-then-state; the interpreter loop
    -- takes the state first.
    flip go
  where
    go :: forall st'.
          dstate
       -> Peer ps pr st' m a
       -> m (a, dstate)
    -- Run the monadic effect, then continue with the peer it yields.
    go dstate (Effect k) = k >>= go dstate

    -- The peer has terminated: hand back its result and the driver state.
    go dstate (Done _ x) = return (x, dstate)

    -- Sending does not touch the driver state.
    go dstate (Yield stok msg k) = do
      sendMessage stok msg
      go dstate k

    -- Receiving consumes the current driver state and produces a new one.
    go dstate (Await stok k) = do
      (SomeMessage msg, dstate') <- recvMessage stok dstate
      go dstate' (k msg)

    -- Note that we do not complain about trailing data in any case, neither
    -- the 'Await' nor 'Done' cases.
    --
    -- We want to be able to use a non-pipelined peer in communication with
    -- a pipelined peer, and in that case the non-pipelined peer will in
    -- general see trailing data after an 'Await' which is the next incoming
    -- message.
    --
    -- Likewise for 'Done', we want to allow for one protocol to be run after
    -- another on the same channel. It would be legal for the opening message
    -- of the next protocol to arrive in the same data chunk as the final
    -- message of the previous protocol.


--
-- Running pipelined peers
--

-- | Run a pipelined peer with the given driver.
--
-- This runs the peer to completion (if the protocol allows for termination).
--
-- Unlike normal peers, running pipelined peers rely on concurrency, hence the
-- 'MonadAsync' constraint.
--
-- Two queues connect the two threads: one carries receive handlers from the
-- sender to the receiver, the other carries collected results (paired with
-- the driver state) back from the receiver to the sender. The result is the
-- sender's result.
--
runPipelinedPeerWithDriver
  :: forall ps (st :: ps) pr dstate m a.
     MonadAsync m
  => Driver ps dstate m
  -> PeerPipelined ps pr st m a
  -> dstate
  -> m (a, dstate)
runPipelinedPeerWithDriver driver (PeerPipelined peer) dstate0 = do
    receiveQueue <- atomically newTQueue
    collectQueue <- atomically newTQueue
    runPipelinedPeerReceiverQueue receiveQueue collectQueue driver
      `withAsyncLoop`
        runPipelinedPeerSender receiveQueue collectQueue driver peer dstate0

  where
    -- Race a never-returning loop against an action that does return, and
    -- yield the latter's result.
    withAsyncLoop :: m Void -> m x -> m x
    withAsyncLoop left right = do
      -- race will throw if either of the threads throw
      res <- race left right
      case res of
        -- The loop's result type is uninhabited, so this branch cannot be
        -- taken; the empty case discharges it without a partial function.
        Left v  -> case v of {}
        Right a -> return a

-- | An item on the queue from the sender thread to the receiver thread: a
-- 'PeerReceiver' to run, together with the sender's 'MaybeDState'.
--
-- The index @n@ of the 'MaybeDState' and the protocol states @st@ and @st'@
-- of the 'PeerReceiver' are existentially hidden, so handlers for different
-- states can share one queue.
--
data ReceiveHandler dstate ps pr m c where
     ReceiveHandler :: MaybeDState dstate n
                    -> PeerReceiver ps pr (st :: ps) (st' :: ps) m c
                    -> ReceiveHandler dstate ps pr m c

-- | The sender thread's view of the driver state, indexed by the number of
-- outstanding pipelined requests: the state is present ('HasDState') exactly
-- when that number is zero, and absent ('NoDState') otherwise.
--
-- The handling of trailing data here is quite subtle. Trailing data is data
-- we have read from the channel but the decoder has told us that it comes
-- after the message we decoded. So it potentially belongs to the next message
-- to decode. In this module it is carried around as the driver state
-- (@dstate@).
--
-- We read from the channel on both the 'runPipelinedPeerSender' and the
-- 'runPipelinedPeerReceiver', and we synchronise our use of trailing data
-- between the two. The scheme for the sender and receiver threads using the
-- channel ensures that only one can use it at once:
--
-- * When there are zero outstanding pipelined receiver handlers then the
--   sending side is allowed to access the channel directly (to do synchronous
--   yield\/awaits). Correspondingly the receiver side is idle and not
--   accessing the channel.
-- * When there are non-zero outstanding pipelined receiver handlers then
--   the receiver side can access the channel, but the sending side is not
--   permitted to do operations that access the channel.
--
-- So the only times we need to synchronise the trailing data are the times
-- when the right to access the channel passes from one side to the other.
--
-- The transitions are as follows:
--
-- * There having been zero outstanding pipelined requests there is now a
--   new pipelined yield. In this case we must pass the trailing data from
--   the sender thread to the receiver thread. We pass it with the
--   'ReceiveHandler'.
--
-- * When the last pipelined request is collected. In this case we must pass
--   the trailing data from the receiver thread to the sender thread. We pass
--   it with the collected result.
--
-- Note that the receiver thread cannot know what the last pipelined request
-- is, that is tracked on the sender side. So the receiver thread always
-- returns the trailing data with every collected result. It is for the sender
-- thread to decide if it needs to use it. For the same reason, the receiver
-- thread ends up retaining the last trailing data (as well as passing it to
-- the sender). So correspondingly when new trailing data is passed to the
-- receiver thread, it simply overrides any trailing data it already had, since
-- we now know that copy to be stale.
--
data MaybeDState dstate (n :: N) where
     -- No outstanding pipelined requests: the sender holds the state.
     HasDState :: dstate -> MaybeDState dstate Z
     -- At least one outstanding pipelined request: the sender holds no state.
     NoDState  ::           MaybeDState dstate (S n)


-- | The sender half of a pipelined peer.
--
-- Interprets a 'PeerSender' starting with zero outstanding pipelined
-- requests and the given initial driver state. It labels the current thread,
-- then loops until the peer is done, returning the peer's result together
-- with the final driver state.
--
-- * The first queue receives a 'ReceiveHandler' for every pipelined message
--   that is sent.
-- * The second queue is where collected results (paired with a driver state)
--   are read from.
--
runPipelinedPeerSender
  :: forall ps (st :: ps) pr dstate c m a.
     ( MonadSTM    m
     , MonadThread m
     )
  => TQueue m (ReceiveHandler dstate ps pr m c)
  -> TQueue m (c, dstate)
  -> Driver ps dstate m
  -> PeerSender ps pr st Z c m a
  -> dstate
  -> m (a, dstate)
runPipelinedPeerSender receiveQueue collectQueue
                       Driver{sendMessage, recvMessage}
                       peer dstate0 = do
    threadId <- myThreadId
    labelThread threadId "pipelined-peer-sender"
    go Zero (HasDState dstate0) peer
  where
    -- The interpreter loop. The 'Nat' counts outstanding pipelined requests
    -- and the 'MaybeDState' holds the driver state exactly when that count
    -- is zero.
    go :: forall st' n.
          Nat n
       -> MaybeDState dstate n
       -> PeerSender ps pr st' n c m a
       -> m (a, dstate)
    go n    dstate             (SenderEffect k) = k >>= go n dstate
    go Zero (HasDState dstate) (SenderDone _ x) = return (x, dstate)

    -- Synchronous send: the driver state is passed along untouched.
    go Zero dstate (SenderYield stok msg k) = do
      sendMessage stok msg
      go Zero dstate k

    -- Synchronous receive: only possible with nothing outstanding, which is
    -- when the sender holds the driver state.
    go Zero (HasDState dstate) (SenderAwait stok k) = do
      (SomeMessage msg, dstate') <- recvMessage stok dstate
      go Zero (HasDState dstate') (k msg)

    -- Pipelined send: the handler (carrying whatever state the sender holds)
    -- is queued for the receiver thread before the message is sent. From
    -- here on the sender holds no driver state.
    go n dstate (SenderPipeline stok msg receiver k) = do
      atomically (writeTQueue receiveQueue (ReceiveHandler dstate receiver))
      sendMessage stok msg
      go (Succ n) NoDState k

    -- Blocking collect: wait for the next result.
    go (Succ n) NoDState (SenderCollect Nothing k) = do
      result <- atomically (readTQueue collectQueue)
      collected n result k

    -- Non-blocking collect: take a result if one is ready, otherwise carry
    -- on with the alternative continuation.
    go (Succ n) NoDState (SenderCollect (Just k') k) = do
      mc <- atomically (tryReadTQueue collectQueue)
      case mc of
        Nothing     -> go (Succ n) NoDState k'
        Just result -> collected n result k

    -- Resume after one result has been collected, @n@ being the number of
    -- requests still outstanding. The driver state that came with the result
    -- is adopted only when nothing is outstanding any more.
    collected :: forall st' n.
                 Nat n
              -> (c, dstate)
              -> (c -> PeerSender ps pr st' n c m a)
              -> m (a, dstate)
    collected n (c, dstate) k =
      case n of
        Zero    -> go Zero      (HasDState dstate) (k c)
        Succ n' -> go (Succ n')  NoDState          (k c)


-- | The receiver thread of a pipelined peer.
--
-- Labels the current thread and then loops forever (hence the 'Void'
-- result): it takes the next 'ReceiveHandler' from the first queue, runs its
-- 'PeerReceiver' with 'runPipelinedPeerReceiver', and writes the result
-- paired with the resulting driver state to the second queue.
--
-- The loop starts from the driver's 'startDState' and otherwise carries over
-- the state left by the previous handler. A state supplied by the sender
-- with the handler takes precedence over the one the loop is holding.
--
runPipelinedPeerReceiverQueue
  :: forall ps pr dstate m c.
     ( MonadSTM    m
     , MonadThread m
     )
  => TQueue m (ReceiveHandler dstate ps pr m c)
  -> TQueue m (c, dstate)
  -> Driver ps dstate m
  -> m Void
runPipelinedPeerReceiverQueue receiveQueue collectQueue
                              driver@Driver{startDState} = do
    threadId <- myThreadId
    labelThread threadId "pipelined-receiver-queue"
    go startDState
  where
    go :: dstate -> m Void
    go receiverDState = do
      ReceiveHandler senderDState receiver
        <- atomically (readTQueue receiveQueue)
      -- Prefer the sender's state when it sent one; otherwise keep ours.
      let dstate = case (senderDState, receiverDState) of
                       (HasDState t, _) -> t
                       (NoDState,    t) -> t
      -- The bang patterns force both components to WHNF before the pair is
      -- put on the queue.
      x@(!_c, !dstate') <- runPipelinedPeerReceiver driver dstate receiver
      atomically (writeTQueue collectQueue x)
      go dstate'


-- | Run a single 'PeerReceiver' to completion.
--
-- Starting from the given driver state, every 'ReceiverAwait' reads one
-- message with the driver's 'recvMessage' and continues with the state that
-- call returns. The result is the receiver's final value paired with the
-- last driver state.
--
runPipelinedPeerReceiver
  :: forall ps (st :: ps) (stdone :: ps) pr dstate m c.
     Monad m
  => Driver ps dstate m
  -> dstate
  -> PeerReceiver ps pr (st :: ps) (stdone :: ps) m c
  -> m (c, dstate)
runPipelinedPeerReceiver Driver{recvMessage} = go
  where
    go :: forall st' st''.
          dstate
       -> PeerReceiver ps pr st' st'' m c
       -> m (c, dstate)
    -- Run the monadic effect, then continue with the receiver it yields.
    go dstate (ReceiverEffect k) = k >>= go dstate

    -- Finished: hand back the value and the current driver state.
    go dstate (ReceiverDone x) = return (x, dstate)

    -- Read one message and feed it to the continuation.
    go dstate (ReceiverAwait stok k) = do
      (SomeMessage msg, dstate') <- recvMessage stok dstate
      go dstate' (k msg)