Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Marconi.Core.Coordinator
Description
Coordinating the work of a list of workers.
See Marconi.Core for documentation.
Synopsis
- data Coordinator input
- workers :: forall input. Lens' (Coordinator input) [Worker input (Point input)]
- threadIds :: forall input. Lens' (Coordinator input) [ThreadId]
- tokens :: forall input. Lens' (Coordinator input) QSemN
- channel :: forall input. Lens' (Coordinator input) (TChan (ProcessedInput (Point input) input))
- errorBox :: forall input. Lens' (Coordinator input) (MVar IndexerError)
- nbWorkers :: forall input. Lens' (Coordinator input) Int
- mkCoordinator :: Ord (Point input) => [Worker input (Point input)] -> IO (Coordinator input)
- step :: (Ord (Point input), IsIndex m input indexer, MonadError IndexerError m) => indexer input -> ProcessedInput (Point input) input -> m (indexer input)
- processQueue :: forall indexer event s r. (Ord (Point event), IsIndex (ExceptT IndexerError IO) event indexer, Closeable IO indexer) => (Timed (Point event) (Maybe event) -> State s (Maybe (Point event))) -> s -> TBQueue (ProcessedInput (Point event) event) -> MVar (indexer event) -> CloseSwitch -> IO r
- data CloseSwitch
Documentation
data Coordinator input #
A coordinator synchronises the event processing of a list of indexers. A coordinator is itself is an indexer. It means that we can create a tree of indexer, with coordinators that partially process the data at each node, and with concrete indexers at the leaves.
Instances
workers :: forall input. Lens' (Coordinator input) [Worker input (Point input)] #
threadIds :: forall input. Lens' (Coordinator input) [ThreadId] #
tokens :: forall input. Lens' (Coordinator input) QSemN #
channel :: forall input. Lens' (Coordinator input) (TChan (ProcessedInput (Point input) input)) #
errorBox :: forall input. Lens' (Coordinator input) (MVar IndexerError) #
nbWorkers :: forall input. Lens' (Coordinator input) Int #
mkCoordinator :: Ord (Point input) => [Worker input (Point input)] -> IO (Coordinator input) #
create a coordinator and starts its workers
step :: (Ord (Point input), IsIndex m input indexer, MonadError IndexerError m) => indexer input -> ProcessedInput (Point input) input -> m (indexer input) #
A coordinator step (send an input to its workers, wait for an ack of every worker before listening again)
Arguments
:: forall indexer event s r. (Ord (Point event), IsIndex (ExceptT IndexerError IO) event indexer, Closeable IO indexer) | |
=> (Timed (Point event) (Maybe event) -> State s (Maybe (Point event))) | emit stable point based on incoming information |
-> s | |
-> TBQueue (ProcessedInput (Point event) event) | |
-> MVar (indexer event) | |
-> CloseSwitch | |
-> IO r |
Read a queue of events, processing them synchronously on each worker
Note that this function silently throw an IndexError
if the event processing fails.
data CloseSwitch #
Should the indexer be closed after running its action?