marconi-core-1.2.0.0: Modular indexing for rewindable ledger
Safe Haskell: Safe-Inferred
Language: Haskell2010

Marconi.Core.Coordinator

Description

Coordinating the work of a list of workers.

See Marconi.Core for documentation.

Documentation

data Coordinator input

A coordinator synchronises the event processing of a list of indexers. A coordinator is itself an indexer. This means that we can build a tree of indexers, with coordinators that partially process the data at each node and concrete indexers at the leaves.
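The tree structure described above can be illustrated with a small, pure toy model. This is only a sketch: `ToyIndexer`, `toyIndex`, and `leaves` are hypothetical names invented for the illustration, events are plain `Int`s, and the "partial processing" at a node is modelled as a simple `Int -> Maybe Int` function. None of this is the library's actual `Coordinator` implementation.

```haskell
-- Toy model only: not the library's Coordinator or Worker types.
data ToyIndexer
  = ToyLeaf String [Int]
    -- ^ a concrete indexer: a name and the events stored so far (newest first)
  | ToyCoordinator (Int -> Maybe Int) [ToyIndexer]
    -- ^ a node: a preprocessing step and the indexers it feeds

-- Index one event. A coordinator node preprocesses the event and, if it
-- survives, forwards the result to every child.
toyIndex :: Int -> ToyIndexer -> ToyIndexer
toyIndex e (ToyLeaf name stored) = ToyLeaf name (e : stored)
toyIndex e (ToyCoordinator pre children) =
  case pre e of
    Nothing -> ToyCoordinator pre children
    Just e' -> ToyCoordinator pre (map (toyIndex e') children)

-- Collect what every leaf has stored, in arrival order.
leaves :: ToyIndexer -> [(String, [Int])]
leaves (ToyLeaf name stored) = [(name, reverse stored)]
leaves (ToyCoordinator _ children) = concatMap leaves children

-- A root node that forwards everything, with one leaf and one nested
-- node that only lets even events through.
exampleTree :: ToyIndexer
exampleTree =
  ToyCoordinator
    Just
    [ ToyLeaf "all" []
    , ToyCoordinator (\e -> if even e then Just e else Nothing) [ToyLeaf "evens" []]
    ]

main :: IO ()
main = print (leaves (foldl (flip toyIndex) exampleTree [1 .. 4]))
-- prints [("all",[1,2,3,4]),("evens",[2,4])]
```

Because a node has the same interface as a leaf, nodes can be nested to any depth, which is the property the description relies on.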

Instances

MonadIO m => Closeable m Coordinator
Instance details

Defined in Marconi.Core.Coordinator

Methods

close :: Coordinator event -> m ()

(MonadIO m, MonadError IndexerError m) => IsIndex m event Coordinator
Instance details

Defined in Marconi.Core.Coordinator

Methods

index :: Timed (Point event) (Maybe event) -> Coordinator event -> m (Coordinator event)

indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> Coordinator event -> m (Coordinator event)

indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> Coordinator event -> m (Coordinator event)

rollback :: Point event -> Coordinator event -> m (Coordinator event)

setLastStablePoint :: Point event -> Coordinator event -> m (Coordinator event)

(Ord (Point event), MonadIO m, MonadError IndexerError m) => IsSync m event Coordinator
Instance details

Defined in Marconi.Core.Coordinator

Methods

lastStablePoint :: Coordinator event -> m (Point event)

lastSyncPoint :: Coordinator event -> m (Point event)

workers :: forall input. Lens' (Coordinator input) [Worker input (Point input)]

threadIds :: forall input. Lens' (Coordinator input) [ThreadId]

tokens :: forall input. Lens' (Coordinator input) QSemN

channel :: forall input. Lens' (Coordinator input) (TChan (ProcessedInput (Point input) input))

errorBox :: forall input. Lens' (Coordinator input) (MVar IndexerError)

nbWorkers :: forall input. Lens' (Coordinator input) Int

mkCoordinator :: Ord (Point input) => [Worker input (Point input)] -> IO (Coordinator input)

Create a coordinator and start its workers.

step :: (Ord (Point input), IsIndex m input indexer, MonadError IndexerError m) => indexer input -> ProcessedInput (Point input) input -> m (indexer input)

A coordinator step: send an input to its workers, then wait for an acknowledgement from every worker before listening again.
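The send-then-wait-for-acks pattern can be sketched with the same kinds of primitives that the `channel` and `tokens` lenses expose (a `TChan` and a `QSemN`). The code below is an illustration under assumptions, not the library's implementation: `toyWorker`, `toyStep`, and `runToy` are hypothetical names, inputs are plain `Int`s, and the use of a broadcast channel with one duplicate per worker is this sketch's own choice.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN)
import Control.Concurrent.STM
  (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Monad (forM_, forever, replicateM)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Hypothetical worker: read each broadcast input, record it, then
-- acknowledge by releasing one token.
toyWorker :: TChan Int -> QSemN -> IORef [Int] -> IO ()
toyWorker chan tokens seen = forever $ do
  x <- atomically (readTChan chan)
  atomicModifyIORef' seen (\xs -> (x : xs, ()))
  signalQSemN tokens 1

-- One step: broadcast the input, then block until all n workers have
-- acknowledged it.
toyStep :: Int -> TChan Int -> QSemN -> Int -> IO ()
toyStep n chan tokens x = do
  atomically (writeTChan chan x)
  waitQSemN tokens n

-- Start three workers, run three steps, and return what each worker saw.
runToy :: IO [[Int]]
runToy = do
  let n = 3
  chan <- newBroadcastTChanIO
  tokens <- newQSemN 0
  logs <- replicateM n (newIORef [])
  forM_ logs $ \ref -> do
    own <- atomically (dupTChan chan) -- each worker reads its own copy
    forkIO (toyWorker own tokens ref)
  forM_ [1, 2, 3] (toyStep n chan tokens)
  mapM (fmap reverse . readIORef) logs

main :: IO ()
main = runToy >>= print
-- prints [[1,2,3],[1,2,3],[1,2,3]]
```

Since every step waits for all tokens before returning, no worker can fall more than one input behind the others, which is the synchronisation property the description refers to.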

processQueue

Arguments

:: forall indexer event s r. (Ord (Point event), IsIndex (ExceptT IndexerError IO) event indexer, Closeable IO indexer) 
=> (Timed (Point event) (Maybe event) -> State s (Maybe (Point event)))

Emits a stable point based on incoming information.

-> s 
-> TBQueue (ProcessedInput (Point event) event) 
-> MVar (indexer event) 
-> CloseSwitch 
-> IO r 

Read a queue of events, processing them synchronously on each worker.

Note that this function silently throws an IndexerError if the event processing fails.
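The first argument of `processQueue` is a stateful function that may emit a stable point for each incoming timed event. As an illustration of that shape only, the sketch below uses a hypothetical policy (a point becomes stable once `k` later points have been seen), a toy `ToyTimed` type standing in for the library's `Timed`, `Int` points, and `State` from the `transformers` package. The policy and all names are assumptions, not something the library prescribes.

```haskell
import Control.Monad.Trans.State.Strict (State, evalState, get, put)

-- Toy stand-in for the library's Timed type (an assumption, not the
-- real definition): a point paired with a payload.
data ToyTimed point event = ToyTimed point event

-- Hypothetical emitter: remember the most recent points and, once more
-- than k are held, report the oldest one as stable and forget it.
stableAfter :: Int -> ToyTimed Int (Maybe String) -> State [Int] (Maybe Int)
stableAfter k (ToyTimed p _) = do
  window <- get
  let window' = window ++ [p]
  if length window' > k
    then case window' of
      (oldest : rest) -> put rest >> pure (Just oldest)
      [] -> pure Nothing -- unreachable for k >= 0, kept for totality
    else put window' >> pure Nothing

-- Feed four points through the emitter with k = 2.
example :: [Maybe Int]
example = evalState (mapM (stableAfter 2) inputs) []
  where
    inputs = [ToyTimed p (Just "event") | p <- [1 .. 4]]

main :: IO ()
main = print example
-- prints [Nothing,Nothing,Just 1,Just 2]
```

The `s` argument that follows in the signature would correspond to the initial state, which is the empty list in this sketch.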

data CloseSwitch

Should the indexer be closed after running its action?

Constructors

CloseOn 
CloseOff
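A two-constructor switch like this is typically consumed by pattern matching on it after (or around) the action. The sketch below shows one way that could look. `ToySwitch` and `runWithSwitch` are hypothetical names, and running the close action through `finally` (so it also runs if the action throws) is this sketch's choice; the page does not say how the library behaves on failure.

```haskell
import Control.Exception (finally)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Toy stand-in mirroring the two constructors documented above.
data ToySwitch = ToyCloseOn | ToyCloseOff

-- Hypothetical helper: run an action, and run the close action
-- afterwards only when the switch asks for it.
runWithSwitch :: ToySwitch -> IO () -> IO a -> IO a
runWithSwitch ToyCloseOn closeAction action = action `finally` closeAction
runWithSwitch ToyCloseOff _ action = action

main :: IO ()
main = do
  closed <- newIORef (0 :: Int)
  let closeAction = modifyIORef closed (+ 1)
  _ <- runWithSwitch ToyCloseOff closeAction (pure "kept open")
  _ <- runWithSwitch ToyCloseOn closeAction (pure "closed")
  readIORef closed >>= print
-- prints 1
```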