Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Marconi.Core
Description
Marconi.Core re-exports most of the content of the sub-modules and, in most scenarios,
most of what you need to set up a chain-indexing solution.
Features
Marconi provides the following features out of the box:
- full in-memory indexers;
- indexers backed by a simple file;
- SQLite indexers;
- indexer transformers, which add capabilities (logging, caching...) to an indexer;
- groups of indexers, synchronised as a single indexer;
- mixed indexers, which allow a different type of storage for recent events and older ones.
Terminology
- An indexer is a data structure that can store any type of events.
- Each event is emitted at a given point in time.
- A query is a request that the indexer should be able to answer.
- An indexer instance corresponds to the use of an indexer for a specific type of events.
- Most of the time, an indexer instance requires the implementation of some typeclasses to specify its behaviour.
- An indexer transformer slightly alters the behaviour of an existing indexer. In general, an indexer transformer should add its logic to one of the typeclasses that specify the behaviour of an indexer.
- A coordinator is a specific kind of indexer that passes events to the set of indexers it coordinates.
- A worker is a wrapper around an indexer that hides the indexer type from a coordinator and handles the lifecycle of the indexer locally.
- A preprocessor is a stateful function that transforms events before they are sent to an indexer or to a worker.
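To make the notion of a "stateful function that transforms events" concrete, here is a minimal, pure sketch. It is not the library's Preprocessor type (which the synopsis defines in terms of ScanM and ProcessedInput); the names PreprocessorSketch, runPreprocessor and dropRepeats are hypothetical and only illustrate the idea of threading a state through a batch of events before they reach an indexer.

```haskell
import Data.List (mapAccumL)
import Data.Maybe (catMaybes)

-- | Hypothetical model of a preprocessor: a step function that threads a
-- state @s@ through the inputs and optionally emits an output for each one.
type PreprocessorSketch s a b = s -> a -> (s, Maybe b)

-- | Run the step function over a batch. Returns the final state and the
-- emitted outputs, in input order.
runPreprocessor :: PreprocessorSketch s a b -> s -> [a] -> (s, [b])
runPreprocessor step s0 inputs =
  let (s1, outputs) = mapAccumL step s0 inputs
   in (s1, catMaybes outputs)

-- | Example step: drop an input when it equals the previous one.
-- The state is the last input seen, if any.
dropRepeats :: Eq a => PreprocessorSketch (Maybe a) a a
dropRepeats lastSeen x
  | lastSeen == Just x = (lastSeen, Nothing)
  | otherwise = (Just x, Just x)
```

The state is returned alongside the outputs so that the next batch can resume where the previous one stopped, which is what distinguishes a preprocessor from a plain filter.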
Content
- Base type classes to define an indexer, its query interface, and the required plumbing to handle rollback.
- A full in-memory indexer (naive), a full SQLite indexer and an indexer that composes the in-memory indexer with a SQL layer for persistence.
- A coordinator for indexers, that can be exposed as an indexer itself.
- Some queries that can be applied to many indexers.
Several transformers for indexers:
- Aggregate, to maintain a fold value based on incoming events;
- Cache, to keep some query results in memory and avoid hitting the indexer;
- Catchup, to speed up the synchronisation of an indexer by doing batch insertions;
- Delay, to delay event processing for heavy computations;
- Pruning, to compact data that can't be rolled back;
- Tracing, as a modifier to an existing indexer (it lets us opt in to traces, indexer by indexer);
- Transform, to change the input type of an indexer.
Indexers can have different implementations (SQLite, in-memory...).
SQLite indexers are the most common ones. For specific scenarios, you may want to combine
them with a mixed indexer or to go for other types of indexers.
We may consider database backends other than SQLite in the future.
TODO List (non-exhaustive)
- Provide MonadState versions of the functions that manipulate the indexer.
- Add custom error handling at the worker and at the coordinator level, even if it's likely that the coordinator can't do much aside from distributing poison pills.
- Add connection pools for queries for the SQLite indexers.
How-to
Define an indexer instance
At this stage, in most cases, you want to define an SQLiteIndexer,
which will store the incoming events in an SQLite database.
Other types of indexers exist, and we detail some of them below as well.
Define an indexer instance for SQLiteIndexer
- Define a type for event (the input of your indexer). Once that is done, define the Point type instance for this event. Point tells you when the event was emitted: it can be a time, a slot number, a block, or any other information that tracks when an event happens. If you are following these steps to build a MixedIndexer, reuse the event type you declared for the ListIndexer.
- Define a query type and the corresponding Result type. If you are building a MixedIndexer, reuse the ones declared for the ListIndexer.
- A SQLiteIndexer requires a bit of type machinery to be able to index records. There are two cases: a simple one, where a single insert query is enough to index your event, and a more complex one, where several inserts are needed.
  In the simple case, you can use mkSingleInsertSqliteIndexer to create your indexer. It requires the definition of a param type, a data representation of the Timed that SQLite can process (it should have a ToRow instance). You also need to declare an InsertRecord type instance that is a list of this param.
  In the complex case, you need to define a custom InsertRecord type instance and your own insertion function. This case is not covered yet in this tutorial.
  You also need to enable rollback events; rollbackSQLiteIndexerWith can save you a bit of boilerplate.
- Then, for this query, define the Queryable instance. There is no helper for this one, but you probably want to query the database and aggregate the query result into your Result type.
Define an indexer instance for ListIndexer
- Define a type for event (the input of your indexer). Once that is done, define the Point type instance for this event. Point tells you when the event was emitted: it can be a time, a slot number, a block, or any other information that tracks when an event happens.
- That is already enough to index Timed values of the events you defined in the first step, and to proceed to rollbacks. You can already test it by creating an indexer with mkListIndexer.
- Define a query type and the corresponding Result type.
- Then, for this query, define the Queryable instance that corresponds to your indexer.
- The ListIndexer is ready.
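The steps above can be sketched on a self-contained toy model. Everything below is a simplified stand-in written for illustration: Point and Timed mirror the shapes given in the synopsis, but ToyListIndexer, indexToy, rollbackToy, Deposit, TotalDeposited and totalAt are hypothetical names, and the real ListIndexer goes through the IsIndex and Queryable typeclasses rather than plain functions.

```haskell
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TypeFamilies #-}

-- | Stand-in for the library's 'Point' type family.
type family Point event

-- | Stand-in for the library's 'Timed'.
data Timed point event = Timed point event
  deriving (Eq, Show)

-- Step 1: an event type and its Point instance (here, a slot number).
newtype Deposit = Deposit Int
  deriving (Eq, Show)

type instance Point Deposit = Word

-- Step 2: a toy list-backed indexer (hypothetical), most recent event first.
data ToyListIndexer event = ToyListIndexer
  { toyEvents :: [Timed (Point event) event]
  , toyLatestPoint :: Point event
  }

emptyToyIndexer :: Point event -> ToyListIndexer event
emptyToyIndexer = ToyListIndexer []

-- | Index one point; a point without event only moves the latest point.
indexToy
  :: Timed (Point event) (Maybe event) -> ToyListIndexer event -> ToyListIndexer event
indexToy (Timed p mEvt) (ToyListIndexer evts _) =
  ToyListIndexer (maybe evts (\e -> Timed p e : evts) mEvt) p

-- | Forget every event emitted after the given point.
rollbackToy
  :: Ord (Point event) => Point event -> ToyListIndexer event -> ToyListIndexer event
rollbackToy p (ToyListIndexer evts _) =
  ToyListIndexer (dropWhile (\(Timed q _) -> q > p) evts) p

-- Steps 3 and 4: a query type and a function answering it for this indexer.
data TotalDeposited = TotalDeposited

-- | Sum of the deposits emitted at or before the given point.
totalAt :: Point Deposit -> TotalDeposited -> ToyListIndexer Deposit -> Int
totalAt p TotalDeposited ix =
  sum [n | Timed q (Deposit n) <- toyEvents ix, q <= p]
```

In this sketch the query takes the point at which it is evaluated, in the same spirit as queryEither in the synopsis, so that the answer does not depend on events indexed after that point.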
Define an indexer instance for a MixedIndexer
Follow, in order, the steps for the creation of a ListIndexer (the in-memory part)
and the ones for a SQLiteIndexer (the on-disk part).
Once that is done, you need to implement AppendResult for the in-memory part.
Its purpose is to take the result of an on-disk query and to update it with the in-memory events.
You can choose different indexers for your in-memory part or on-disk part, but these choices are not documented yet.
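As an illustration of the role AppendResult plays, the sketch below combines a result computed by the on-disk part with the events still held in memory. It is a plain function, not an instance of the library's typeclass: appendInMemory and the use of pairs for timed events are assumptions made to keep the example self-contained.

```haskell
-- | Hypothetical stand-in for the idea behind 'AppendResult': refine a
-- result computed from the on-disk part with the events that are still in
-- memory. Here the query is "sum of all amounts up to a given point".
appendInMemory
  :: Ord point
  => point -- ^ point at which the query is evaluated
  -> [(point, Int)] -- ^ in-memory events, as (point, amount) pairs
  -> Int -- ^ result already computed by the on-disk part
  -> Int
appendInMemory upTo recent onDisk =
  onDisk + sum [amount | (p, amount) <- recent, p <= upTo]
```

The in-memory events emitted after the queried point are ignored, so the combined result stays consistent with what an indexer synchronised exactly up to that point would return.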
Write a new indexer
Most users probably don't want to do this.
A good reason is to add support for another backend (another database or another in-memory structure).
The minimal set of typeclasses that your indexer must implement (or allow users to implement) is:
- IsSync
- IsIndex
- Queryable
- Closeable (if you plan to use it in a worker, and you probably plan to)
- AppendResult (if you plan to use it as the in-memory part of a MixedIndexer)
Best practice is to implement as many event/query-agnostic
instances of the typeclasses of this module as possible for the new indexer.
When it is impossible to write a generic instance, think about how to reduce the
boilerplate with a helper function, and expose it.
Try to provide a smart constructor to hide most of the complexity of your indexer.
You probably also need to provide a helper to create workers for this indexer.
Synopsis
- type family Point event
- type family Result query
- data Timed point event = Timed point event
- point :: Lens' (Timed point event) point
- event :: Lens (Timed point a) (Timed point b) a b
- class HasGenesis t where
- genesis :: t
- class Monad m => IsIndex m event indexer where
- index :: Eq (Point event) => Timed (Point event) (Maybe event) -> indexer event -> m (indexer event)
- indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (indexer event)
- indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (indexer event)
- rollback :: Ord (Point event) => Point event -> indexer event -> m (indexer event)
- setLastStablePoint :: Ord (Point event) => Point event -> indexer event -> m (indexer event)
- indexEither :: (IsIndex (ExceptT err m) event indexer, Eq (Point event)) => Timed (Point event) (Maybe event) -> indexer event -> m (Either err (indexer event))
- indexAllEither :: (IsIndex (ExceptT err m) event indexer, Traversable f, Ord (Point event)) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (Either err (indexer event))
- indexAllDescendingEither :: (IsIndex (ExceptT err m) event indexer, Traversable f, Ord (Point event)) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (Either err (indexer event))
- class HasGenesis (Point event) => Resetable m event indexer where
- reset :: indexer event -> m (indexer event)
- resumeFrom :: MonadError IndexerError m => Resetable m event indexer => IsIndex m event indexer => IsSync m event indexer => HasGenesis (Point event) => Ord (Point event) => Point event -> Bool -> indexer event -> m (Point event, indexer event)
- class Functor m => IsSync m event indexer where
- lastStablePoint :: indexer event -> m (Point event)
- lastSyncPoint :: indexer event -> m (Point event)
- isAheadOfSync :: (Ord (Point event), IsSync m event indexer) => Point event -> indexer event -> m Bool
- class Closeable m indexer where
- close :: indexer event -> m ()
- class Queryable m event query indexer where
- queryEither :: Queryable (ExceptT (QueryError query) m) event query indexer => Ord (Point event) => Point event -> query -> indexer event -> m (Either (QueryError query) (Result query))
- queryLatestEither :: Queryable (ExceptT (QueryError query) m) event query indexer => IsSync m event indexer => Monad m => Ord (Point event) => query -> indexer event -> m (Either (QueryError query) (Result query))
- class AppendResult m event query indexer where
- data IndexerError
- = RollbackBehindHistory
- | IndexerInternalError Text
- | InvalidIndexer Text
- | StopIndexer (Maybe Text)
- | ResumingFailed Text
- | IndexerCloseTimeoutError
- | OtherIndexError Text
- _RollbackBehindHistory :: Prism' IndexerError ()
- _IndexerInternalError :: Prism' IndexerError Text
- _InvalidIndexer :: Prism' IndexerError Text
- _StopIndexer :: Prism' IndexerError (Maybe Text)
- _ResumingFailed :: Prism' IndexerError Text
- _IndexerCloseTimeoutError :: Prism' IndexerError ()
- _OtherIndexError :: Prism' IndexerError Text
- data QueryError query
- = AheadOfLastSync (Maybe (Result query))
- | NotStoredAnymore
- | IndexerQueryError Text
- | SlotNoBoundsInvalid Text
- _AheadOfLastSync :: forall query query. Prism (QueryError query) (QueryError query) (Maybe (Result query)) (Maybe (Result query))
- _NotStoredAnymore :: forall query. Prism' (QueryError query) ()
- _IndexerQueryError :: forall query. Prism' (QueryError query) Text
- _SlotNoBoundsInvalid :: forall query. Prism' (QueryError query) Text
- data ListIndexer event
- mkListIndexer :: HasGenesis (Point event) => ListIndexer event
- events :: forall event. Lens' (ListIndexer event) [Timed (Point event) event]
- latestPoint :: forall event. Lens' (ListIndexer event) (Point event)
- data SQLiteIndexer event = SQLiteIndexer SQLiteDBLocation Connection (Pool Connection) [[SQLInsertPlan event]] [SQLRollbackPlan (Point event)] SetLastStablePointQuery (Point event) (Point event)
- newtype GetLastStablePointQuery = GetLastStablePointQuery {
- getLastStablePointQuery :: Query
- newtype SetLastStablePointQuery = SetLastStablePointQuery {
- getSetLastStablePointQuery :: Query
- newtype InsertPointQuery = InsertPointQuery Query
- data SQLiteDBLocation
- mkSqliteIndexer :: forall event m. (MonadIO m, MonadError IndexerError m, HasGenesis (Point event), FromRow (Point event), ToRow (Point event), Ord (Point event)) => SQLiteDBLocation -> [Query] -> [[SQLInsertPlan event]] -> [SQLRollbackPlan (Point event)] -> SetLastStablePointQuery -> GetLastStablePointQuery -> m (SQLiteIndexer event)
- mkSingleInsertSqliteIndexer :: forall m event param. (MonadIO m, MonadError IndexerError m, HasGenesis (Point event), FromRow (Point event), ToRow (Point event), ToRow param, Ord (Point event)) => SQLiteDBLocation -> (Timed (Point event) event -> param) -> Query -> Query -> SQLRollbackPlan (Point event) -> SetLastStablePointQuery -> GetLastStablePointQuery -> m (SQLiteIndexer event)
- inMemoryDB :: SQLiteDBLocation
- parseDBLocation :: String -> SQLiteDBLocation
- writeConnection :: forall event. Lens' (SQLiteIndexer event) Connection
- readConnectionPool :: forall event. Lens' (SQLiteIndexer event) (Pool Connection)
- data SQLInsertPlan event = forall a.ToRow a => SQLInsertPlan {
- planExtractor :: Timed (Point event) event -> [a]
- planInsert :: Query
- data SQLRollbackPlan point = forall a.ToField a => SQLRollbackPlan {}
- class ToRow a where
- toRow :: a -> [SQLData]
- dbLastSync :: forall event. Lens' (SQLiteIndexer event) (Point event)
- querySQLiteIndexerWith :: (MonadIO m, MonadError (QueryError query) m, Ord (Point event), FromRow r) => (Point event -> query -> [NamedParam]) -> (query -> Query) -> (query -> [r] -> Result query) -> Point event -> query -> SQLiteIndexer event -> m (Result query)
- queryLatestSQLiteIndexerWith :: MonadIO m => FromRow r => (query -> [NamedParam]) -> (query -> Query) -> (query -> [r] -> Result query) -> query -> SQLiteIndexer event -> m (Result query)
- querySyncedOnlySQLiteIndexerWith :: MonadIO m => MonadError (QueryError query) m => Ord (Point event) => FromRow r => (Point event -> query -> [NamedParam]) -> (query -> Query) -> (query -> [r] -> Result query) -> Point event -> query -> SQLiteIndexer event -> m (Result query)
- handleSQLErrors :: IO a -> IO (Either IndexerError a)
- data SQLiteAggregateQuery m point event = SQLiteAggregateQuery [SQLiteSourceProvider m point] (Pool Connection)
- aggregateConnection :: Lens' (SQLiteAggregateQuery m point event) (Pool Connection)
- mkSQLiteAggregateQuery :: Map String (SQLiteSourceProvider m point) -> IO (SQLiteAggregateQuery m point event)
- withResource :: Pool a -> (a -> IO r) -> IO r
- data SQLiteSourceProvider m point = forall indexer event.(IsSourceProvider m event indexer, Point event ~ point) => SQLiteSourceProvider (MVar (indexer event))
- type IsSourceProvider m event indexer = (IsSync m event indexer, HasDatabasePath indexer)
- class HasDatabasePath indexer where
- getDatabasePath :: indexer event -> SQLiteDBLocation
- readOnlyConnection :: MonadIO io => SQLiteDBLocation -> io Connection
- readWriteConnection :: (MonadIO io, MonadError IndexerError io) => SQLiteDBLocation -> io Connection
- data FileIndexer meta event = FileIndexer FilePath (FileStorageConfig meta event) (FileBuilder meta event) (EventBuilder meta event) (Point event) (Point event) (Maybe AsyncWriteFileConfig)
- data FileStorageConfig meta event = FileStorageConfig Bool (Maybe (FileCleanup meta event)) (meta -> meta -> Ordering)
- data FileCleanup meta event
- withPartition :: (Timed (Point event) (Maybe event) -> [EventInfo meta] -> ([EventInfo meta], [EventInfo meta])) -> Maybe (FileCleanup meta event)
- data FileBuilder meta event = FileBuilder Text Text (Timed (Point event) (Maybe event) -> [Text]) (event -> Builder) (Point event -> Builder)
- data EventBuilder meta event = EventBuilder ([Text] -> Maybe meta) (meta -> Point event) (meta -> ByteString -> Either Text (Maybe event)) (ByteString -> Either Text (Point event))
- data EventInfo meta
- mkFileIndexer :: (MonadIO m, MonadError IndexerError m, Ord (Point event), HasGenesis (Point event)) => FilePath -> Maybe Int -> FileStorageConfig meta event -> FileBuilder meta event -> EventBuilder meta event -> m (FileIndexer meta event)
- data LastEventIndexer event = LastEventIndexer (LastEventConfig event) (LastEventState event)
- mkLastEventIndexer :: (HasGenesis (Point event), MonadIO m, MonadError IndexerError m) => LastEventConfig event -> m (LastEventIndexer event)
- data LastEventConfig event = LastEventConfig FilePath Bool (Point event -> ByteString) (ByteString -> Either Text (Point event)) (event -> ByteString) (ByteString -> Either Text event) Word
- data GetLastQuery a = GetLastQuery
- data MixedIndexer store mem event
- mkMixedIndexer :: Word -> Word -> store event -> mem event -> MixedIndexer store mem event
- standardMixedIndexer :: (IsSync m event store, HasGenesis (Point event), Ord (Point event), Monad m) => Word -> Word -> store event -> m (MixedIndexer store ListIndexer event)
- inMemory :: Lens' (MixedIndexer store mem event) (mem event)
- inDatabase :: Lens' (MixedIndexer store mem event) (store event)
- class HasMixedConfig indexer where
- flushEvery :: Lens' (indexer event) Word
- keepInMemory :: Lens' (indexer event) Word
- class Flushable m event indexer where
- data LastPointIndexer event
- lastPointIndexer :: HasGenesis (Point event) => LastPointIndexer event
- data WorkerM m input point = forall indexer event n.(WorkerIndexerType n event indexer, Point event ~ point) => Worker {
- workerName :: Text
- workerState :: MVar (indexer event)
- transformInput :: Preprocessor (ExceptT IndexerError m) point input event
- hoistError :: forall a. n a -> ExceptT IndexerError m a
- type Worker = WorkerM IO
- data WorkerIndexer m input event indexer = WorkerIndexer {
- workerIndexerVar :: !(MVar (indexer event))
- worker :: WorkerM m input (Point event)
- type WorkerIndexerType n event indexer = (IsIndex n event indexer, IsSync n event indexer, Closeable n indexer)
- startWorker :: forall input m. MonadIO m => Ord (Point input) => TChan (ProcessedInput (Point input) input) -> MVar IndexerError -> QSemN -> QSemN -> Worker input (Point input) -> m ThreadId
- createWorker :: (MonadIO f, WorkerIndexerType (ExceptT IndexerError m) event indexer) => Text -> (input -> Maybe event) -> indexer event -> f (WorkerIndexer m input event indexer)
- createWorkerHoist :: (MonadIO f, WorkerIndexerType n event indexer) => (forall a. n a -> ExceptT IndexerError m a) -> Text -> Preprocessor (ExceptT IndexerError m) (Point event) input event -> indexer event -> f (WorkerIndexer m input event indexer)
- createWorkerPure :: (MonadIO f, MonadIO m, WorkerIndexerType m event indexer) => Text -> Preprocessor (ExceptT IndexerError m) (Point event) input event -> indexer event -> f (WorkerIndexer m input event indexer)
- createWorkerWithPreprocessing :: (MonadIO f, WorkerIndexerType (ExceptT IndexerError m) event indexer) => Text -> Preprocessor (ExceptT IndexerError m) (Point event) input event -> indexer event -> f (WorkerIndexer m input event indexer)
- data ProcessedInput point event
- type Preprocessor m point a b = ScanM m [ProcessedInput point a] [ProcessedInput point b]
- mapEvent :: Monad m => (a -> b) -> Preprocessor m point a b
- mapMaybeEvent :: forall m point a b. Monad m => (a -> Maybe b) -> Preprocessor m point a b
- traverseEvent :: Monad m => (a -> m b) -> Preprocessor m point a b
- traverseMaybeEvent :: Monad m => (a -> m (Maybe b)) -> Preprocessor m point a b
- scanEvent :: forall m s point a b. Monad m => (a -> State s b) -> s -> Preprocessor m point a b
- scanEventM :: forall m s point a b. Monad m => (a -> StateT s m b) -> m s -> Preprocessor m point a b
- scanMaybeEvent :: forall m s point a b. Monad m => (a -> State s (Maybe b)) -> s -> Preprocessor m point a b
- scanMaybeEventM :: forall m s point a b. Monad m => (a -> StateT s m (Maybe b)) -> m s -> Preprocessor m point a b
- preprocessor :: Monad m => (ProcessedInput point a -> State s [ProcessedInput point b]) -> s -> Preprocessor m point a b
- preprocessorM :: Monad m => (ProcessedInput point a -> StateT s m [ProcessedInput point b]) -> m s -> Preprocessor m point a b
- type Resume m event = Preprocessor m (Point event) event event
- withResume :: (MonadError IndexerError m, Ord (Point event)) => Point event -> Resume m event
- data Coordinator input
- workers :: forall input. Lens' (Coordinator input) [Worker input (Point input)]
- tokens :: forall input. Lens' (Coordinator input) QSemN
- channel :: forall input. Lens' (Coordinator input) (TChan (ProcessedInput (Point input) input))
- nbWorkers :: forall input. Lens' (Coordinator input) Int
- mkCoordinator :: Ord (Point input) => [Worker input (Point input)] -> IO (Coordinator input)
- processQueue :: forall indexer event s r. (Ord (Point event), IsIndex (ExceptT IndexerError IO) event indexer, Closeable IO indexer) => (Timed (Point event) (Maybe event) -> State s (Maybe (Point event))) -> s -> TBQueue (ProcessedInput (Point event) event) -> MVar (indexer event) -> CloseSwitch -> IO r
- data CloseSwitch
- data EventAtQuery event = EventAtQuery
- newtype EventsFromQuery event = EventsFromQuery (Point event)
- newtype EventsMatchingQuery event = EventsMatchingQuery (event -> Maybe event)
- allEvents :: EventsMatchingQuery event
- calcStability :: Ord point => (event -> point) -> point -> event -> Stability event
- newtype LatestEventsQuery event = LatestEventsQuery Word
- latestEvent :: LatestEventsQuery event
- data Stability a
- isStable :: Stability a -> Bool
- queryErrorWithStability :: (Result (WithStability query) ~ f (Stability event), Result query ~ f event) => (f event -> f (Stability event)) -> QueryError query -> QueryError (WithStability query)
- newtype WithStability query = WithStability {
- unWithStability :: query
- withStabilityM :: (Result query ~ f event, Result (WithStability query) ~ f (Stability event), MonadError (QueryError (WithStability query)) m, Traversable f) => (event -> Stability event) -> ExceptT (QueryError query) m (f event) -> m (f (Stability event))
- withStability :: forall event m indexer f. (Monad m, Ord (Point event), IsSync m event indexer, Traversable f) => indexer event -> f (Timed (Point event) event) -> m (f (Stability (Timed (Point event) event)))
- withStabilityAt :: forall result m event indexer f. (Monad m, Ord (Point event), IsSync m event indexer, Applicative f) => indexer event -> Point event -> f result -> m (f (Stability result))
- class IndexerTrans t where
- unwrap :: Lens' (t indexer event) (indexer event)
- class IndexerMapTrans t where
- unwrapMap :: Lens' (t indexer output event) (indexer output)
- data IndexerEvent point
- data WithTrace m indexer event
- withTrace :: Applicative m => Trace m (IndexerEvent (Point event)) -> indexer event -> WithTrace m indexer event
- withTraceM :: MonadIO m => Trace m (IndexerEvent (Point event)) -> m (indexer event) -> m (WithTrace m indexer event)
- class HasTraceConfig m event indexer where
- trace :: Lens' (indexer event) (Trace m (IndexerEvent (Point event)))
- data WithTracer m indexer event
- withTracer :: Applicative m => Tracer m (IndexerEvent (Point event)) -> indexer event -> WithTracer m indexer event
- withTracerM :: Applicative m => Tracer m (IndexerEvent (Point event)) -> m (indexer event) -> m (WithTracer m indexer event)
- class HasTracerConfig m event indexer where
- tracer :: Lens' (indexer event) (Tracer m (IndexerEvent (Point event)))
- data WithCatchup indexer event
- withCatchup :: (Point event -> event -> Word64) -> CatchupConfig -> indexer event -> WithCatchup indexer event
- data CatchupConfig = CatchupConfig Word64 Word64 (Maybe (CatchupEvent -> IO ()))
- mkCatchupConfig :: Word64 -> Word64 -> CatchupConfig
- configCatchupEventHook :: Lens' CatchupConfig (Maybe (CatchupEvent -> IO ()))
- class HasCatchupConfig indexer where
- catchupBypassDistance :: Lens' (indexer event) Word64
- catchupBatchSize :: Lens' (indexer event) Word64
- catchupEventHook :: Lens' (indexer event) (Maybe (CatchupEvent -> IO ()))
- data CatchupEvent = Synced
- createIndexTable :: Text -> Trace IO Text -> Connection -> String -> Query -> IO ()
- data WithDelay indexer event
- withDelay :: Word -> indexer event -> WithDelay indexer event
- class HasDelayConfig indexer where
- delayCapacity :: Lens' (indexer event) Word
- data WithPruning indexer event
- withPruning :: Word -> Word -> indexer event -> WithPruning indexer event
- nextPruning :: Lens' (WithPruning indexer event) (Seq (Point event))
- stepsBeforeNext :: Lens' (WithPruning indexer event) Word
- currentDepth :: Lens' (WithPruning indexer event) Word
- class Prunable m event indexer where
- class HasPruningConfig indexer where
- securityParam :: Lens' (indexer event) Word
- pruneEvery :: Lens' (indexer event) Word
- newtype WithAction indexer event = WithAction {
- _withActionWrapper :: IndexTransformer WithActionConfig indexer event
- newtype WithActionConfig event = WithActionConfig {
- _withActionConfigAction :: Timed (Point event) event -> IO ()
- withActionConfigAction :: forall event event. Iso (WithActionConfig event) (WithActionConfig event) (Timed (Point event) event -> IO ()) (Timed (Point event) event -> IO ())
- withActionWrapper :: forall indexer event indexer event. Iso (WithAction indexer event) (WithAction indexer event) (IndexTransformer WithActionConfig indexer event) (IndexTransformer WithActionConfig indexer event)
- class Streamable a r where
- withStream :: Streamable a r => (Timed (Point event) event -> r) -> a -> indexer event -> WithAction indexer event
- data WithCache query indexer event
- withCache :: Ord query => (Timed (Point event) (Maybe event) -> Result query -> Result query) -> indexer event -> WithCache query indexer event
- addCacheFor :: forall query m event indexer. Queryable (ExceptT (QueryError query) m) event query indexer => IsSync (ExceptT (QueryError query) m) event indexer => HasCacheConfig query indexer => Monad m => MonadError IndexerError m => Ord query => Ord (Point event) => query -> indexer event -> m (indexer event)
- class HasCacheConfig query indexer where
- data WithTransform indexer output input
- withTransform :: (Point input -> Point output) -> (input -> Maybe output) -> indexer output -> WithTransform indexer output input
- class HasTransformConfig input output indexer where
- transformEvent :: Lens' (indexer input) (input -> Maybe output)
- data WithFold m indexer output input
- withFold :: output -> (indexer output -> m (Maybe output)) -> (output -> input -> m output) -> indexer output -> WithFold m indexer output input
- withFoldMap :: (Monoid output, Applicative m) => (indexer output -> m (Maybe output)) -> (input -> output) -> indexer output -> WithFold m indexer output input
- getLastEventAtQueryValue :: (IsSync m event indexer, HasGenesis (Point event), Queryable (ExceptT (QueryError (EventAtQuery a)) m) event (EventAtQuery a) indexer, Ord (Point event), MonadError IndexerError m) => indexer event -> m (Maybe a)
- class HasFold m input output indexer where
- fold :: Lens' (indexer input) (output -> input -> m output)
- data IndexTransformer config indexer event = IndexTransformer (config event) (indexer event)
- wrappedIndexer :: forall config indexer event indexer. Lens (IndexTransformer config indexer event) (IndexTransformer config indexer event) (indexer event) (indexer event)
- wrapperConfig :: forall config indexer event config. Lens (IndexTransformer config indexer event) (IndexTransformer config indexer event) (config event) (config event)
- pruneVia :: (Functor m, Prunable m event indexer, Ord (Point event)) => Lens' s (indexer event) -> Point event -> s -> m s
- pruningPointVia :: Prunable m event indexer => Getter s (indexer event) -> s -> m (Maybe (Point event))
- rollbackVia :: (IsIndex m event indexer, Ord (Point event)) => Lens' s (indexer event) -> Point event -> s -> m s
- resetVia :: (Functor m, Resetable m event indexer) => Lens' s (indexer event) -> s -> m s
- indexVia :: (IsIndex m event indexer, Eq (Point event)) => Lens' s (indexer event) -> Timed (Point event) (Maybe event) -> s -> m s
- indexAllDescendingVia :: (Eq (Point event), IsIndex m event indexer, Traversable f) => Lens' s (indexer event) -> f (Timed (Point event) (Maybe event)) -> s -> m s
- setLastStablePointVia :: (Ord (Point event), IsIndex m event indexer) => Lens' s (indexer event) -> Point event -> s -> m s
- lastStablePointVia :: IsSync m event indexer => Getter s (indexer event) -> s -> m (Point event)
- lastSyncPointVia :: IsSync m event indexer => Getter s (indexer event) -> s -> m (Point event)
- closeVia :: Closeable m indexer => Getter s (indexer event) -> s -> m ()
- queryVia :: (Queryable m event query indexer, Ord (Point event)) => Getter s (indexer event) -> Point event -> query -> s -> m (Result query)
- queryLatestVia :: (Queryable m event query indexer, Monad m, Ord (Point event), IsSync m event indexer) => Getter s (indexer event) -> query -> s -> m (Result query)
Core types and typeclasses
Core types
Marconi's indexers rely on three main concepts:
- event: the information we index;
- indexer: the component that stores the pieces of information from the events that are relevant to it;
- query: the definition of what can be asked of an indexer.
A point in time. The concrete type of a point is now derived from an indexer event, instead of an event. The reason is that you may not want to always carry around a point when you manipulate an event.
A Result is a type family indexed by a query descriptor: it gives the type of the answer to that query.
A query is tied to an indexer by a typeclass. This design choice has two main reasons:
- we want to be able to define different queries for the same indexer (e.g. we may want to define two distinct query types for a UTxO indexer: one to get all the UTxOs for a given address, another to get all the UTxOs emitted at a given slot);
- we want to be able to assign a query type to different indexers.
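Both reasons can be illustrated with a reduced model of the design. The class below is a deliberately simplified assumption: it drops the monad, the event and the point parameters of the real Queryable class, and QueryableSketch, runQuery, UtxoStore, AmountsBySlot, ByAddress and AtSlot are hypothetical names. Only the Result type family has the same shape as in the synopsis.

```haskell
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}

-- | Same shape as the library's 'Result': the answer type of a query.
type family Result query

-- | Simplified stand-in for 'Queryable': ties a query type to an indexer type.
class QueryableSketch query indexer where
  runQuery :: query -> indexer -> Result query

-- | A toy UTxO store holding (address, slot, amount) rows.
newtype UtxoStore = UtxoStore [(String, Word, Int)]

-- | A second toy indexer, storing the amounts grouped by slot.
newtype AmountsBySlot = AmountsBySlot [(Word, [Int])]

-- Two distinct query types.
newtype ByAddress = ByAddress String
newtype AtSlot = AtSlot Word

type instance Result ByAddress = [Int]
type instance Result AtSlot = [Int]

-- Reason 1: several queries for the same indexer.
instance QueryableSketch ByAddress UtxoStore where
  runQuery (ByAddress addr) (UtxoStore rows) =
    [amount | (a, _, amount) <- rows, a == addr]

instance QueryableSketch AtSlot UtxoStore where
  runQuery (AtSlot slot) (UtxoStore rows) =
    [amount | (_, s, amount) <- rows, s == slot]

-- Reason 2: the same query type answered by a different indexer.
instance QueryableSketch AtSlot AmountsBySlot where
  runQuery (AtSlot slot) (AmountsBySlot rows) =
    concat [amounts | (s, amounts) <- rows, s == slot]
```

Because the result type depends only on the query, code that consumes the answer to AtSlot does not need to know which of the two indexers produced it.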
Instances
type Result (GetLastQuery a) # | |
Defined in Marconi.Core.Indexer.LastEventIndexer | |
type Result (EventAtQuery event) # | The result of EventAtQuery is always an event. The error cases are handled by the query interface. |
Defined in Marconi.Core.Query | |
type Result (EventsFromQuery event) # | |
Defined in Marconi.Core.Query | |
type Result (EventsMatchingQuery event) # | The result of an |
Defined in Marconi.Core.Query | |
type Result (LatestEventsQuery event) # | |
Defined in Marconi.Core.Query | |
type Result (WithStability (EventsFromQuery event)) # | |
Defined in Marconi.Core.Query | |
type Result (WithStability (EventsMatchingQuery event)) # | |
Defined in Marconi.Core.Query |
Attach an event to a point in time
Constructors
Timed point event |
Instances
Foldable (Timed point) # | |
Defined in Marconi.Core.Type Methods fold :: Monoid m => Timed point m -> m Source # foldMap :: Monoid m => (a -> m) -> Timed point a -> m Source # foldMap' :: Monoid m => (a -> m) -> Timed point a -> m Source # foldr :: (a -> b -> b) -> b -> Timed point a -> b Source # foldr' :: (a -> b -> b) -> b -> Timed point a -> b Source # foldl :: (b -> a -> b) -> b -> Timed point a -> b Source # foldl' :: (b -> a -> b) -> b -> Timed point a -> b Source # foldr1 :: (a -> a -> a) -> Timed point a -> a Source # foldl1 :: (a -> a -> a) -> Timed point a -> a Source # toList :: Timed point a -> [a] Source # null :: Timed point a -> Bool Source # length :: Timed point a -> Int Source # elem :: Eq a => a -> Timed point a -> Bool Source # maximum :: Ord a => Timed point a -> a Source # minimum :: Ord a => Timed point a -> a Source # | |
Traversable (Timed point) # | |
Defined in Marconi.Core.Type Methods traverse :: Applicative f => (a -> f b) -> Timed point a -> f (Timed point b) Source # sequenceA :: Applicative f => Timed point (f a) -> f (Timed point a) Source # mapM :: Monad m => (a -> m b) -> Timed point a -> m (Timed point b) Source # sequence :: Monad m => Timed point (m a) -> m (Timed point a) Source # | |
Functor (Timed point) # | |
(FromJSON event, FromJSON point) => FromJSON (Timed point event) # | |
Defined in Marconi.Core.Type Methods parseJSON :: Value -> Parser (Timed point event) parseJSONList :: Value -> Parser [Timed point event] | |
(ToJSON event, ToJSON point) => ToJSON (Timed point event) # | |
Defined in Marconi.Core.Type Methods toJSON :: Timed point event -> Value toEncoding :: Timed point event -> Encoding toJSONList :: [Timed point event] -> Value toEncodingList :: [Timed point event] -> Encoding | |
Generic (Timed point event) # | |
(Show event, Show point) => Show (Timed point event) # | |
(Eq event, Eq point) => Eq (Timed point event) # | |
(Ord event, Ord point) => Ord (Timed point event) # | |
Defined in Marconi.Core.Type Methods compare :: Timed point event -> Timed point event -> Ordering Source # (<) :: Timed point event -> Timed point event -> Bool Source # (<=) :: Timed point event -> Timed point event -> Bool Source # (>) :: Timed point event -> Timed point event -> Bool Source # (>=) :: Timed point event -> Timed point event -> Bool Source # max :: Timed point event -> Timed point event -> Timed point event Source # min :: Timed point event -> Timed point event -> Timed point event Source # | |
type Rep (Timed point event) # | |
Defined in Marconi.Core.Type type Rep (Timed point event) = D1 ('MetaData "Timed" "Marconi.Core.Type" "marconi-core-1.2.0.0-inplace" 'False) (C1 ('MetaCons "Timed" 'PrefixI 'True) (S1 ('MetaSel ('Just "_point") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedStrict) (Rec0 point) :*: S1 ('MetaSel ('Just "_event") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedStrict) (Rec0 event))) |
event :: Lens (Timed point a) (Timed point b) a b #
A lens to get the event without its time information
Core typeclasses
class Monad m => IsIndex m event indexer where #
The base class of an indexer, providing its key functionalities: indexing events and handling rollbacks.
- indexer: the indexer implementation type
- event: the indexed events
- m: the monad in which our indexer operates
Rules:
- Rollback to the last indexed point does nothing: rollback p <=< index (Timed p evt) === index (Timed p evt);
- Rollback eliminates intermediate indexing (precondition: none of the points in events are equal to p): rollback p <=< indexAll events <=< index (Timed p evt) === index (Timed p evt);
- Rollback is idempotent: rollback p <=< rollback p === rollback p.
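The three rules can be checked on a toy model. The sketch below is not the Marconi implementation: ToyIndexer, toyIndex, toyIndexAll and toyRollback are hypothetical stand-ins that use Int points and only the base library, so that each law can be written as a Bool.

```haskell
import Control.Monad (foldM, (<=<))
import Data.Functor.Identity (Identity (..))

-- Toy stand-ins, not the Marconi types: points are plain Ints.
data Timed point event = Timed point event
  deriving (Eq, Show)

-- A toy list-backed indexer: newest event first, plus the last indexed point.
data ToyIndexer event = ToyIndexer [Timed Int event] Int
  deriving (Eq, Show)

emptyToy :: ToyIndexer event
emptyToy = ToyIndexer [] 0

-- Store the event (if any) and move the last indexed point to the given point.
toyIndex :: Monad m => Timed Int (Maybe event) -> ToyIndexer event -> m (ToyIndexer event)
toyIndex (Timed p mEvt) (ToyIndexer evts _) =
  pure $ ToyIndexer (maybe evts (\e -> Timed p e : evts) mEvt) p

-- Index several events given oldest first.
toyIndexAll :: Monad m => [Timed Int (Maybe event)] -> ToyIndexer event -> m (ToyIndexer event)
toyIndexAll timedEvts ix = foldM (flip toyIndex) ix timedEvts

-- Drop every event indexed strictly after the given point.
toyRollback :: Monad m => Int -> ToyIndexer event -> m (ToyIndexer event)
toyRollback p (ToyIndexer evts lastPoint) =
  pure $ ToyIndexer (dropWhile (\(Timed q _) -> q > p) evts) (min p lastPoint)

firstEvt :: Timed Int (Maybe String)
firstEvt = Timed 1 (Just "a")

laterEvts :: [Timed Int (Maybe String)]
laterEvts = [Timed 2 (Just "b"), Timed 3 Nothing, Timed 4 (Just "c")]

-- Rollback to the last indexed point does nothing.
law1 :: Bool
law1 =
  runIdentity ((toyRollback 1 <=< toyIndex firstEvt) emptyToy)
    == runIdentity (toyIndex firstEvt emptyToy)

-- Rollback eliminates intermediate indexing.
law2 :: Bool
law2 =
  runIdentity ((toyRollback 1 <=< toyIndexAll laterEvts <=< toyIndex firstEvt) emptyToy)
    == runIdentity (toyIndex firstEvt emptyToy)

-- Rollback is idempotent.
law3 :: Bool
law3 =
  runIdentity ((toyRollback 1 <=< toyRollback 1) emptyToy)
    == (runIdentity (toyRollback 1 emptyToy) :: ToyIndexer String)
```

Running in Identity keeps the sketch pure; a real indexer would typically run in a monad that can also report IndexerError.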
Minimal complete definition
Methods
index :: Eq (Point event) => Timed (Point event) (Maybe event) -> indexer event -> m (indexer event) #
Index an event at a given point in time.
indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (indexer event) #
Index a batch of events, each associated with its point in time, in an indexer.
The events must be sorted in ascending order (the oldest first).
indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (indexer event) #
Index a batch of events, each associated with its point in time, in an indexer.
The events must be sorted in descending order (the most recent first).
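To make the two ordering conventions concrete, here is a small sketch that takes ascending to mean oldest first and descending to mean most recent first. It assumes a toy store that keeps the newest event at its head; ToyTimed, toyIndexAll and toyIndexAllDescending are hypothetical names and do not reproduce the Marconi methods.

```haskell
import Data.List (sortOn)
import Data.Ord (Down (..))

-- Toy stand-in for a timed event: (point, payload). Not the Marconi Timed type.
type ToyTimed = (Int, String)

-- Ascending order: the oldest point comes first.
ascending :: [ToyTimed] -> [ToyTimed]
ascending = sortOn fst

-- Descending order: the most recent point comes first.
descending :: [ToyTimed] -> [ToyTimed]
descending = sortOn (Down . fst)

-- Consume an ascending batch into a store that keeps the newest event at its head.
toyIndexAll :: [ToyTimed] -> [ToyTimed] -> [ToyTimed]
toyIndexAll batch store = foldl (flip (:)) store batch

-- Consume a descending batch; the events are still applied oldest first.
toyIndexAllDescending :: [ToyTimed] -> [ToyTimed] -> [ToyTimed]
toyIndexAllDescending batch store = foldr (:) store batch
```

Under these assumptions both functions produce the same store when each receives the batch in the order it expects.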
rollback :: Ord (Point event) => Point event -> indexer event -> m (indexer event) #
Rollback to a previous point
setLastStablePoint :: Ord (Point event) => Point event -> indexer event -> m (indexer event) #
Set the last stable point known by the indexer. Note that the last stable point should always increase, so implementations should usually filter out lower stable points.
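One simple way to filter out lower stable points is to keep the maximum of the current and the candidate point. The sketch below assumes Int points and a hypothetical ToyStable state; it only illustrates the monotonicity requirement, not how any Marconi indexer stores its stable point.

```haskell
-- Toy state holding only the last stable point, modelled as an Int (an assumption).
newtype ToyStable = ToyStable {stablePoint :: Int}
  deriving (Eq, Show)

-- Keep the stable point monotonic: a candidate lower than the current point is ignored.
setToyStablePoint :: Int -> ToyStable -> ToyStable
setToyStablePoint candidate (ToyStable current) = ToyStable (max candidate current)
```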
Instances
(MonadIO m, MonadError IndexerError m) => IsIndex m a LastEventIndexer # | |
Defined in Marconi.Core.Indexer.LastEventIndexer Methods index :: Timed (Point a) (Maybe a) -> LastEventIndexer a -> m (LastEventIndexer a) # indexAll :: (Eq (Point a), Traversable f) => f (Timed (Point a) (Maybe a)) -> LastEventIndexer a -> m (LastEventIndexer a) # indexAllDescending :: (Eq (Point a), Traversable f) => f (Timed (Point a) (Maybe a)) -> LastEventIndexer a -> m (LastEventIndexer a) # rollback :: Point a -> LastEventIndexer a -> m (LastEventIndexer a) # setLastStablePoint :: Point a -> LastEventIndexer a -> m (LastEventIndexer a) # | |
(MonadIO m, MonadError IndexerError m) => IsIndex m event Coordinator # | |
Defined in Marconi.Core.Coordinator Methods index :: Timed (Point event) (Maybe event) -> Coordinator event -> m (Coordinator event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> Coordinator event -> m (Coordinator event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> Coordinator event -> m (Coordinator event) # rollback :: Point event -> Coordinator event -> m (Coordinator event) # setLastStablePoint :: Point event -> Coordinator event -> m (Coordinator event) # | |
(HasGenesis (Point event), Monad m) => IsIndex m event LastPointIndexer # | |
Defined in Marconi.Core.Indexer.LastPointIndexer Methods index :: Timed (Point event) (Maybe event) -> LastPointIndexer event -> m (LastPointIndexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> LastPointIndexer event -> m (LastPointIndexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> LastPointIndexer event -> m (LastPointIndexer event) # rollback :: Point event -> LastPointIndexer event -> m (LastPointIndexer event) # setLastStablePoint :: Point event -> LastPointIndexer event -> m (LastPointIndexer event) # | |
Monad m => IsIndex m event ListIndexer # | |
Defined in Marconi.Core.Indexer.ListIndexer Methods index :: Timed (Point event) (Maybe event) -> ListIndexer event -> m (ListIndexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> ListIndexer event -> m (ListIndexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> ListIndexer event -> m (ListIndexer event) # rollback :: Point event -> ListIndexer event -> m (ListIndexer event) # setLastStablePoint :: Point event -> ListIndexer event -> m (ListIndexer event) # | |
(MonadIO m, MonadError IndexerError m, ToRow (Point event)) => IsIndex m event SQLiteIndexer # | |
Defined in Marconi.Core.Indexer.SQLiteIndexer Methods index :: Timed (Point event) (Maybe event) -> SQLiteIndexer event -> m (SQLiteIndexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> SQLiteIndexer event -> m (SQLiteIndexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> SQLiteIndexer event -> m (SQLiteIndexer event) # rollback :: Point event -> SQLiteIndexer event -> m (SQLiteIndexer event) # setLastStablePoint :: Point event -> SQLiteIndexer event -> m (SQLiteIndexer event) # | |
(MonadIO io, MonadError IndexerError io) => IsIndex io event (FileIndexer meta) # | |
Defined in Marconi.Core.Indexer.FileIndexer Methods index :: Timed (Point event) (Maybe event) -> FileIndexer meta event -> io (FileIndexer meta event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> FileIndexer meta event -> io (FileIndexer meta event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> FileIndexer meta event -> io (FileIndexer meta event) # rollback :: Point event -> FileIndexer meta event -> io (FileIndexer meta event) # setLastStablePoint :: Point event -> FileIndexer meta event -> io (FileIndexer meta event) # | |
(MonadIO m, MonadError IndexerError m, IsIndex m event indexer) => IsIndex m event (WithAction indexer) # | |
Defined in Marconi.Core.Transformer.WithAction Methods index :: Timed (Point event) (Maybe event) -> WithAction indexer event -> m (WithAction indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithAction indexer event -> m (WithAction indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithAction indexer event -> m (WithAction indexer event) # rollback :: Point event -> WithAction indexer event -> m (WithAction indexer event) # setLastStablePoint :: Point event -> WithAction indexer event -> m (WithAction indexer event) # | |
(MonadIO m, IsIndex m event indexer, Ord (Point event)) => IsIndex m event (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods index :: Timed (Point event) (Maybe event) -> WithCatchup indexer event -> m (WithCatchup indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCatchup indexer event -> m (WithCatchup indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCatchup indexer event -> m (WithCatchup indexer event) # rollback :: Point event -> WithCatchup indexer event -> m (WithCatchup indexer event) # setLastStablePoint :: Point event -> WithCatchup indexer event -> m (WithCatchup indexer event) # | |
(Monad m, IsIndex m event indexer, Ord (Point event)) => IsIndex m event (WithDelay indexer) # | |
Defined in Marconi.Core.Transformer.WithDelay Methods index :: Timed (Point event) (Maybe event) -> WithDelay indexer event -> m (WithDelay indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithDelay indexer event -> m (WithDelay indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithDelay indexer event -> m (WithDelay indexer event) # rollback :: Point event -> WithDelay indexer event -> m (WithDelay indexer event) # setLastStablePoint :: Point event -> WithDelay indexer event -> m (WithDelay indexer event) # | |
(MonadError IndexerError m, Ord (Point event), Prunable m event indexer, IsIndex m event indexer, HasGenesis (Point event)) => IsIndex m event (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods index :: Timed (Point event) (Maybe event) -> WithPruning indexer event -> m (WithPruning indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithPruning indexer event -> m (WithPruning indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithPruning indexer event -> m (WithPruning indexer event) # rollback :: Point event -> WithPruning indexer event -> m (WithPruning indexer event) # setLastStablePoint :: Point event -> WithPruning indexer event -> m (WithPruning indexer event) # | |
(Ord (Point event), Flushable m event mem, Traversable (Container mem), IsIndex m event store, IsIndex m event mem, IsSync m event mem) => IsIndex m event (MixedIndexer store mem) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods index :: Timed (Point event) (Maybe event) -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # rollback :: Point event -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # setLastStablePoint :: Point event -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # | |
IsIndex m event indexer => IsIndex m event (IndexTransformer config indexer) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods index :: Timed (Point event) (Maybe event) -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # rollback :: Point event -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # setLastStablePoint :: Point event -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # | |
(Applicative m, IsIndex m event index, Queryable m event query index) => IsIndex m event (WithCache query index) # | This instance updates all the cached queries with the incoming event and then passes this event to the underlying indexer. |
Defined in Marconi.Core.Transformer.WithCache Methods index :: Timed (Point event) (Maybe event) -> WithCache query index event -> m (WithCache query index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCache query index event -> m (WithCache query index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCache query index event -> m (WithCache query index event) # rollback :: Point event -> WithCache query index event -> m (WithCache query index event) # setLastStablePoint :: Point event -> WithCache query index event -> m (WithCache query index event) # | |
(MonadIO m, MonadError IndexerError m, IsIndex m event index) => IsIndex m event (WithTrace m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTrace m index event -> m (WithTrace m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> m (WithTrace m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> m (WithTrace m index event) # rollback :: Point event -> WithTrace m index event -> m (WithTrace m index event) # setLastStablePoint :: Point event -> WithTrace m index event -> m (WithTrace m index event) # | |
(Applicative m, IsIndex m event index) => IsIndex m event (WithTracer m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTracer m index event -> m (WithTracer m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> m (WithTracer m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> m (WithTracer m index event) # rollback :: Point event -> WithTracer m index event -> m (WithTracer m index event) # setLastStablePoint :: Point event -> WithTracer m index event -> m (WithTracer m index event) # | |
(Point output ~ Point input, IsIndex m output indexer, Ord (Point output)) => IsIndex m input (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods index :: Timed (Point input) (Maybe input) -> WithTransform indexer output input -> m (WithTransform indexer output input) # indexAll :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithTransform indexer output input -> m (WithTransform indexer output input) # indexAllDescending :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithTransform indexer output input -> m (WithTransform indexer output input) # rollback :: Point input -> WithTransform indexer output input -> m (WithTransform indexer output input) # setLastStablePoint :: Point input -> WithTransform indexer output input -> m (WithTransform indexer output input) # | |
(HasGenesis (Point output), Point input ~ Point output, IsSync m output indexer, IsIndex m output indexer, Ord (Point output)) => IsIndex m input (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold Methods index :: Timed (Point input) (Maybe input) -> WithFold m indexer output input -> m (WithFold m indexer output input) # indexAll :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithFold m indexer output input -> m (WithFold m indexer output input) # indexAllDescending :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithFold m indexer output input -> m (WithFold m indexer output input) # rollback :: Point input -> WithFold m indexer output input -> m (WithFold m indexer output input) # setLastStablePoint :: Point input -> WithFold m indexer output input -> m (WithFold m indexer output input) # | |
(MonadTrans t, MonadIO m, MonadIO (t m), MonadError IndexerError (t m), IsIndex (t m) event index) => IsIndex (t m) event (WithTrace m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTrace m index event -> t m (WithTrace m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> t m (WithTrace m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> t m (WithTrace m index event) # rollback :: Point event -> WithTrace m index event -> t m (WithTrace m index event) # setLastStablePoint :: Point event -> WithTrace m index event -> t m (WithTrace m index event) # | |
(MonadTrans t, Monad m, Monad (t m), IsIndex (t m) event index) => IsIndex (t m) event (WithTracer m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTracer m index event -> t m (WithTracer m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> t m (WithTracer m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> t m (WithTracer m index event) # rollback :: Point event -> WithTracer m index event -> t m (WithTracer m index event) # setLastStablePoint :: Point event -> WithTracer m index event -> t m (WithTracer m index event) # |
indexEither :: (IsIndex (ExceptT err m) event indexer, Eq (Point event)) => Timed (Point event) (Maybe event) -> indexer event -> m (Either err (indexer event)) #
Like index, but internalise its error in the result.
It's useful when you would rather handle the error explicitly than carry it in the monad stack. It's often used when we target IO, as we don't want to mess with IOException.
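The general pattern behind an Either variant can be sketched with ExceptT from the transformers package: the indexing step runs in ExceptT, and the wrapper calls runExceptT so that the caller receives an Either. ToyError, toyIndex and toyIndexEither are hypothetical names for illustration, and the toy indexer state is just the last point seen.

```haskell
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)

-- Hypothetical error type for this sketch; Marconi uses IndexerError.
newtype ToyError = ToyError String
  deriving (Eq, Show)

-- A toy indexing step in ExceptT: it returns the new last point (an Int here)
-- and rejects points that do not move forward.
toyIndex :: Monad m => Int -> Int -> ExceptT ToyError m Int
toyIndex point lastPoint
  | point > lastPoint = pure point
  | otherwise = throwE (ToyError ("point " <> show point <> " is not after " <> show lastPoint))

-- The Either variant: run the ExceptT layer so that the error shows up in the result.
toyIndexEither :: Monad m => Int -> Int -> m (Either ToyError Int)
toyIndexEither point = runExceptT . toyIndex point
```

Used from IO, toyIndexEither 3 5 yields a Left value that can be pattern matched, with no exception involved.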
indexAllEither :: (IsIndex (ExceptT err m) event indexer, Traversable f, Ord (Point event)) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (Either err (indexer event)) #
Like indexAll, but internalise the error in the result.
indexAllDescendingEither :: (IsIndex (ExceptT err m) event indexer, Traversable f, Ord (Point event)) => f (Timed (Point event) (Maybe event)) -> indexer event -> m (Either err (indexer event)) #
Like indexAllDescending, but internalise the error in the result.
class HasGenesis (Point event) => Resetable m event indexer where #
We can reset an indexer, clearing all its content
- indexer: the indexer implementation type
- event: the indexer events
- m: the monad in which our indexer operates
Instances
(HasGenesis (Point event), Applicative m) => Resetable m event ListIndexer # | |
Defined in Marconi.Core.Indexer.ListIndexer Methods reset :: ListIndexer event -> m (ListIndexer event) # | |
(Applicative m, Resetable m event indexer) => Resetable m event (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods reset :: WithCatchup indexer event -> m (WithCatchup indexer event) # | |
(Applicative m, Resetable m event indexer) => Resetable m event (WithDelay indexer) # | |
Defined in Marconi.Core.Transformer.WithDelay | |
(Monad m, Prunable m event indexer, Resetable m event indexer, HasGenesis (Point event), Ord (Point event)) => Resetable m event (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods reset :: WithPruning indexer event -> m (WithPruning indexer event) # | |
(Monad m, Resetable m event index, HasGenesis (Point event), Ord (Point event), Queryable m event query index) => Resetable m event (WithCache query index) # | Rollback the underlying indexer, clear the cache, repopulate it with queries to the underlying indexer. |
Defined in Marconi.Core.Transformer.WithCache | |
(HasGenesis (Point event), Functor m, Resetable m event indexer) => Resetable m event (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
(HasGenesis (Point event), Functor m, Resetable m event indexer) => Resetable m event (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods reset :: WithTracer m indexer event -> m (WithTracer m indexer event) # | |
(Functor m, Resetable m output indexer, HasGenesis (Point event)) => Resetable m event (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods reset :: WithTransform indexer output event -> m (WithTransform indexer output event) # | |
(Functor m, Resetable m output indexer, HasGenesis (Point event)) => Resetable m event (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold | |
(MonadTrans t, Monad m, Monad (t m), HasGenesis (Point event), Resetable (t m) event indexer) => Resetable (t m) event (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
(MonadTrans t, Monad m, Monad (t m), HasGenesis (Point event), Resetable (t m) event indexer) => Resetable (t m) event (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods reset :: WithTracer m indexer event -> t m (WithTracer m indexer event) # |
Arguments
:: MonadError IndexerError m | |
=> Resetable m event indexer | |
=> IsIndex m event indexer | |
=> IsSync m event indexer | |
=> HasGenesis (Point event) | |
=> Ord (Point event) | |
=> Point event | expected resume point |
-> Bool | do we allow reset? |
-> indexer event | the indexer to resume |
-> m (Point event, indexer event) | the indexer, back at the given point |
Try to roll back to a given point to resume the indexer.
If we can't resume from there, either we allow reset and restart the indexer from genesis, or we don't and throw an error.
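The decision described above can be sketched as a pure function. This is a simplified model and not the Marconi implementation: points are Ints, genesis is assumed to be 0, the toy indexer only remembers the points it has indexed, and toyResume and ToyResumingFailed are hypothetical names.

```haskell
-- Toy stand-in: the indexer only remembers the points it has indexed, newest first.
newtype ToyIndexer = ToyIndexer [Int]
  deriving (Eq, Show)

-- Hypothetical error, loosely modelled on the ResumingFailed constructor.
newtype ToyError = ToyResumingFailed String
  deriving (Eq, Show)

-- Assumption for this sketch: genesis is point 0.
genesis :: Int
genesis = 0

-- Try to resume at the expected point; if it is unknown, either reset or fail.
toyResume :: Int -> Bool -> ToyIndexer -> Either ToyError (Int, ToyIndexer)
toyResume expected allowReset (ToyIndexer pts)
  | expected == genesis || expected `elem` pts =
      Right (expected, ToyIndexer (dropWhile (> expected) pts))
  | allowReset = Right (genesis, ToyIndexer [])
  | otherwise = Left (ToyResumingFailed ("cannot resume from point " <> show expected))
```

The returned point tells the caller where indexing actually restarts, which is genesis when a reset took place.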
class Functor m => IsSync m event indexer where #
We know how far an indexer has progressed in indexing events.
Methods
lastStablePoint :: indexer event -> m (Point event) #
Get the latest stable point known by the indexer
lastSyncPoint :: indexer event -> m (Point event) #
Last sync point of the indexer.
Instances
isAheadOfSync :: (Ord (Point event), IsSync m event indexer) => Point event -> indexer event -> m Bool #
Check if the given point is ahead of the last sync point of an indexer.
class Closeable m indexer where #
We know how to close an indexer
Instances
class Queryable m event query indexer where #
The indexer can answer a Query to produce the corresponding result of that query.
- indexer: the indexer implementation type
- event: the indexer events
- query: the type of query we want to answer
- m: the monad in which our indexer operates
Minimal complete definition
Methods
query :: Ord (Point event) => Point event -> query -> indexer event -> m (Result query) #
Query an indexer at a given point in time. It can be read as: "With the knowledge you have at that point in time, what is your answer to this query?"
queryLatest :: (IsSync m event indexer, Monad m, Ord (Point event)) => query -> indexer event -> m (Result query) #
Like query, but use the latest point of the indexer instead of a provided one.
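The idea of answering a query "with the knowledge you have at that point in time" can be sketched on a toy indexer. Everything below is a hypothetical stand-in (Int points, String events, and a single ToyAheadOfLastSync error); it does not use the Queryable class itself.

```haskell
-- Toy stand-in, not the Marconi types: points are Ints and events are Strings.
data ToyIndexer = ToyIndexer
  { toyEvents :: [(Int, String)] -- newest first
  , toyLastSync :: Int
  }

-- Mirrors the idea behind AheadOfLastSync: the indexer has not reached that point yet.
data ToyQueryError = ToyAheadOfLastSync
  deriving (Eq, Show)

-- Answer with the events known at the given point, or fail if the point is ahead.
toyQuery :: Int -> ToyIndexer -> Either ToyQueryError [String]
toyQuery point (ToyIndexer evts lastSync)
  | point > lastSync = Left ToyAheadOfLastSync
  | otherwise = Right [evt | (p, evt) <- evts, p <= point]

-- The same query, asked at the indexer's own last sync point.
toyQueryLatest :: ToyIndexer -> Either ToyQueryError [String]
toyQueryLatest ix = toyQuery (toyLastSync ix) ix
```

Because toyQueryLatest reads the point from the indexer itself, it can never ask about a point the indexer has not reached.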
Instances
Queryable m event query indexer => Queryable m event query (WithAction indexer) # | |
Defined in Marconi.Core.Transformer.WithAction Methods query :: Point event -> query -> WithAction indexer event -> m (Result query) # queryLatest :: query -> WithAction indexer event -> m (Result query) # | |
Queryable m event query indexer => Queryable m event query (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods query :: Point event -> query -> WithCatchup indexer event -> m (Result query) # queryLatest :: query -> WithCatchup indexer event -> m (Result query) # | |
Queryable m event query indexer => Queryable m event query (WithDelay indexer) # | |
Defined in Marconi.Core.Transformer.WithDelay | |
Queryable m event query indexer => Queryable m event query (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods query :: Point event -> query -> WithPruning indexer event -> m (Result query) # queryLatest :: query -> WithPruning indexer event -> m (Result query) # | |
(AppendResult m event query ListIndexer, Queryable m event query store) => Queryable m event query (MixedIndexer store ListIndexer) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods query :: Point event -> query -> MixedIndexer store ListIndexer event -> m (Result query) # queryLatest :: query -> MixedIndexer store ListIndexer event -> m (Result query) # | |
Queryable m event query indexer => Queryable m event query (IndexTransformer config indexer) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods query :: Point event -> query -> IndexTransformer config indexer event -> m (Result query) # queryLatest :: query -> IndexTransformer config indexer event -> m (Result query) # | |
(Ord query, Ord (Point event), IsSync m event index, MonadError (QueryError query) m, Monad m, Queryable m event query index) => Queryable m event query (WithCache query index) # | |
Defined in Marconi.Core.Transformer.WithCache | |
(Queryable m event query indexer, MonadIO m, MonadError (QueryError e) m) => Queryable m event query (WithTrace IO indexer) # | |
Queryable m event query indexer => Queryable m event query (WithTracer n indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods query :: Point event -> query -> WithTracer n indexer event -> m (Result query) # queryLatest :: query -> WithTracer n indexer event -> m (Result query) # | |
(Queryable m output query indexer, Point output ~ Point input) => Queryable m input query (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods query :: Point input -> query -> WithTransform indexer output input -> m (Result query) # queryLatest :: query -> WithTransform indexer output input -> m (Result query) # | |
(Queryable m output query indexer, Point input ~ Point output) => Queryable m input query (WithFold n indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold | |
(MonadIO m, MonadError (QueryError (GetLastQuery event)) m) => Queryable m event (GetLastQuery event) LastEventIndexer # | |
Defined in Marconi.Core.Indexer.LastEventIndexer Methods query :: Point event -> GetLastQuery event -> LastEventIndexer event -> m (Result (GetLastQuery event)) # queryLatest :: GetLastQuery event -> LastEventIndexer event -> m (Result (GetLastQuery event)) # | |
MonadError (QueryError (EventAtQuery event)) m => Queryable m event (EventAtQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods query :: Point event -> EventAtQuery event -> ListIndexer event -> m (Result (EventAtQuery event)) # queryLatest :: EventAtQuery event -> ListIndexer event -> m (Result (EventAtQuery event)) # | |
MonadError (QueryError (EventsFromQuery event)) m => Queryable m event (EventsFromQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods query :: Point event -> EventsFromQuery event -> ListIndexer event -> m (Result (EventsFromQuery event)) # queryLatest :: EventsFromQuery event -> ListIndexer event -> m (Result (EventsFromQuery event)) # | |
MonadError (QueryError (EventsMatchingQuery event)) m => Queryable m event (EventsMatchingQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods query :: Point event -> EventsMatchingQuery event -> ListIndexer event -> m (Result (EventsMatchingQuery event)) # queryLatest :: EventsMatchingQuery event -> ListIndexer event -> m (Result (EventsMatchingQuery event)) # | |
MonadError (QueryError (LatestEventsQuery event)) m => Queryable m event (LatestEventsQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods query :: Point event -> LatestEventsQuery event -> ListIndexer event -> m (Result (LatestEventsQuery event)) # queryLatest :: LatestEventsQuery event -> ListIndexer event -> m (Result (LatestEventsQuery event)) # | |
(MonadIO m, MonadError (QueryError (EventAtQuery event)) m) => Queryable m event (EventAtQuery event) (FileIndexer meta) # | |
Defined in Marconi.Core.Query Methods query :: Point event -> EventAtQuery event -> FileIndexer meta event -> m (Result (EventAtQuery event)) # queryLatest :: EventAtQuery event -> FileIndexer meta event -> m (Result (EventAtQuery event)) # | |
(MonadIO m, MonadError (QueryError (EventsFromQuery event)) m) => Queryable m event (EventsFromQuery event) (FileIndexer meta) # | |
Defined in Marconi.Core.Query Methods query :: Point event -> EventsFromQuery event -> FileIndexer meta event -> m (Result (EventsFromQuery event)) # queryLatest :: EventsFromQuery event -> FileIndexer meta event -> m (Result (EventsFromQuery event)) # | |
(MonadIO m, MonadError (QueryError (LatestEventsQuery event)) m) => Queryable m event (LatestEventsQuery event) (FileIndexer meta) # | |
Defined in Marconi.Core.Query Methods query :: Point event -> LatestEventsQuery event -> FileIndexer meta event -> m (Result (LatestEventsQuery event)) # queryLatest :: LatestEventsQuery event -> FileIndexer meta event -> m (Result (LatestEventsQuery event)) # |
queryEither :: Queryable (ExceptT (QueryError query) m) event query indexer => Ord (Point event) => Point event -> query -> indexer event -> m (Either (QueryError query) (Result query)) #
Like query, but internalise QueryError in the result.
queryLatestEither :: Queryable (ExceptT (QueryError query) m) event query indexer => IsSync m event indexer => Monad m => Ord (Point event) => query -> indexer event -> m (Either (QueryError query) (Result query)) #
Like queryEither, but use the latest point of the indexer instead of a provided one.
class AppendResult m event query indexer where #
The indexer can take a result and complete it with its events
Methods
appendResult :: Ord (Point event) => Point event -> query -> indexer event -> m (Result query) -> m (Result query) #
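The Queryable instance for MixedIndexer listed earlier requires AppendResult for its in-memory part, which suggests the following usage: query the persistent store first, then let the in-memory indexer complete that result with its own events. The sketch below models this with plain lists; all names are hypothetical and the result type is simply the list of events known at a point.

```haskell
-- Toy stand-in: a query result is the list of events known at a point, newest first.
type ToyResult = [String]

-- Older events, as a persistent store might return them.
toyStoreQuery :: Int -> [(Int, String)] -> ToyResult
toyStoreQuery point stored = [evt | (p, evt) <- stored, p <= point]

-- Complete an existing result with the matching events held in memory.
toyAppendResult :: Int -> [(Int, String)] -> ToyResult -> ToyResult
toyAppendResult point inMemory partial =
  [evt | (p, evt) <- inMemory, p <= point] <> partial

-- Query the store first, then let the in-memory part complete the answer.
toyMixedQuery :: Int -> [(Int, String)] -> [(Int, String)] -> ToyResult
toyMixedQuery point inMemory stored =
  toyAppendResult point inMemory (toyStoreQuery point stored)
```

In this model the in-memory events are more recent than the stored ones, so they are placed in front of the partial result.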
Instances
MonadError (QueryError (EventAtQuery event)) m => AppendResult m event (EventAtQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods appendResult :: Point event -> EventAtQuery event -> ListIndexer event -> m (Result (EventAtQuery event)) -> m (Result (EventAtQuery event)) # | |
MonadError (QueryError (EventsFromQuery event)) m => AppendResult m event (EventsFromQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods appendResult :: Point event -> EventsFromQuery event -> ListIndexer event -> m (Result (EventsFromQuery event)) -> m (Result (EventsFromQuery event)) # | |
MonadError (QueryError (EventsMatchingQuery event)) m => AppendResult m event (EventsMatchingQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods appendResult :: Point event -> EventsMatchingQuery event -> ListIndexer event -> m (Result (EventsMatchingQuery event)) -> m (Result (EventsMatchingQuery event)) # |
Errors
data IndexerError #
Error that can occur when you index events
Constructors
RollbackBehindHistory | The indexer doesn't have access to the history at the requested point |
IndexerInternalError Text | The indexer did not respond |
InvalidIndexer Text | The indexer is in an invalid state and can't recover |
StopIndexer (Maybe Text) | Indexer has to stop as requested by the given worker |
ResumingFailed Text | The indexer failed at resuming (likely due to a bug) |
IndexerCloseTimeoutError | The indexer timed out while attempting to write a file before closing |
OtherIndexError Text | Any other cause of failure |
Instances
Exception IndexerError # | |
Defined in Marconi.Core.Type Methods toException :: IndexerError -> SomeException Source # fromException :: SomeException -> Maybe IndexerError Source # | |
Show IndexerError # | |
Defined in Marconi.Core.Type | |
Eq IndexerError # | |
Defined in Marconi.Core.Type Methods (==) :: IndexerError -> IndexerError -> Bool Source # (/=) :: IndexerError -> IndexerError -> Bool Source # |
_RollbackBehindHistory :: Prism' IndexerError () #
_IndexerInternalError :: Prism' IndexerError Text #
_InvalidIndexer :: Prism' IndexerError Text #
_StopIndexer :: Prism' IndexerError (Maybe Text) #
_ResumingFailed :: Prism' IndexerError Text #
_IndexerCloseTimeoutError :: Prism' IndexerError () #
_OtherIndexError :: Prism' IndexerError Text #
data QueryError query #
Errors that can occur when you query an indexer
Constructors
AheadOfLastSync (Maybe (Result query)) | The requested point is ahead of the current index. The error may still provide its latest result if it makes sense for the given query. It can be useful for an indexer that contains partial knowledge and wants to pass this knowledge to another indexer to complete the query. |
NotStoredAnymore | The requested point is too far in the past and has been pruned |
IndexerQueryError Text | The indexer query failed |
SlotNoBoundsInvalid Text | Upper or lower SlotNo bounds provided in the query are not consistent. For example, the requested point is too early to answer the query completely. |
Instances
(Typeable query, Show (Result query)) => Exception (QueryError query) # | |
Defined in Marconi.Core.Type Methods toException :: QueryError query -> SomeException Source # fromException :: SomeException -> Maybe (QueryError query) Source # displayException :: QueryError query -> String Source # | |
Show (Result query) => Show (QueryError query) # | |
Defined in Marconi.Core.Type |
_AheadOfLastSync :: forall query query. Prism (QueryError query) (QueryError query) (Maybe (Result query)) (Maybe (Result query)) #
_NotStoredAnymore :: forall query. Prism' (QueryError query) () #
_IndexerQueryError :: forall query. Prism' (QueryError query) Text #
_SlotNoBoundsInvalid :: forall query. Prism' (QueryError query) Text #
Core Indexers
A bunch of indexers that should cover the general need.
- ListIndexer to store events in memory.
- SQLiteIndexer to store events in a SQLite database.
- MixedIndexer to store recent events in an indexer and older events in another one.
In memory
A full in-memory indexer, backed by a simple list of events. Most of the logic for this indexer is generic.
To create an indexer instance that uses a list indexer, you have to define:
- the type of event you want to store
- the type(s) of query your indexer instance must handle
- the Queryable interface for these queries.
data ListIndexer event #
The constructor is not exposed, use mkListIndexer instead.
Instances
mkListIndexer :: HasGenesis (Point event) => ListIndexer event #
A smart constructor for a list indexer, starting at genesis with an empty list.
events :: forall event. Lens' (ListIndexer event) [Timed (Point event) event] #
latestPoint :: forall event. Lens' (ListIndexer event) (Point event) #
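To illustrate the idea of a list-backed indexer, here is a minimal sketch that does not use the library itself. It assumes that points are plain Int values with 0 as genesis, and that events are stored most recent first; ListIndexerSketch, emptySketch, indexSketch and rollbackSketch are hypothetical names introduced for the example.

```haskell
-- An event tagged with the point at which it was emitted.
data Timed point event = Timed
  { timedPoint :: point
  , timedEvent :: event
  }
  deriving (Show, Eq)

-- Simplified stand-in for a list indexer.
-- Assumptions: points are Ints, and events are kept most recent first.
data ListIndexerSketch event = ListIndexerSketch
  { sketchEvents :: [Timed Int event]
  , sketchLatestPoint :: Int
  }
  deriving (Show, Eq)

-- Start at genesis (assumed to be point 0) with an empty list.
emptySketch :: ListIndexerSketch event
emptySketch = ListIndexerSketch [] 0

-- Index one input: the point always advances, but only non-empty
-- events are stored.
indexSketch :: Timed Int (Maybe event) -> ListIndexerSketch event -> ListIndexerSketch event
indexSketch (Timed p mEvent) ix =
  ix
    { sketchEvents = maybe id (\e -> (Timed p e :)) mEvent (sketchEvents ix)
    , sketchLatestPoint = p
    }

-- Roll back to a point: drop every event emitted after it.
rollbackSketch :: Int -> ListIndexerSketch event -> ListIndexerSketch event
rollbackSketch p ix =
  ix
    { sketchEvents = dropWhile ((> p) . timedPoint) (sketchEvents ix)
    , sketchLatestPoint = min p (sketchLatestPoint ix)
    }
```

Keeping the newest events at the head makes a rollback a simple dropWhile, which is one reason a list is a convenient store for volatile events.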
On disk
SQLite
An indexer that stores its events in a SQLite database.
We try to provide as much machinery as possible to minimize the work needed to create an indexer instance that uses a SQLiteIndexer.
Populating a constructor and defining the queries (and the corresponding Queryable interface) should be enough to have an operational indexer.
data SQLiteIndexer event #
Provide the minimal elements required to use a SQLite database to back an indexer.
Constructors
SQLiteIndexer SQLiteDBLocation Connection (Pool Connection) [[SQLInsertPlan event]] [SQLRollbackPlan (Point event)] SetLastStablePointQuery (Point event) (Point event) |
Instances
newtype GetLastStablePointQuery #
A newtype to get the last stable point from an indexer.
Constructors
GetLastStablePointQuery | |
Fields
|
newtype SetLastStablePointQuery #
A newtype to set the last stable point of an indexer.
Constructors
SetLastStablePointQuery | |
Fields
|
newtype InsertPointQuery #
Constructors
InsertPointQuery Query |
data SQLiteDBLocation #
Arguments
:: forall event m. (MonadIO m, MonadError IndexerError m, HasGenesis (Point event), FromRow (Point event), ToRow (Point event), Ord (Point event)) | |
=> SQLiteDBLocation | |
-> [Query] | creation statement |
-> [[SQLInsertPlan event]] | extract |
-> [SQLRollbackPlan (Point event)] | the rollbackQuery |
-> SetLastStablePointQuery | The SQL query to set the last stable point of the indexer. |
-> GetLastStablePointQuery | The SQL query to fetch the last stable point from the indexer. |
-> m (SQLiteIndexer event) |
Start a new indexer or resume an existing SQLite indexer.
The main difference with the SQLiteIndexer constructor is that we set dbLastSync thanks to the provided query.
It helps resuming an existing indexer.
Arguments
:: forall m event param. (MonadIO m, MonadError IndexerError m, HasGenesis (Point event), FromRow (Point event), ToRow (Point event), ToRow param, Ord (Point event)) | |
=> SQLiteDBLocation | |
-> (Timed (Point event) event -> param) | extract |
-> Query | the creation query |
-> Query | the insert query |
-> SQLRollbackPlan (Point event) | the rollback query |
-> SetLastStablePointQuery | The SQL query to set the last stable point of the indexer. |
-> GetLastStablePointQuery | The SQL query to fetch the last stable point from the indexer. |
-> m (SQLiteIndexer event) |
A smart constructor for indexers that map an event to a single table. We just have to set the type family `InsertRecord event` to `[param]` and then provide the expected parameters.
It is a monomorphic restriction of mkSqliteIndexer.
writeConnection :: forall event. Lens' (SQLiteIndexer event) Connection #
readConnectionPool :: forall event. Lens' (SQLiteIndexer event) (Pool Connection) #
data SQLInsertPlan event #
A SQLInsertPlan provides a piece of information about how an event should be inserted in the database.
Constructors
forall a.ToRow a => SQLInsertPlan | |
Fields
|
data SQLRollbackPlan point #
A SQLRollbackPlan provides a piece of information on how to perform a rollback on the data inserted in the database.
Constructors
forall a.ToField a => SQLRollbackPlan | |
Fields |
Reexport from SQLite
Minimal complete definition
Nothing
Instances
ToRow () | |
Defined in Database.SQLite.Simple.ToRow | |
ToField a => ToRow (Only a) | |
Defined in Database.SQLite.Simple.ToRow | |
ToField a => ToRow [a] | |
Defined in Database.SQLite.Simple.ToRow | |
(ToRow a, ToRow b) => ToRow (a :. b) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b) => ToRow (a, b) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c) => ToRow (a, b, c) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d) => ToRow (a, b, c, d) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d, ToField e) => ToRow (a, b, c, d, e) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d, ToField e, ToField f) => ToRow (a, b, c, d, e, f) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d, ToField e, ToField f, ToField g) => ToRow (a, b, c, d, e, f, g) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d, ToField e, ToField f, ToField g, ToField h) => ToRow (a, b, c, d, e, f, g, h) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d, ToField e, ToField f, ToField g, ToField h, ToField i) => ToRow (a, b, c, d, e, f, g, h, i) | |
Defined in Database.SQLite.Simple.ToRow | |
(ToField a, ToField b, ToField c, ToField d, ToField e, ToField f, ToField g, ToField h, ToField i, ToField j) => ToRow (a, b, c, d, e, f, g, h, i, j) | |
Defined in Database.SQLite.Simple.ToRow |
dbLastSync :: forall event. Lens' (SQLiteIndexer event) (Point event) #
Arguments
:: (MonadIO m, MonadError (QueryError query) m, Ord (Point event), FromRow r) | |
=> (Point event -> query -> [NamedParam]) | A preprocessing of the query, to obtain SQL parameters |
-> (query -> Query) | The sqlite query statement |
-> (query -> [r] -> Result query) | Post processing of the result, to obtain the final result |
-> Point event | |
-> query | |
-> SQLiteIndexer event | |
-> m (Result query) |
A helper for the definition of the Queryable typeclass for SQLiteIndexer.
The helper just removes a bit of the boilerplate needed to transform data to query the database.
It doesn't contain any logic, except a check for the AheadOfLastSync error, in which case it throws the AheadOfLastSync exception with a partial result.
If you don't want to query the database on a partial result, use querySyncedOnlySQLiteIndexerWith.
It doesn't filter the result based on the given data point.
queryLatestSQLiteIndexerWith #
Arguments
:: MonadIO m | |
=> FromRow r | |
=> (query -> [NamedParam]) | A preprocessing of the query, to obtain SQL parameters |
-> (query -> Query) | The sqlite query statement |
-> (query -> [r] -> Result query) | Post processing of the result, to obtain the final result |
-> query | |
-> SQLiteIndexer event | |
-> m (Result query) |
A helper for the definition of queryLatest in the Queryable typeclass for SQLiteIndexer.
The helper just removes a bit of the boilerplate needed to transform data to query the database. It also assumes that the SQL query will deal with the latest part (we don't use a side query to access the latest sync point).
querySyncedOnlySQLiteIndexerWith #
Arguments
:: MonadIO m | |
=> MonadError (QueryError query) m | |
=> Ord (Point event) | |
=> FromRow r | |
=> (Point event -> query -> [NamedParam]) | A preprocessing of the query, to obtain SQL parameters |
-> (query -> Query) | The sqlite query statement |
-> (query -> [r] -> Result query) | Post processing of the result, to obtain the final result |
-> Point event | |
-> query | |
-> SQLiteIndexer event | |
-> m (Result query) |
A helper for the definition of the Queryable typeclass for SQLiteIndexer.
The helper just removes a bit of the boilerplate needed to transform data to query the database.
It doesn't contain any logic, except a check for the AheadOfLastSync error, in which case it throws AheadOfLastSync without any result attached.
It doesn't filter the result based on the given data point.
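The difference between the partial-result helper and the synced-only helper can be sketched without SQLite. The block below is a simplified model, not the library code: the already computed query result is passed in directly, Either stands in for MonadError, and QueryErrorSketch, queryWithPartial and querySyncedOnly are hypothetical names.

```haskell
-- Simplified stand-in for the relevant part of QueryError.
newtype QueryErrorSketch result
  = AheadOfLastSync (Maybe result)
  deriving (Show, Eq)

-- Models the partial-result behaviour: when the requested point is ahead
-- of the last sync point, fail but attach what the indexer knows so far.
queryWithPartial
  :: Ord point
  => point   -- last sync point of the indexer
  -> point   -- point requested by the caller
  -> result  -- result of running the query (lazy, computed on demand)
  -> Either (QueryErrorSketch result) result
queryWithPartial lastSync requested result
  | requested > lastSync = Left (AheadOfLastSync (Just result))
  | otherwise = Right result

-- Models the synced-only behaviour: when the requested point is ahead of
-- the last sync point, fail without attaching (or computing) a result.
querySyncedOnly
  :: Ord point
  => point
  -> point
  -> result
  -> Either (QueryErrorSketch result) result
querySyncedOnly lastSync requested result
  | requested > lastSync = Left (AheadOfLastSync Nothing)
  | otherwise = Right result
```

In this model the synced-only variant never forces the result when the indexer is behind, which mirrors the stated intent of not querying the database for a partial answer.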
handleSQLErrors :: IO a -> IO (Either IndexerError a) #
Map SQLite errors to an indexer error
data SQLiteAggregateQuery m point event #
An aggregation of SQLite indexers used to build query across indexers.
Constructors
SQLiteAggregateQuery [SQLiteSourceProvider m point] (Pool Connection) |
Instances
(MonadIO m, Ord point, point ~ Point event) => IsSync m event (SQLiteAggregateQuery m point) # | |
Defined in Marconi.Core.Indexer.SQLiteAggregateQuery Methods lastStablePoint :: SQLiteAggregateQuery m point event -> m (Point event) # lastSyncPoint :: SQLiteAggregateQuery m point event -> m (Point event) # | |
MonadIO m => Closeable m (SQLiteAggregateQuery m point) # | |
Defined in Marconi.Core.Indexer.SQLiteAggregateQuery Methods close :: SQLiteAggregateQuery m point event -> m () # | |
(MonadIO m, MonadTrans t, Monad (t m), Ord point, point ~ Point event) => IsSync (t m) event (SQLiteAggregateQuery m point) # | |
Defined in Marconi.Core.Indexer.SQLiteAggregateQuery Methods lastStablePoint :: SQLiteAggregateQuery m point event -> t m (Point event) # lastSyncPoint :: SQLiteAggregateQuery m point event -> t m (Point event) # |
aggregateConnection :: Lens' (SQLiteAggregateQuery m point event) (Pool Connection) #
The connection that provides read access across the different databases
Arguments
:: Map String (SQLiteSourceProvider m point) | A map of indexers. The keys are used as aliases in the attach statement |
-> IO (SQLiteAggregateQuery m point event) |
Build a SQLiteAggregateQuery from a map of sources, attaching each database of the provided sources under the corresponding alias.
withResource :: Pool a -> (a -> IO r) -> IO r #
data SQLiteSourceProvider m point #
A wrapper around indexers.
Its purpose is mainly to allow the use of a heterogeneous list of indexers as a source for a SQLiteAggregateQuery.
Constructors
forall indexer event.(IsSourceProvider m event indexer, Point event ~ point) => SQLiteSourceProvider (MVar (indexer event)) |
type IsSourceProvider m event indexer = (IsSync m event indexer, HasDatabasePath indexer) #
Alias to gather typeclasses required to be a source provider
class HasDatabasePath indexer where #
A class for indexers that have access to a SQLite database and know the path to this database
Methods
getDatabasePath :: indexer event -> SQLiteDBLocation #
Retrieve the database path from the indexer
Instances
Connection utilities
readOnlyConnection :: MonadIO io => SQLiteDBLocation -> io Connection #
Create an SQL connection with ReadOnly permission and concurrent connections
readWriteConnection :: (MonadIO io, MonadError IndexerError io) => SQLiteDBLocation -> io Connection #
Create an SQL connection with ReadWrite permission and concurrent connections
On file
An indexer that serialises events to disk
data FileIndexer meta event #
An indexer that stores events in a directory, one file per event.
It has two type parameters:
- meta the metadata type, used to build the filename
- event the indexed events type.
Constructors
FileIndexer FilePath (FileStorageConfig meta event) (FileBuilder meta event) (EventBuilder meta event) (Point event) (Point event) (Maybe AsyncWriteFileConfig) |
Instances
data FileStorageConfig meta event #
The datatype used to configure the way we store files, including:
- control over which events are saved
- how many events we keep on disk
Be careful in the choice of the function used to remove events as you probably don't want to store all the events on disk.
Constructors
FileStorageConfig Bool (Maybe (FileCleanup meta event)) (meta -> meta -> Ordering) |
data FileCleanup meta event #
withPartition :: (Timed (Point event) (Maybe event) -> [EventInfo meta] -> ([EventInfo meta], [EventInfo meta])) -> Maybe (FileCleanup meta event) #
data FileBuilder meta event #
Configuration datatype for FileIndexer that provides the content needed to create the filenames used by the indexer
Constructors
FileBuilder Text Text (Timed (Point event) (Maybe event) -> [Text]) (event -> Builder) (Point event -> Builder) |
data EventBuilder meta event #
This datatype gathers the functions needed to deserialise events from both the filenames and the file content.
meta is the metadata type and event the type of events we handle.
Information about an event that can be gathered from the filename
mkFileIndexer :: (MonadIO m, MonadError IndexerError m, Ord (Point event), HasGenesis (Point event)) => FilePath -> Maybe Int -> FileStorageConfig meta event -> FileBuilder meta event -> EventBuilder meta event -> m (FileIndexer meta event) #
data LastEventIndexer event #
An indexer that only tracks the latest value of an event. On rollback, we either delete the value or keep the one provided before the rollback, depending on the provided configuration.
Constructors
LastEventIndexer (LastEventConfig event) (LastEventState event) |
Instances
mkLastEventIndexer :: (HasGenesis (Point event), MonadIO m, MonadError IndexerError m) => LastEventConfig event -> m (LastEventIndexer event) #
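The two rollback behaviours described for LastEventIndexer can be modelled in a few lines. This is a sketch under assumptions: points are Ints, the configuration is reduced to a single OnRollback flag, and the names OnRollback, LastEventSketch, indexLast and rollbackLast are hypothetical rather than taken from the library.

```haskell
-- Hypothetical configuration flag: what to do with the stored value
-- when a rollback goes behind the point at which it was stored.
data OnRollback = DeleteValue | KeepValue
  deriving (Show, Eq)

-- Simplified stand-in for a last-event indexer.
-- The stored value is paired with the point at which it was indexed.
data LastEventSketch event = LastEventSketch
  { onRollback :: OnRollback
  , lastValue :: Maybe (Int, event)
  }
  deriving (Show, Eq)

-- Only the most recent non-empty event is kept.
indexLast :: Int -> Maybe event -> LastEventSketch event -> LastEventSketch event
indexLast p (Just e) ix = ix {lastValue = Just (p, e)}
indexLast _ Nothing ix = ix

-- Roll back to point p: a value stored after p is deleted or kept,
-- depending on the configuration. A value stored at or before p stays.
rollbackLast :: Int -> LastEventSketch event -> LastEventSketch event
rollbackLast p ix = case (onRollback ix, lastValue ix) of
  (DeleteValue, Just (q, _)) | q > p -> ix {lastValue = Nothing}
  _ -> ix
```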
data LastEventConfig event #
data GetLastQuery a #
Datatype used to query the stored value of a LastEventIndexer
Constructors
GetLastQuery |
Instances
(MonadIO m, MonadError (QueryError (GetLastQuery event)) m) => Queryable m event (GetLastQuery event) LastEventIndexer # | |
Defined in Marconi.Core.Indexer.LastEventIndexer Methods query :: Point event -> GetLastQuery event -> LastEventIndexer event -> m (Result (GetLastQuery event)) # queryLatest :: GetLastQuery event -> LastEventIndexer event -> m (Result (GetLastQuery event)) # | |
type Result (GetLastQuery a) # | |
Defined in Marconi.Core.Indexer.LastEventIndexer |
Mixed indexer
An indexer that uses two indexers internally: one for the most recent points, another for the older points.
The idea is that recent events are _volatile_ and can be rolled back, so they must be easy to delete if needed. Older events are stable and can be stored on disk to be persisted.
data MixedIndexer store mem event #
An indexer that keeps at least _configKeepInMemory events in memory and puts the older ones on disk by groups of _configFlushSize events.
The query interface for this indexer will always go through the database first and then prune results present in memory.
- mem the indexer that handles the most recent events
- store the indexer that handles old events, when we need to remove stuff from memory
Instances
(AppendResult m event query ListIndexer, Queryable m event query store) => Queryable m event query (MixedIndexer store ListIndexer) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods query :: Point event -> query -> MixedIndexer store ListIndexer event -> m (Result query) # queryLatest :: query -> MixedIndexer store ListIndexer event -> m (Result query) # | |
(Ord (Point event), Flushable m event mem, Traversable (Container mem), IsIndex m event store, IsIndex m event mem, IsSync m event mem) => IsIndex m event (MixedIndexer store mem) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods index :: Timed (Point event) (Maybe event) -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # rollback :: Point event -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # setLastStablePoint :: Point event -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # | |
(IsSync event m mem, Monad event) => IsSync event m (MixedIndexer store mem) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods lastStablePoint :: MixedIndexer store mem m -> event (Point m) # lastSyncPoint :: MixedIndexer store mem m -> event (Point m) # | |
(Functor m, Prunable m event store) => Prunable m event (MixedIndexer store mem) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods prune :: Point event -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # pruningPoint :: MixedIndexer store mem event -> m (Maybe (Point event)) # | |
(Closeable m store, Closeable m mem, Monad m) => Closeable m (MixedIndexer store mem) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods close :: MixedIndexer store mem event -> m () # | |
HasMixedConfig (MixedIndexer store mem) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods flushEvery :: Lens' (MixedIndexer store mem event) Word # keepInMemory :: Lens' (MixedIndexer store mem event) Word # |
A smart constructor for MixedIndexer.
It doesn't sync up the last sync point of the in-memory and in-database indexers.
As a consequence, if these indexers aren't empty, you probably want to modify the result of the constructor.
Arguments
:: Word | how many events are kept in memory after a flush |
-> Word | flush size |
-> store event | |
-> mem event | |
-> MixedIndexer store mem event |
A smart constructor for a MixedIndexer that sets only the relevant parameters
A smart constructor for a standard MixedIndexer: an on-disk indexer and a ListIndexer for the in-memory part.
Contrary to mkMixedIndexer, this smart constructor checks the content of the on-disk indexer to set the lastSyncPoint correctly.
Arguments
:: (IsSync m event store, HasGenesis (Point event), Ord (Point event), Monad m) | |
=> Word | how many events are kept in memory after a flush |
-> Word | flush size |
-> store event | |
-> m (MixedIndexer store ListIndexer event) |
Create a mixed indexer that initialises its last stable point from the inDatabase indexer
inMemory :: Lens' (MixedIndexer store mem event) (mem event) #
access to the in memory part of the indexer
inDatabase :: Lens' (MixedIndexer store mem event) (store event) #
access to the on disk part of the indexer
A type class that gives access to the configuration of a MixedIndexer
class HasMixedConfig indexer where #
Provide access to the mixed indexer configuration. An indexer transformer has to implement it if you want to access the configuration options of the mixed indexer below the transformer.
Instances
(IndexerTrans t, HasMixedConfig indexer) => HasMixedConfig (t indexer) # | |
Defined in Marconi.Core.Indexer.MixedIndexer | |
HasMixedConfig (MixedIndexer store mem) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods flushEvery :: Lens' (MixedIndexer store mem event) Word # keepInMemory :: Lens' (MixedIndexer store mem event) Word # | |
(IndexerMapTrans t, HasMixedConfig indexer) => HasMixedConfig (t indexer output) # | |
Defined in Marconi.Core.Indexer.MixedIndexer Methods flushEvery :: Lens' (t indexer output event) Word # keepInMemory :: Lens' (t indexer output event) Word # |
The indexer of the most recent events must be able to send a part of its events to the other indexer when they are stable.
The number of events that are sent and the number of events kept in memory are controlled by the MixedIndexer.
class Flushable m event indexer where #
Define a way to flush old events out of a container
Associated Types
type Container (indexer :: Type -> Type) :: Type -> Type #
The events container used when you flush events
Methods
memorySize :: indexer event -> m Word #
Arguments
:: Point event | Last point to flush |
-> indexer event | |
-> m (Container indexer (Timed (Point event) event), indexer event) |
Clear the memory and return its content
Instances
(Applicative m, Ord (Point event)) => Flushable m event ListIndexer # | |
Defined in Marconi.Core.Indexer.MixedIndexer Associated Types type Container ListIndexer :: Type -> Type # Methods memorySize :: ListIndexer event -> m Word # flushMemory :: Point event -> ListIndexer event -> m (Container ListIndexer (Timed (Point event) event), ListIndexer event) # |
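The interplay between the number of events kept in memory and the flush size can be sketched with two plain lists. This is a simplified model, not the library's implementation: it assumes a flush is triggered once memory holds keepInMemory + flushEvery events, it ignores stability (the real flush is bounded by a point), and the record MixedSketch with its plain fields, as well as indexMixed, are hypothetical names.

```haskell
-- Simplified stand-in for a mixed indexer: both parts are lists,
-- most recent first. The inDatabase list stands in for the on-disk store.
data MixedSketch event = MixedSketch
  { keepInMemory :: Int  -- how many events stay in memory after a flush
  , flushEvery :: Int    -- how many events are sent to the store per flush
  , inMemory :: [event]
  , inDatabase :: [event]
  }
  deriving (Show, Eq)

-- Index one event in memory, then flush the oldest in-memory events
-- when memory is full. Assumption: "full" means that memory holds
-- keepInMemory + flushEvery events.
indexMixed :: event -> MixedSketch event -> MixedSketch event
indexMixed e ix
  | length mem >= keepInMemory ix + flushEvery ix =
      let (kept, flushed) = splitAt (keepInMemory ix) mem
       in ix {inMemory = kept, inDatabase = flushed ++ inDatabase ix}
  | otherwise = ix {inMemory = mem}
  where
    mem = e : inMemory ix
```

With keepInMemory = 2 and flushEvery = 3, the fifth indexed event triggers the first flush: the two newest events stay in memory and the three oldest move to the store.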
LastPointIndexer
data LastPointIndexer event #
LastPointIndexer. An indexer that does nothing except keep track of the last point. While it may sound useless, it can be useful when you want to benefit from the capabilities of a transformer.
Instances
lastPointIndexer :: HasGenesis (Point event) => LastPointIndexer event #
A smart constructor for LastPointIndexer
Running indexers
To control a set of indexers simultaneously, we want to be able to consider a list of them.
Unfortunately, each indexer has its own purpose and (usually) operates on its own event type.
As a consequence, a list of indexers would be a heterogeneous list, which is hard to manipulate.
WorkerM offers an opaque representation of an indexer, which can be embedded in a homogeneous list.
A Coordinator takes a bunch of WorkerM and synchronises their work, sending them events and rollbacks.
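The technique behind this opaque representation is existential quantification: the event and indexer types are hidden behind a constructor, so workers over different indexers share one type. The block below is a minimal, pure sketch of that idea only. It has no threads, channels or rollbacks, and WorkerSketch, feed, broadcast, summary and exampleWorkers are hypothetical names.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- A worker hides the event type and the indexer state type.
-- Only the input type remains visible, so workers fit in one list.
data WorkerSketch input = forall event state. WorkerSketch
  String                     -- worker name
  (input -> Maybe event)     -- extract the event this indexer cares about
  (event -> state -> state)  -- index one event
  (state -> String)          -- render a summary of the hidden state
  state                      -- current indexer state

-- Send one input to a worker. Inputs rejected by the extractor
-- leave the worker unchanged.
feed :: input -> WorkerSketch input -> WorkerSketch input
feed input (WorkerSketch name extract step render st) =
  WorkerSketch name extract step render (maybe st (`step` st) (extract input))

-- Coordinator-like broadcast: every worker receives the same input.
broadcast :: input -> [WorkerSketch input] -> [WorkerSketch input]
broadcast = map . feed

-- The hidden state can only be observed through the packed renderer.
summary :: WorkerSketch input -> String
summary (WorkerSketch name _ _ render st) = name ++ ": " ++ render st

-- Two workers with different hidden state types in a homogeneous list.
exampleWorkers :: [WorkerSketch Int]
exampleWorkers =
  [ WorkerSketch "evens" (\n -> if even n then Just n else Nothing) (:) show []
  , WorkerSketch "sum" Just (+) show 0
  ]
```

Feeding the inputs 1 to 4 through broadcast leaves the first worker holding the even inputs and the second holding their total, although the list itself never mentions either state type.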
Workers
A worker hides the shape of an indexer and integrates the data needed to interact with a coordinator.
Constructors
forall indexer event n.(WorkerIndexerType n event indexer, Point event ~ point) => Worker | |
Fields
|
data WorkerIndexer m input event indexer #
Worker which also provides direct access to the indexer hidden inside it.
Constructors
WorkerIndexer | |
Fields
|
type WorkerIndexerType n event indexer = (IsIndex n event indexer, IsSync n event indexer, Closeable n indexer) #
Type alias for the type classes that are required to build a worker for an indexer
startWorker :: forall input m. MonadIO m => Ord (Point input) => TChan (ProcessedInput (Point input) input) -> MVar IndexerError -> QSemN -> QSemN -> Worker input (Point input) -> m ThreadId #
The worker notifies its coordinator that it's ready, then waits for new events and processes them as they come
createWorker :: (MonadIO f, WorkerIndexerType (ExceptT IndexerError m) event indexer) => Text -> (input -> Maybe event) -> indexer event -> f (WorkerIndexer m input event indexer) #
create a worker for an indexer that already throws IndexerError
createWorkerHoist :: (MonadIO f, WorkerIndexerType n event indexer) => (forall a. n a -> ExceptT IndexerError m a) -> Text -> Preprocessor (ExceptT IndexerError m) (Point event) input event -> indexer event -> f (WorkerIndexer m input event indexer) #
create a worker for an indexer, returning the worker and the MVar it's using internally
createWorkerPure :: (MonadIO f, MonadIO m, WorkerIndexerType m event indexer) => Text -> Preprocessor (ExceptT IndexerError m) (Point event) input event -> indexer event -> f (WorkerIndexer m input event indexer) #
create a worker for an indexer that doesn't throw error
createWorkerWithPreprocessing :: (MonadIO f, WorkerIndexerType (ExceptT IndexerError m) event indexer) => Text -> Preprocessor (ExceptT IndexerError m) (Point event) input event -> indexer event -> f (WorkerIndexer m input event indexer) #
create a worker for an indexer that already throws IndexerError
data ProcessedInput point event #
The different types of input events that should be handled by an indexer. Used to map the incoming chain events to something that an indexer should be able to digest.
Constructors
Rollback point | A rollback happens and indexers need to go back to the given point in time |
Index (Timed point (Maybe event)) | A new event has to be indexed |
IndexAllDescending (NonEmpty (Timed point (Maybe event))) | A non-empty batch of new events has to be indexed |
StableAt point | Inform the indexer of the latest stable point reached |
Stop | Processing stops |
Instances
Preprocessors
Preprocessors are used to alter the incoming events sent to an indexer through a worker.
They make it possible to transform the content of a block or to silence some events.
type Preprocessor m point a b = ScanM m [ProcessedInput point a] [ProcessedInput point b] #
Stateful transformer. Maps a list of preprocessor actions to a new list of actions. Actions should be read from the left to the right element of the list.
mapEvent :: Monad m => (a -> b) -> Preprocessor m point a b #
Lift a function on events to a preprocessor
mapMaybeEvent :: forall m point a b. Monad m => (a -> Maybe b) -> Preprocessor m point a b #
Lift a function that may emit an event to a preprocessor.
traverseEvent :: Monad m => (a -> m b) -> Preprocessor m point a b #
Lift an effectful function on events to a preprocessor
traverseMaybeEvent :: Monad m => (a -> m (Maybe b)) -> Preprocessor m point a b #
Lift an effectful function that may emit an event to a preprocessor
scanEvent :: forall m s point a b. Monad m => (a -> State s b) -> s -> Preprocessor m point a b #
Create a transformer from a strict stateful computation
scanEventM :: forall m s point a b. Monad m => (a -> StateT s m b) -> m s -> Preprocessor m point a b #
Create a transformer from a strict stateful and effectful computation
scanMaybeEvent :: forall m s point a b. Monad m => (a -> State s (Maybe b)) -> s -> Preprocessor m point a b #
Create a transformer from a strict stateful computation that may emit an event
scanMaybeEventM :: forall m s point a b. Monad m => (a -> StateT s m (Maybe b)) -> m s -> Preprocessor m point a b #
Create a transformer from a strict stateful and effectful computation that may emit an event
preprocessor :: Monad m => (ProcessedInput point a -> State s [ProcessedInput point b]) -> s -> Preprocessor m point a b #
Lift a stateful function as a preprocessor
preprocessorM :: Monad m => (ProcessedInput point a -> StateT s m [ProcessedInput point b]) -> m s -> Preprocessor m point a b #
Lift a stateful and effectful function as a preprocessor
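To make the role of these combinators more concrete, the sketch below models a preprocessor as a plain function on lists of inputs instead of the library's ScanM-based type. It uses a reduced ProcessedInput (without IndexAllDescending), assumes that a silenced event is still forwarded as an empty Index so that the point is not lost, and threads state with mapAccumL rather than State. mapMaybeEventSketch and scanEventSketch are hypothetical names.

```haskell
import Data.List (mapAccumL)

data Timed point event = Timed point event
  deriving (Show, Eq)

-- Reduced stand-in for ProcessedInput (IndexAllDescending is left out).
data ProcessedInput point event
  = Rollback point
  | Index (Timed point (Maybe event))
  | StableAt point
  | Stop
  deriving (Show, Eq)

-- Stateless sketch: transform or silence events, leave the rest untouched.
-- Assumption: a silenced event becomes an empty Index at the same point.
mapMaybeEventSketch
  :: (a -> Maybe b) -> [ProcessedInput point a] -> [ProcessedInput point b]
mapMaybeEventSketch f = map step
  where
    step (Index (Timed p e)) = Index (Timed p (e >>= f))
    step (Rollback p) = Rollback p
    step (StableAt p) = StableAt p
    step Stop = Stop

-- Stateful sketch: thread a state through the events, left to right.
-- Returns the final state along with the transformed inputs.
scanEventSketch
  :: (s -> a -> (s, b)) -> s -> [ProcessedInput point a] -> (s, [ProcessedInput point b])
scanEventSketch f = mapAccumL step
  where
    step s (Index (Timed p (Just e))) =
      let (s', b) = f s e in (s', Index (Timed p (Just b)))
    step s (Index (Timed p Nothing)) = (s, Index (Timed p Nothing))
    step s (Rollback p) = (s, Rollback p)
    step s (StableAt p) = (s, StableAt p)
    step s Stop = (s, Stop)
```

For instance, scanEventSketch with a running-sum step turns a stream of amounts into a stream of balances, while rollbacks and stop signals pass through unchanged.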
Resuming/draining
type Resume m event = Preprocessor m (Point event) event event #
A type alias for a preprocessor that allows resuming. We don't use a newtype to ease preprocessors composition.
Arguments
:: (MonadError IndexerError m, Ord (Point event)) | |
=> Point event | The last point of an indexer. |
-> Resume m event |
Coordinator
data Coordinator input #
A coordinator synchronises the event processing of a list of indexers. A coordinator is itself an indexer. It means that we can create a tree of indexers, with coordinators that partially process the data at each node, and with concrete indexers at the leaves.
Instances
workers :: forall input. Lens' (Coordinator input) [Worker input (Point input)] #
tokens :: forall input. Lens' (Coordinator input) QSemN #
channel :: forall input. Lens' (Coordinator input) (TChan (ProcessedInput (Point input) input)) #
nbWorkers :: forall input. Lens' (Coordinator input) Int #
mkCoordinator :: Ord (Point input) => [Worker input (Point input)] -> IO (Coordinator input) #
Create a coordinator and start its workers
Arguments
:: forall indexer event s r. (Ord (Point event), IsIndex (ExceptT IndexerError IO) event indexer, Closeable IO indexer) | |
=> (Timed (Point event) (Maybe event) -> State s (Maybe (Point event))) | emit stable point based on incoming information |
-> s | |
-> TBQueue (ProcessedInput (Point event) event) | |
-> MVar (indexer event) | |
-> CloseSwitch | |
-> IO r |
Read a queue of events, processing them synchronously on each worker.
Note that this function silently throws an IndexerError if the event processing fails.
data CloseSwitch #
Should the indexer be closed after running its action?
Common queries
data EventAtQuery event #
Get the event stored by the indexer at a given point in time
Constructors
EventAtQuery |
Instances
newtype EventsFromQuery event #
Get the non empty events from the given point (excluded) to the one of the query (included)
Constructors
EventsFromQuery (Point event) |
Instances
newtype EventsMatchingQuery event #
Query an indexer to find all events that match a given predicate.
The result should list the most recent events first.
Constructors
EventsMatchingQuery (event -> Maybe event) |
Instances
MonadError (QueryError (EventsMatchingQuery event)) m => AppendResult m event (EventsMatchingQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods appendResult :: Point event -> EventsMatchingQuery event -> ListIndexer event -> m (Result (EventsMatchingQuery event)) -> m (Result (EventsMatchingQuery event)) # | |
MonadError (QueryError (EventsMatchingQuery event)) m => Queryable m event (EventsMatchingQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods query :: Point event -> EventsMatchingQuery event -> ListIndexer event -> m (Result (EventsMatchingQuery event)) # queryLatest :: EventsMatchingQuery event -> ListIndexer event -> m (Result (EventsMatchingQuery event)) # | |
type Result (EventsMatchingQuery event) # | The result of an |
Defined in Marconi.Core.Query | |
type Result (WithStability (EventsMatchingQuery event)) # | |
Defined in Marconi.Core.Query |
allEvents :: EventsMatchingQuery event #
Get all the events that are stored in the indexer
calcStability :: Ord point => (event -> point) -> point -> event -> Stability event #
Helper function to wrap an event in Stability based on the last stable point and a given way to get a point.
Asks: Is the provided point less than (from a time before) or equal to (from the time of) the last stable point?
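The comparison described above can be sketched as follows. The Stability type is redefined locally so that the block is self-contained, and its constructor names Stable and Volatile are an assumption, as is the name calcStabilitySketch.

```haskell
-- Local stand-in for Stability. The constructor names are assumed.
data Stability a = Stable a | Volatile a
  deriving (Show, Eq)

-- An event is stable when its point is at or before the last stable point.
calcStabilitySketch
  :: Ord point
  => (event -> point)  -- how to get a point from an event
  -> point             -- last stable point
  -> event
  -> Stability event
calcStabilitySketch getPoint lastStable e
  | getPoint e <= lastStable = Stable e
  | otherwise = Volatile e
```

For example, with pairs of (point, payload), using fst as the point getter and 5 as the last stable point, events at points 3 and 5 are wrapped as Stable and an event at point 6 as Volatile.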
newtype LatestEventsQuery event #
Get the nbOfEvents last non-empty events from the indexer before the point given in the query
Constructors
LatestEventsQuery Word |
Instances
MonadError (QueryError (LatestEventsQuery event)) m => Queryable m event (LatestEventsQuery event) ListIndexer # | |
Defined in Marconi.Core.Query Methods query :: Point event -> LatestEventsQuery event -> ListIndexer event -> m (Result (LatestEventsQuery event)) # queryLatest :: LatestEventsQuery event -> ListIndexer event -> m (Result (LatestEventsQuery event)) # | |
(MonadIO m, MonadError (QueryError (LatestEventsQuery event)) m) => Queryable m event (LatestEventsQuery event) (FileIndexer meta) # | |
Defined in Marconi.Core.Query Methods query :: Point event -> LatestEventsQuery event -> FileIndexer meta event -> m (Result (LatestEventsQuery event)) # queryLatest :: LatestEventsQuery event -> FileIndexer meta event -> m (Result (LatestEventsQuery event)) # | |
type Result (LatestEventsQuery event) # | |
Defined in Marconi.Core.Query |
latestEvent :: LatestEventsQuery event #
Get the latest non empty event before the point given in the query
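As a rough illustration of how such queries could be answered by a list-backed indexer, the sketch below works on a plain list of timed events stored most recent first. It assumes that "before the point" includes the point itself, which the description does not state; latestEventsSketch and eventsMatchingSketch are hypothetical names and do not reproduce the library's Queryable instances.

```haskell
import Data.Maybe (mapMaybe)

data Timed point event = Timed
  { timedPoint :: point
  , timedEvent :: event
  }
  deriving (Show, Eq)

-- Ignore everything emitted after the queried point.
-- Assumption: the list is ordered most recent first.
upTo :: Ord point => point -> [Timed point event] -> [Timed point event]
upTo p = dropWhile ((> p) . timedPoint)

-- The n latest stored events at or before the given point.
latestEventsSketch :: Ord point => Int -> point -> [Timed point event] -> [Timed point event]
latestEventsSketch n p = take n . upTo p

-- All events at or before the given point that the predicate accepts,
-- most recent first. The predicate may also rewrite the event it keeps.
eventsMatchingSketch
  :: Ord point
  => (event -> Maybe event) -> point -> [Timed point event] -> [Timed point event]
eventsMatchingSketch predicate p = mapMaybe keep . upTo p
  where
    keep (Timed q e) = Timed q <$> predicate e
```

In this model, latestEvent corresponds to latestEventsSketch 1 and allEvents to eventsMatchingSketch Just.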
Represents whether an event is considered to be stable or not.
Instances
queryErrorWithStability :: (Result (WithStability query) ~ f (Stability event), Result query ~ f event) => (f event -> f (Stability event)) -> QueryError query -> QueryError (WithStability query) #
newtype WithStability query #
A wrapper representing the demand that the result of a query should come with Stability data
Constructors
WithStability | |
Fields
|
Instances
type Result (WithStability (EventsFromQuery event)) # | |
Defined in Marconi.Core.Query | |
type Result (WithStability (EventsMatchingQuery event)) # | |
Defined in Marconi.Core.Query |
Arguments
:: (Result query ~ f event, Result (WithStability query) ~ f (Stability event), MonadError (QueryError (WithStability query)) m, Traversable f) | |
=> (event -> Stability event) | A function to calculate |
-> ExceptT (QueryError query) m (f event) | In |
-> m (f (Stability event)) |
Arguments
:: forall event m indexer f. (Monad m, Ord (Point event), IsSync m event indexer, Traversable f) | |
=> indexer event | An indexer |
-> f (Timed (Point event) event) | A traversable of query results |
-> m (f (Stability (Timed (Point event) event))) |
Given an indexer and some traversable of timed query results, calculate the stability of all the query results.
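As a rough illustration of the shape of this computation, the sketch below reads the last stable point once and then tags every timed result. `Stability`, `Timed` and `stabilityOf` are local, hypothetical stand-ins, and the indexer is replaced by a plain monadic action that returns the last stable point.

```haskell
import Data.Functor.Identity (Identity (..))

-- Local stand-ins; the real types live in the library and may differ.
data Stability a = Stable a | Volatile a
  deriving (Eq, Show)

data Timed point event = Timed {timedPoint :: point, timedEvent :: event}
  deriving (Eq, Show)

-- Ask for the last stable point once, then tag each result in the container.
stabilityOf
  :: (Monad m, Ord point, Traversable f)
  => m point
  -> f (Timed point event)
  -> m (f (Stability (Timed point event)))
stabilityOf askLastStable results = do
  lastStable <- askLastStable
  let tag t
        | timedPoint t <= lastStable = Stable t
        | otherwise = Volatile t
  pure (fmap tag results)

-- Hypothetical run in the Identity monad with 5 as the last stable point.
tagged :: [Stability (Timed Int Char)]
tagged = runIdentity (stabilityOf (Identity 5) [Timed 3 'a', Timed 7 'b'])
```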
Arguments
:: forall result m event indexer f. (Monad m, Ord (Point event), IsSync m event indexer, Applicative f) | |
=> indexer event | An indexer |
-> Point event | A specific point to compare against the last stable point |
-> f result | A traversable of query results |
-> m (f (Stability result)) |
Given an indexer, a Point
and some traversable of query results,
calculate the stability of all the query results.
Indexer Transformers
class IndexerTrans t where #
An indexer transformer: it adds a configurable capability to an indexer
Instances
IndexerTrans WithAction # | |
Defined in Marconi.Core.Transformer.WithAction Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithAction indexer event) (indexer event) # | |
IndexerTrans WithCatchup # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithCatchup indexer event) (indexer event) # | |
IndexerTrans WithDelay # | |
IndexerTrans WithPruning # | |
Defined in Marconi.Core.Transformer.WithPruning Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithPruning indexer event) (indexer event) # | |
IndexerTrans (IndexTransformer config) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (IndexTransformer config indexer event) (indexer event) # | |
IndexerTrans (WithCache query) # | |
IndexerTrans (WithTrace m) # | |
IndexerTrans (WithTracer m) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithTracer m indexer event) (indexer event) # |
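To make the role of unwrap more concrete, here is a minimal sketch of a transformer class exposing the wrapped indexer through a lens. It uses a hand-rolled van Laarhoven lens so that it only depends on base. `IndexerTransSketch`, `WithLabel` and `ListStore` are hypothetical names for illustration and are not part of the library.

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

-- A minimal van Laarhoven lens, standing in for the lens library's Lens'.
type Lens' s a = forall f. Functor f => (a -> f a) -> s -> f s

view :: Lens' s a -> s -> a
view l = getConst . l Const

over :: Lens' s a -> (a -> a) -> s -> s
over l f = runIdentity . l (Identity . f)

-- Hypothetical mirror of IndexerTrans: a transformer exposes what it wraps.
class IndexerTransSketch t where
  unwrapSketch :: Lens' (t indexer event) (indexer event)

-- A toy transformer that only attaches a tag to the wrapped indexer.
data WithLabel indexer event = WithLabel String (indexer event)

instance IndexerTransSketch WithLabel where
  unwrapSketch f (WithLabel tag inner) = WithLabel tag <$> f inner

-- A toy indexer that keeps its events in a list.
newtype ListStore event = ListStore [event]
  deriving (Eq, Show)

-- Modify the wrapped indexer through the lens, then read it back.
reversedStore :: ListStore Char
reversedStore =
  view unwrapSketch (over unwrapSketch rev (WithLabel "demo" (ListStore "abc")))
  where
    rev (ListStore xs) = ListStore (reverse xs)
```

Because every transformer offers the same lens, code written against the inner indexer can reach it without knowing which transformers sit on top.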
class IndexerMapTrans t where #
An indexer transformer: it adds a configurable capability to an indexer.
Unlike IndexerTrans, this one also allows the transformation of the event.
Instances
IndexerMapTrans WithTransform # | |
Defined in Marconi.Core.Transformer.WithTransform Methods unwrapMap :: forall (indexer :: Type -> Type) output event. Lens' (WithTransform indexer output event) (indexer output) # | |
IndexerMapTrans (WithFold m) # | |
Logging
data IndexerEvent point #
Event available for the tracer
Constructors
Instances
Show point => Show (IndexerEvent point) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
Pretty point => Pretty (IndexerEvent point) # | |
Defined in Marconi.Core.Transformer.WithTracer |
Trace
data WithTrace m indexer event #
A tracer modifier that adds tracing to an existing indexer
Instances
(Queryable m event query indexer, MonadIO m, MonadError (QueryError e) m) => Queryable m event query (WithTrace IO indexer) # | |
(MonadIO m, MonadError IndexerError m, IsIndex m event index) => IsIndex m event (WithTrace m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTrace m index event -> m (WithTrace m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> m (WithTrace m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> m (WithTrace m index event) # rollback :: Point event -> WithTrace m index event -> m (WithTrace m index event) # setLastStablePoint :: Point event -> WithTrace m index event -> m (WithTrace m index event) # | |
IsSync m event indexer => IsSync m event (WithTrace n indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods lastStablePoint :: WithTrace n indexer event -> m (Point event) # lastSyncPoint :: WithTrace n indexer event -> m (Point event) # | |
(HasGenesis (Point event), Functor m, Resetable m event indexer) => Resetable m event (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
HasTraceConfig m event (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods trace :: Lens' (WithTrace m indexer event) (Trace m (IndexerEvent (Point event))) # | |
(MonadIO m, Closeable m indexer) => Closeable m (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
IndexerTrans (WithTrace m) # | |
(MonadTrans t, MonadIO m, MonadIO (t m), MonadError IndexerError (t m), IsIndex (t m) event index) => IsIndex (t m) event (WithTrace m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTrace m index event -> t m (WithTrace m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> t m (WithTrace m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTrace m index event -> t m (WithTrace m index event) # rollback :: Point event -> WithTrace m index event -> t m (WithTrace m index event) # setLastStablePoint :: Point event -> WithTrace m index event -> t m (WithTrace m index event) # | |
(MonadTrans t, Monad m, Monad (t m), HasGenesis (Point event), Resetable (t m) event indexer) => Resetable (t m) event (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
(MonadTrans t, MonadIO (t m), MonadIO m, MonadError IndexerError (t m), Closeable (t m) indexer) => Closeable (t m) (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer | |
HasDatabasePath indexer => HasDatabasePath (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods getDatabasePath :: WithTrace m indexer event -> SQLiteDBLocation # |
withTrace :: Applicative m => Trace m (IndexerEvent (Point event)) -> indexer event -> WithTrace m indexer event #
A smart constructor for WithTrace
withTraceM :: MonadIO m => Trace m (IndexerEvent (Point event)) -> m (indexer event) -> m (WithTrace m indexer event) #
A monadic smart constructor for WithTrace
class HasTraceConfig m event indexer where #
Gives access to the tracer. The provided instances allow access to the tracer even below an indexer transformer.
Methods
trace :: Lens' (indexer event) (Trace m (IndexerEvent (Point event))) #
Instances
(IndexerTrans t, HasTraceConfig m event indexer) => HasTraceConfig m event (t indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods trace :: Lens' (t indexer event) (Trace m (IndexerEvent (Point event))) # | |
HasTraceConfig m event (WithTrace m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods trace :: Lens' (WithTrace m indexer event) (Trace m (IndexerEvent (Point event))) # | |
(IndexerMapTrans t, HasTraceConfig m output indexer) => HasTraceConfig m output (t indexer output) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods trace :: Lens' (t indexer output output) (Trace m (IndexerEvent (Point output))) # |
Tracer
data WithTracer m indexer event #
A tracer modifier that adds tracing to an existing indexer
Instances
Queryable m event query indexer => Queryable m event query (WithTracer n indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods query :: Point event -> query -> WithTracer n indexer event -> m (Result query) # queryLatest :: query -> WithTracer n indexer event -> m (Result query) # | |
(Applicative m, IsIndex m event index) => IsIndex m event (WithTracer m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTracer m index event -> m (WithTracer m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> m (WithTracer m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> m (WithTracer m index event) # rollback :: Point event -> WithTracer m index event -> m (WithTracer m index event) # setLastStablePoint :: Point event -> WithTracer m index event -> m (WithTracer m index event) # | |
IsSync m event indexer => IsSync m event (WithTracer n indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods lastStablePoint :: WithTracer n indexer event -> m (Point event) # lastSyncPoint :: WithTracer n indexer event -> m (Point event) # | |
(HasGenesis (Point event), Functor m, Resetable m event indexer) => Resetable m event (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods reset :: WithTracer m indexer event -> m (WithTracer m indexer event) # | |
HasTracerConfig m event (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods tracer :: Lens' (WithTracer m indexer event) (Tracer m (IndexerEvent (Point event))) # | |
(Applicative m, Closeable m indexer) => Closeable m (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods close :: WithTracer m indexer event -> m () # | |
IndexerTrans (WithTracer m) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithTracer m indexer event) (indexer event) # | |
(MonadTrans t, Monad m, Monad (t m), IsIndex (t m) event index) => IsIndex (t m) event (WithTracer m index) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods index :: Timed (Point event) (Maybe event) -> WithTracer m index event -> t m (WithTracer m index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> t m (WithTracer m index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithTracer m index event -> t m (WithTracer m index event) # rollback :: Point event -> WithTracer m index event -> t m (WithTracer m index event) # setLastStablePoint :: Point event -> WithTracer m index event -> t m (WithTracer m index event) # | |
(MonadTrans t, Monad m, Monad (t m), HasGenesis (Point event), Resetable (t m) event indexer) => Resetable (t m) event (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods reset :: WithTracer m indexer event -> t m (WithTracer m indexer event) # | |
(MonadTrans t, Applicative (t m), Monad m, Closeable (t m) indexer) => Closeable (t m) (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods close :: WithTracer m indexer event -> t m () # | |
HasDatabasePath indexer => HasDatabasePath (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods getDatabasePath :: WithTracer m indexer event -> SQLiteDBLocation # |
withTracer :: Applicative m => Tracer m (IndexerEvent (Point event)) -> indexer event -> WithTracer m indexer event #
A smart constructor for WithTracer
withTracerM :: Applicative m => Tracer m (IndexerEvent (Point event)) -> m (indexer event) -> m (WithTracer m indexer event) #
A monadic smart constructor for WithTracer
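The idea of a tracing wrapper can be sketched as follows, assuming a tracer is simply a function from a message to an effect. All names here (`Tracer`, `IndexerEventSketch` and its constructors, `WithTracerSketch`, `indexTraced`) are local and hypothetical: the constructors of IndexerEvent are not listed in this page, so the ones below are invented for the example.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Local stand-in for a tracer: a function consuming messages in some monad.
newtype Tracer m a = Tracer {runTracer :: a -> m ()}

-- Hypothetical trace messages; the library's IndexerEvent may differ.
data IndexerEventSketch point = Indexing point | RollingBack point
  deriving (Eq, Show)

-- A wrapper pairing a tracer with an underlying store.
data WithTracerSketch m store point = WithTracerSketch
  { tracerOf :: Tracer m (IndexerEventSketch point)
  , wrapped :: store
  }

-- Emit a message, then delegate the actual indexing to the wrapped store.
indexTraced
  :: Monad m
  => (point -> store -> m store)
  -> point
  -> WithTracerSketch m store point
  -> m (WithTracerSketch m store point)
indexTraced indexInner p w = do
  runTracer (tracerOf w) (Indexing p)
  inner' <- indexInner p (wrapped w)
  pure (w {wrapped = inner'})

-- Hypothetical run: messages are collected in an IORef instead of a log.
demo :: IO ([IndexerEventSketch Int], [Int])
demo = do
  logRef <- newIORef []
  let tracer = Tracer (\e -> modifyIORef' logRef (e :))
      start = WithTracerSketch tracer []
      step p = indexTraced (\q st -> pure (q : st)) p
  final <- step 2 =<< step 1 start
  events <- readIORef logRef
  pure (reverse events, wrapped final)
```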
class HasTracerConfig m event indexer where #
Gives access to the tracer. The provided instances allow access to the tracer even below an indexer transformer.
Methods
tracer :: Lens' (indexer event) (Tracer m (IndexerEvent (Point event))) #
Instances
(IndexerTrans t, HasTracerConfig m event indexer) => HasTracerConfig m event (t indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods tracer :: Lens' (t indexer event) (Tracer m (IndexerEvent (Point event))) # | |
HasTracerConfig m event (WithTracer m indexer) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods tracer :: Lens' (WithTracer m indexer event) (Tracer m (IndexerEvent (Point event))) # | |
(IndexerMapTrans t, HasTracerConfig m output indexer) => HasTracerConfig m output (t indexer output) # | |
Defined in Marconi.Core.Transformer.WithTracer Methods tracer :: Lens' (t indexer output output) (Tracer m (IndexerEvent (Point output))) # |
Catchup
data WithCatchup indexer event #
Instances
IndexerTrans WithCatchup # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithCatchup indexer event) (indexer event) # | |
Queryable m event query indexer => Queryable m event query (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods query :: Point event -> query -> WithCatchup indexer event -> m (Result query) # queryLatest :: query -> WithCatchup indexer event -> m (Result query) # | |
(MonadIO m, IsIndex m event indexer, Ord (Point event)) => IsIndex m event (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods index :: Timed (Point event) (Maybe event) -> WithCatchup indexer event -> m (WithCatchup indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCatchup indexer event -> m (WithCatchup indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCatchup indexer event -> m (WithCatchup indexer event) # rollback :: Point event -> WithCatchup indexer event -> m (WithCatchup indexer event) # setLastStablePoint :: Point event -> WithCatchup indexer event -> m (WithCatchup indexer event) # | |
IsSync m event indexer => IsSync m event (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods lastStablePoint :: WithCatchup indexer event -> m (Point event) # lastSyncPoint :: WithCatchup indexer event -> m (Point event) # | |
(Applicative m, Resetable m event indexer) => Resetable m event (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods reset :: WithCatchup indexer event -> m (WithCatchup indexer event) # | |
Closeable m indexer => Closeable m (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods close :: WithCatchup indexer event -> m () # | |
HasDatabasePath indexer => HasDatabasePath (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods getDatabasePath :: WithCatchup indexer event -> SQLiteDBLocation # | |
HasCatchupConfig (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods catchupBypassDistance :: Lens' (WithCatchup indexer event) Word64 # catchupBatchSize :: Lens' (WithCatchup indexer event) Word64 # catchupEventHook :: Lens' (WithCatchup indexer event) (Maybe (CatchupEvent -> IO ())) # |
Arguments
:: (Point event -> event -> Word64) | The distance function |
-> CatchupConfig | Configures how many elements we put in a batch and until when batching is used |
-> indexer event | the underlying indexer |
-> WithCatchup indexer event |
A smart constructor for WithCatchup
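The batching behaviour can be pictured with a small pure model. It assumes that events are buffered while the distance to the tip exceeds the bypass distance, that a full buffer is flushed as one batch, and that events close to the tip are passed on immediately together with anything still buffered. These semantics are an assumption drawn from the configuration names, and the distance function is simplified to take only the event. `CatchupSketch`, `feed` and `flush` are hypothetical.

```haskell
import Data.List (foldl')
import Data.Word (Word64)

-- Pure model of a catch-up buffer (hypothetical, not the library's type).
data CatchupSketch event = CatchupSketch
  { bypassDistance :: Word64 -- at or below this distance, stop batching
  , batchSize :: Word64 -- number of buffered events that triggers a flush
  , buffered :: [event] -- newest first
  , delivered :: [[event]] -- batches sent to the underlying indexer, newest first
  }

-- Send the whole buffer downstream as one batch, oldest event first.
flush :: CatchupSketch event -> CatchupSketch event
flush s = case buffered s of
  [] -> s
  es -> s {buffered = [], delivered = reverse es : delivered s}

-- Buffer the event, flushing when near the tip or when the batch is full.
feed :: (event -> Word64) -> CatchupSketch event -> event -> CatchupSketch event
feed distance s ev
  | distance ev <= bypassDistance s = flush s'
  | fromIntegral (length (buffered s')) >= batchSize s = flush s'
  | otherwise = s'
  where
    s' = s {buffered = ev : buffered s}

-- Hypothetical run: each event is its own distance to the tip.
batches :: [[Word64]]
batches = reverse (delivered (foldl' (feed id) start [100, 99, 98, 2, 1]))
  where
    start = CatchupSketch {bypassDistance = 5, batchSize = 2, buffered = [], delivered = []}
```

In this run the first two events form a full batch, the third is flushed together with the first event that is near the tip, and the last event is delivered on its own.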
data CatchupConfig #
The visible part of the catchup configuration. It allows you to configure the size of the batch and to control when the batch mechanism stops.
Constructors
CatchupConfig Word64 Word64 (Maybe (CatchupEvent -> IO ())) |
mkCatchupConfig :: Word64 -> Word64 -> CatchupConfig #
configCatchupEventHook :: Lens' CatchupConfig (Maybe (CatchupEvent -> IO ())) #
class HasCatchupConfig indexer where #
A typeclass that allows an indexer with a WithCatchup transformer to configure the behaviour of this transformer
Methods
catchupBypassDistance :: Lens' (indexer event) Word64 #
catchupBatchSize :: Lens' (indexer event) Word64 #
catchupEventHook :: Lens' (indexer event) (Maybe (CatchupEvent -> IO ())) #
Instances
HasCatchupConfig (WithCatchup indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods catchupBypassDistance :: Lens' (WithCatchup indexer event) Word64 # catchupBatchSize :: Lens' (WithCatchup indexer event) Word64 # catchupEventHook :: Lens' (WithCatchup indexer event) (Maybe (CatchupEvent -> IO ())) # | |
(IndexerTrans t, HasCatchupConfig indexer) => HasCatchupConfig (t indexer) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods catchupBypassDistance :: Lens' (t indexer event) Word64 # catchupBatchSize :: Lens' (t indexer event) Word64 # catchupEventHook :: Lens' (t indexer event) (Maybe (CatchupEvent -> IO ())) # | |
(IndexerMapTrans t, HasCatchupConfig indexer) => HasCatchupConfig (t indexer output) # | |
Defined in Marconi.Core.Transformer.WithCatchup Methods catchupBypassDistance :: Lens' (t indexer output event) Word64 # catchupBatchSize :: Lens' (t indexer output event) Word64 # catchupEventHook :: Lens' (t indexer output event) (Maybe (CatchupEvent -> IO ())) # |
data CatchupEvent #
Constructors
Synced |
SQLite
Arguments
:: Text | Pretty name of the indexer |
-> Trace IO Text | Trace for logging. |
-> Connection | SQLite database connection |
-> String | SQLite index name |
-> Query | SQL statement for creating the index table |
-> IO () |
Utility function for creating an SQLite database index.
Typically used with WithCatchup
as a way to create SQLite indexes when
fully synchronized. This function includes some logging so that the user is informed when the
creation of the SQLite indexes is done.
An example would look like:
    let stdoutTrace = ...
        c = ...
        slotNoIndexName = "utxo_slotNo"
        createSlotNoIndexStatement =
          "CREATE INDEX IF NOT EXISTS "
            <> fromString slotNoIndexName
            <> " ON utxo (slotNo)"
    createIndexTable "Utxo" stdoutTrace c slotNoIndexName createSlotNoIndexStatement
Delay
data WithDelay indexer event #
When the indexing computation is expensive, you may want to delay it to avoid expensive rollbacks. WithDelay buffers events before sending them to the underlying indexer. Buffered events are sent when the buffer overflows.
An indexer wrapped in WithDelay won't interact nicely with a coordinator at the moment, as WithDelay acts as if it were processing an event while it only postpones the processing.
As a consequence, WithDelay is preferably used at the top of the hierarchy.
Instances
A smart constructor for WithDelay
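A small pure model may help picture the buffering. It assumes that once the buffer holds more than its capacity, the oldest event is released to the underlying indexer, and that a rollback landing inside the buffer only drops buffered events. `DelaySketch`, `push` and `rollbackPending` are hypothetical, and the case where a rollback goes past the buffer is not modelled.

```haskell
import Data.List (mapAccumL)

-- Pure model of a delay buffer (hypothetical, not the library's type).
data DelaySketch event = DelaySketch
  { capacity :: Int
  , pending :: [event] -- newest first
  }

-- Buffer an event; on overflow, release the oldest one downstream.
push :: DelaySketch event -> event -> (DelaySketch event, [event])
push d ev
  | length pending' > capacity d = (d {pending = init pending'}, [last pending'])
  | otherwise = (d {pending = pending'}, [])
  where
    pending' = ev : pending d

-- A rollback inside the buffer drops buffered events after the target point,
-- so the underlying indexer never sees them.
rollbackPending :: Ord point => (event -> point) -> point -> DelaySketch event -> DelaySketch event
rollbackPending pointOf p d = d {pending = filter ((<= p) . pointOf) (pending d)}

-- Hypothetical run with capacity 2: final buffer and what each push released.
outcome :: (DelaySketch Int, [[Int]])
outcome = mapAccumL push (DelaySketch 2 []) [1, 2, 3, 4]
```

In this run events 1 and 2 reach the underlying indexer, while 3 and 4 stay buffered and can still be discarded cheaply by a rollback.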
class HasDelayConfig indexer where #
Provides access to the delay size of a WithDelay transformer.
The provided instances ensure that access is granted even if other indexer transformers are used on top of this one.
Methods
delayCapacity :: Lens' (indexer event) Word #
Instances
HasDelayConfig (WithDelay indexer) # | |
Defined in Marconi.Core.Transformer.WithDelay Methods delayCapacity :: Lens' (WithDelay indexer event) Word # | |
(IndexerTrans t, HasDelayConfig indexer) => HasDelayConfig (t indexer) # | |
Defined in Marconi.Core.Transformer.WithDelay Methods delayCapacity :: Lens' (t indexer event) Word # | |
(IndexerMapTrans t, HasDelayConfig indexer) => HasDelayConfig (t indexer output) # | |
Defined in Marconi.Core.Transformer.WithDelay Methods delayCapacity :: Lens' (t indexer output event) Word # |
Pruning
data WithPruning indexer event #
WithPruning controls when we should prune an indexer.
The main purpose is to optimize storage for events that can't be rolled back anymore.
In some contexts, once you know that an event can't be rolled back anymore, your indexer may not need it, or may process its information to make it irrelevant.
In this case, you may want to prune the stored events.
Instances
IndexerTrans WithPruning # | |
Defined in Marconi.Core.Transformer.WithPruning Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithPruning indexer event) (indexer event) # | |
Queryable m event query indexer => Queryable m event query (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods query :: Point event -> query -> WithPruning indexer event -> m (Result query) # queryLatest :: query -> WithPruning indexer event -> m (Result query) # | |
(MonadError IndexerError m, Ord (Point event), Prunable m event indexer, IsIndex m event indexer, HasGenesis (Point event)) => IsIndex m event (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods index :: Timed (Point event) (Maybe event) -> WithPruning indexer event -> m (WithPruning indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithPruning indexer event -> m (WithPruning indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithPruning indexer event -> m (WithPruning indexer event) # rollback :: Point event -> WithPruning indexer event -> m (WithPruning indexer event) # setLastStablePoint :: Point event -> WithPruning indexer event -> m (WithPruning indexer event) # | |
IsSync m event indexer => IsSync m event (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods lastStablePoint :: WithPruning indexer event -> m (Point event) # lastSyncPoint :: WithPruning indexer event -> m (Point event) # | |
(Monad m, Prunable m event indexer, Resetable m event indexer, HasGenesis (Point event), Ord (Point event)) => Resetable m event (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods reset :: WithPruning indexer event -> m (WithPruning indexer event) # | |
Closeable m indexer => Closeable m (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods close :: WithPruning indexer event -> m () # | |
HasPruningConfig (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods securityParam :: Lens' (WithPruning indexer event) Word # pruneEvery :: Lens' (WithPruning indexer event) Word # |
Arguments
:: Word | how far can a rollback go |
-> Word | once we have enough events, how often do we prune |
-> indexer event | |
-> WithPruning indexer event |
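How the two Word arguments might interact can be pictured with a pure model. It assumes that a prune is requested only once more than the security parameter's worth of points has been seen, that the target is the point sitting that many steps behind the latest one, and that requests are spaced by the prune frequency. This is a guess from the argument descriptions, and all names are hypothetical.

```haskell
import Data.List (mapAccumL)

-- Pure model of pruning bookkeeping (hypothetical, not the library's type).
data PruningSketch point = PruningSketch
  { securityParamS :: Int -- how far a rollback can go
  , pruneEveryS :: Int -- how many events between two prune requests
  , recent :: [point] -- newest first
  , sincePrune :: Int -- events seen since the last prune request
  }

-- Record a point; maybe return the point at which pruning is requested.
step :: PruningSketch point -> point -> (PruningSketch point, Maybe point)
step s p
  | length recent' > securityParamS s && since' >= pruneEveryS s =
      ( s {recent = take (securityParamS s) recent', sincePrune = 0}
      , Just (recent' !! securityParamS s)
      )
  | otherwise = (s {recent = recent', sincePrune = since'}, Nothing)
  where
    recent' = p : recent s
    since' = sincePrune s + 1

-- Hypothetical run: security parameter 2, pruning every 2 events.
pruneRequests :: [Maybe Int]
pruneRequests = snd (mapAccumL step (PruningSketch 2 2 [] 0) [1 .. 6])
```

With these settings a prune is requested at point 1 when point 3 arrives and at point 3 when point 5 arrives, so the two most recent points always remain available for a rollback.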
nextPruning :: Lens' (WithPruning indexer event) (Seq (Point event)) #
stepsBeforeNext :: Lens' (WithPruning indexer event) Word #
currentDepth :: Lens' (WithPruning indexer event) Word #
class Prunable m event indexer where #
The indexer can prune old data.
The main purpose is to speed up query processing.
If the indexer is Rollbackable and Prunable, it can't rollback behind the pruningPoint; the idea is to call prune on points that can't be rolled back anymore.
- indexer is the indexer implementation type
- desc is the descriptor of the indexer, fixing the Point types
- m is the monad in which our indexer operates
Methods
prune :: Ord (Point event) => Point event -> indexer event -> m (indexer event) #
pruningPoint :: indexer event -> m (Maybe (Point event)) #
Instances
(IndexerTrans t, Functor m, Prunable m event indexer) => Prunable m event (t indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning | |
(Functor m, Prunable m event store) => Prunable m event (MixedIndexer store mem) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods prune :: Point event -> MixedIndexer store mem event -> m (MixedIndexer store mem event) # pruningPoint :: MixedIndexer store mem event -> m (Maybe (Point event)) # | |
(IndexerMapTrans t, Functor m, Prunable m output indexer) => Prunable m output (t indexer output) # | |
Defined in Marconi.Core.Transformer.WithPruning |
class HasPruningConfig indexer where #
Instances
HasPruningConfig (WithPruning indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning Methods securityParam :: Lens' (WithPruning indexer event) Word # pruneEvery :: Lens' (WithPruning indexer event) Word # | |
(IndexerTrans t, HasPruningConfig indexer) => HasPruningConfig (t indexer) # | |
Defined in Marconi.Core.Transformer.WithPruning |
Arbitrary action on events
newtype WithAction indexer event #
An indexer which runs an action on events during indexing
Constructors
WithAction | |
Fields
|
Instances
IndexerTrans WithAction # | |
Defined in Marconi.Core.Transformer.WithAction Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (WithAction indexer event) (indexer event) # | |
Queryable m event query indexer => Queryable m event query (WithAction indexer) # | |
Defined in Marconi.Core.Transformer.WithAction Methods query :: Point event -> query -> WithAction indexer event -> m (Result query) # queryLatest :: query -> WithAction indexer event -> m (Result query) # | |
(MonadIO m, MonadError IndexerError m, IsIndex m event indexer) => IsIndex m event (WithAction indexer) # | |
Defined in Marconi.Core.Transformer.WithAction Methods index :: Timed (Point event) (Maybe event) -> WithAction indexer event -> m (WithAction indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithAction indexer event -> m (WithAction indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithAction indexer event -> m (WithAction indexer event) # rollback :: Point event -> WithAction indexer event -> m (WithAction indexer event) # setLastStablePoint :: Point event -> WithAction indexer event -> m (WithAction indexer event) # | |
IsSync m event indexer => IsSync m event (WithAction indexer) # | |
Defined in Marconi.Core.Transformer.WithAction Methods lastStablePoint :: WithAction indexer event -> m (Point event) # lastSyncPoint :: WithAction indexer event -> m (Point event) # | |
Closeable m indexer => Closeable m (WithAction indexer) # | |
Defined in Marconi.Core.Transformer.WithAction Methods close :: WithAction indexer event -> m () # |
newtype WithActionConfig event #
Configuration is a function Timed (Point event) event -> IO ()
Constructors
WithActionConfig | |
Fields
|
withActionConfigAction :: forall event event. Iso (WithActionConfig event) (WithActionConfig event) (Timed (Point event) event -> IO ()) (Timed (Point event) event -> IO ()) #
withActionWrapper :: forall indexer event indexer event. Iso (WithAction indexer event) (WithAction indexer event) (IndexTransformer WithActionConfig indexer event) (IndexTransformer WithActionConfig indexer event) #
Streaming
class Streamable a r where #
Something that can have mapped events streamed to or from it
Methods
streamFrom :: a -> Stream (Of r) IO () #
streamTo :: (Timed (Point event) event -> r) -> a -> Timed (Point event) event -> IO () #
Instances
Binary r => Streamable Socket r # | A streaming implementation using |
Streamable (TBQueue r) r # | An experimental streaming implementation using IMPORTANT: Note that this will block when used with For an example of how to use it, please see: testMarconi.CoreSpec.hspropWithStreamTBQueue |
withStream :: Streamable a r => (Timed (Point event) event -> r) -> a -> indexer event -> WithAction indexer event #
A smart constructor for WithAction, using any Streamable.
Be aware: depending on the implementation you choose, this can lead to the indexer blocking, given a slow (or non-existent) consumer!
Caching
data WithCache query indexer event #
Sets up a cache for some requests.
The cache is active only for the latest Point.
As a consequence, using WithCache is more effective on top of the on-disk part of a MixedIndexer, or any other part of an indexer that has a relatively stable sync point.
Instances
(Ord query, Ord (Point event), IsSync m event index, MonadError (QueryError query) m, Monad m, Queryable m event query index) => Queryable m event query (WithCache query index) # | |
Defined in Marconi.Core.Transformer.WithCache | |
(Applicative m, IsIndex m event index, Queryable m event query index) => IsIndex m event (WithCache query index) # | This instance updates all the cached queries with the incoming event and then passes this event to the underlying indexer. |
Defined in Marconi.Core.Transformer.WithCache Methods index :: Timed (Point event) (Maybe event) -> WithCache query index event -> m (WithCache query index event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCache query index event -> m (WithCache query index event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> WithCache query index event -> m (WithCache query index event) # rollback :: Point event -> WithCache query index event -> m (WithCache query index event) # setLastStablePoint :: Point event -> WithCache query index event -> m (WithCache query index event) # | |
IsSync m event indexer => IsSync m event (WithCache query indexer) # | |
Defined in Marconi.Core.Transformer.WithCache Methods lastStablePoint :: WithCache query indexer event -> m (Point event) # lastSyncPoint :: WithCache query indexer event -> m (Point event) # | |
(Monad m, Resetable m event index, HasGenesis (Point event), Ord (Point event), Queryable m event query index) => Resetable m event (WithCache query index) # | Rollback the underlying indexer, clear the cache, repopulate it with queries to the underlying indexer. |
Defined in Marconi.Core.Transformer.WithCache | |
Closeable m indexer => Closeable m (WithCache query indexer) # | |
Defined in Marconi.Core.Transformer.WithCache | |
HasCacheConfig query (WithCache query indexer) # | |
Defined in Marconi.Core.Transformer.WithCache | |
IndexerTrans (WithCache query) # | |
HasDatabasePath indexer => HasDatabasePath (WithCache query indexer) # | |
Defined in Marconi.Core.Transformer.WithCache Methods getDatabasePath :: WithCache query indexer event -> SQLiteDBLocation # |
Arguments
:: Ord query | |
=> (Timed (Point event) (Maybe event) -> Result query -> Result query) | The cache update function |
-> indexer event | |
-> WithCache query indexer event |
A smart constructor for WithCache
.
The cache starts empty, you can populate it with addCacheFor
addCacheFor :: forall query m event indexer. Queryable (ExceptT (QueryError query) m) event query indexer => IsSync (ExceptT (QueryError query) m) event indexer => HasCacheConfig query indexer => Monad m => MonadError IndexerError m => Ord query => Ord (Point event) => query -> indexer event -> m (indexer event) #
Add a cache for a specific query.
When added, the WithCache queries the underlying indexer to populate the cache for this query.
If you want to add caches for several queries at the same time, use traverse.
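The interplay between the cache update function and addCacheFor can be sketched with a pure model. The cache is populated from the underlying store when a query is registered, and every incoming event then updates each cached result before being stored. `CacheSketch`, `addCacheForSketch`, `indexCached`, `queryCached` and `CountQuery` are hypothetical, and a plain list stands in for the underlying indexer.

```haskell
import Data.Maybe (fromMaybe)

-- Pure model of a query cache (hypothetical, not the library's type).
data CacheSketch query result event = CacheSketch
  { onEvent :: event -> result -> result -- the cache update function
  , cachedResults :: [(query, result)]
  , stored :: [event] -- stand-in for the underlying indexer
  }

-- Register a query: run it once against the store to populate the cache.
addCacheForSketch
  :: (query -> [event] -> result)
  -> query
  -> CacheSketch query result event
  -> CacheSketch query result event
addCacheForSketch runQuery q c =
  c {cachedResults = (q, runQuery q (stored c)) : cachedResults c}

-- Update every cached result, then pass the event to the store.
indexCached :: event -> CacheSketch query result event -> CacheSketch query result event
indexCached ev c =
  c
    { cachedResults = [(q, onEvent c ev r) | (q, r) <- cachedResults c]
    , stored = ev : stored c
    }

-- Answer from the cache when possible, otherwise fall back to the store.
queryCached
  :: Eq query
  => (query -> [event] -> result)
  -> query
  -> CacheSketch query result event
  -> result
queryCached fallback q c = fromMaybe (fallback q (stored c)) (lookup q (cachedResults c))

data CountQuery = CountAll
  deriving (Eq, Show)

countAll :: CountQuery -> [Char] -> Int
countAll CountAll = length

-- The fallback returns -1 here, so a cache miss would be visible in the result.
cachedCount :: Int
cachedCount =
  queryCached (\_ _ -> negate 1) CountAll (indexCached 'b' (indexCached 'a' withCache))
  where
    withCache = addCacheForSketch countAll CountAll (CacheSketch (\_ n -> n + 1) [] [])
```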
class HasCacheConfig query indexer where #
Instances
(IndexerTrans t, HasCacheConfig query indexer) => HasCacheConfig query (t indexer) # | |
Defined in Marconi.Core.Transformer.WithCache | |
HasCacheConfig query (WithCache query indexer) # | |
Defined in Marconi.Core.Transformer.WithCache | |
(IndexerMapTrans t, HasCacheConfig query indexer) => HasCacheConfig query (t indexer output) # | |
Defined in Marconi.Core.Transformer.WithCache |
Transforming input
data WithTransform indexer output input #
WithTransform is an indexer transformer that maps the event type of an indexer
Instances
IndexerMapTrans WithTransform # | |
Defined in Marconi.Core.Transformer.WithTransform Methods unwrapMap :: forall (indexer :: Type -> Type) output event. Lens' (WithTransform indexer output event) (indexer output) # | |
(Queryable m output query indexer, Point output ~ Point input) => Queryable m input query (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods query :: Point input -> query -> WithTransform indexer output input -> m (Result query) # queryLatest :: query -> WithTransform indexer output input -> m (Result query) # | |
(Point output ~ Point input, IsIndex m output indexer, Ord (Point output)) => IsIndex m input (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods index :: Timed (Point input) (Maybe input) -> WithTransform indexer output input -> m (WithTransform indexer output input) # indexAll :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithTransform indexer output input -> m (WithTransform indexer output input) # indexAllDescending :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithTransform indexer output input -> m (WithTransform indexer output input) # rollback :: Point input -> WithTransform indexer output input -> m (WithTransform indexer output input) # setLastStablePoint :: Point input -> WithTransform indexer output input -> m (WithTransform indexer output input) # | |
(Point output ~ Point event, IsSync m output indexer) => IsSync m event (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods lastStablePoint :: WithTransform indexer output event -> m (Point event) # lastSyncPoint :: WithTransform indexer output event -> m (Point event) # | |
(Functor m, Resetable m output indexer, HasGenesis (Point event)) => Resetable m event (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods reset :: WithTransform indexer output event -> m (WithTransform indexer output event) # | |
HasTransformConfig input output (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods transformEvent :: Lens' (WithTransform indexer output input) (input -> Maybe output) # | |
Closeable m indexer => Closeable m (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods close :: WithTransform indexer output event -> m () # | |
HasDatabasePath indexer => HasDatabasePath (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods getDatabasePath :: WithTransform indexer output event -> SQLiteDBLocation # |
withTransform :: (Point input -> Point output) -> (input -> Maybe output) -> indexer output -> WithTransform indexer output input #
A smart constructor for WithTransform
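The idea behind the `input -> Maybe output` argument can be illustrated with a toy model that does not depend on Marconi. Everything below (`ListIndexer`, `ToyTransform`, `indexToy`, `evenAsText`) is a hypothetical stand-in, not part of the library, and the sketch deliberately ignores points, rollbacks and the monadic context that the real `WithTransform` handles.

```haskell
-- | Toy stand-in for an indexer: it only remembers the events it received,
-- most recent first. (Hypothetical type, not part of Marconi.)
newtype ListIndexer event = ListIndexer { storedEvents :: [event] }

-- | Toy counterpart of a transforming wrapper: an inner indexer of @output@
-- events plus a function turning an @input@ into an optional @output@.
data ToyTransform indexer output input = ToyTransform
  { toEvent :: input -> Maybe output
  , inner   :: indexer output
  }

-- | Index one input event: convert it first, and store it only when the
-- conversion yields a value.
indexToy
  :: input
  -> ToyTransform ListIndexer output input
  -> ToyTransform ListIndexer output input
indexToy ev (ToyTransform f (ListIndexer evs)) =
  ToyTransform f (ListIndexer (maybe evs (: evs) (f ev)))

-- | Example conversion: keep even numbers only, rendered as text.
evenAsText :: Int -> Maybe String
evenAsText n
  | even n    = Just (show n)
  | otherwise = Nothing

-- | Feed 1..6 through the toy transformer, oldest event first.
example :: [String]
example = storedEvents . inner $ foldl (flip indexToy) start [1 .. 6]
  where
    start = ToyTransform evenAsText (ListIndexer [])
```

In this sketch the wrapped indexer only ever sees `String` events, while callers feed it `Int` events; odd numbers are dropped by the conversion.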
class HasTransformConfig input output indexer where #
Provides access to the transformation function of a WithTransform transformer.
The provided instance ensures that access is granted even if other indexer transformers are used on top of this one.
Methods
transformEvent :: Lens' (indexer input) (input -> Maybe output) #
Instances
HasTransformConfig input output (WithTransform indexer output) # | |
Defined in Marconi.Core.Transformer.WithTransform Methods transformEvent :: Lens' (WithTransform indexer output input) (input -> Maybe output) # |
Transform an indexer into a fold
data WithFold m indexer output input #
WithFold folds incoming events to produce an output.
Instances
(Queryable m output query indexer, Point input ~ Point output) => Queryable m input query (WithFold n indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold | |
HasFold m input output (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold | |
(HasGenesis (Point output), Point input ~ Point output, IsSync m output indexer, IsIndex m output indexer, Ord (Point output)) => IsIndex m input (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold Methods index :: Timed (Point input) (Maybe input) -> WithFold m indexer output input -> m (WithFold m indexer output input) # indexAll :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithFold m indexer output input -> m (WithFold m indexer output input) # indexAllDescending :: (Eq (Point input), Traversable f) => f (Timed (Point input) (Maybe input)) -> WithFold m indexer output input -> m (WithFold m indexer output input) # rollback :: Point input -> WithFold m indexer output input -> m (WithFold m indexer output input) # setLastStablePoint :: Point input -> WithFold m indexer output input -> m (WithFold m indexer output input) # | |
(Point output ~ Point event, IsSync m output indexer) => IsSync m event (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold Methods lastStablePoint :: WithFold m indexer output event -> m (Point event) # lastSyncPoint :: WithFold m indexer output event -> m (Point event) # | |
(Functor m, Resetable m output indexer, HasGenesis (Point event)) => Resetable m event (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold | |
Closeable m indexer => Closeable m (WithFold m indexer output) # | |
Defined in Marconi.Core.Transformer.WithFold | |
IndexerMapTrans (WithFold m) # | |
withFold :: output -> (indexer output -> m (Maybe output)) -> (output -> input -> m output) -> indexer output -> WithFold m indexer output input #
A smart constructor for WithFold
withFoldMap :: (Monoid output, Applicative m) => (indexer output -> m (Maybe output)) -> (input -> output) -> indexer output -> WithFold m indexer output input #
A smart constructor for WithFold
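Judging only from its signature (a `Monoid output` constraint and an `input -> output` function), `withFoldMap` plausibly maps each input to an output and combines it with the accumulated value. The following pure sketch shows that reading under this assumption; `stepWith`, `runningFold` and `totals` are hypothetical names, and the real transformer additionally works in a monad `m` and stores each result in the wrapped indexer.

```haskell
import Data.Monoid (Sum (..))

-- | Hypothetical fold step: map the incoming event to the output type and
-- append it to the accumulator. This is an assumed reading of the
-- signature, not code taken from Marconi.
stepWith :: Monoid output => (input -> output) -> output -> input -> output
stepWith f acc ev = acc <> f ev

-- | The successive outputs such a fold would emit, one per input event,
-- starting from 'mempty'.
runningFold :: Monoid output => (input -> output) -> [input] -> [output]
runningFold f = drop 1 . scanl (stepWith f) mempty

-- | Example: a running sum over three events.
totals :: [Int]
totals = map getSum (runningFold Sum [3, 4, 5])
```

Here each emitted output is the sum of all events seen so far, which is the kind of aggregate a fold-style transformer maintains.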
getLastEventAtQueryValue :: (IsSync m event indexer, HasGenesis (Point event), Queryable (ExceptT (QueryError (EventAtQuery a)) m) event (EventAtQuery a) indexer, Ord (Point event), MonadError IndexerError m) => indexer event -> m (Maybe a) #
class HasFold m input output indexer where #
There are few scenarios where you would want to modify the fold function, but it may happen.
Index Wrapper
Wrap an indexer with some extra information to modify its behaviour.
The wrapper comes with some instances that relay the function to the wrapped indexer, without any extra behaviour.
An indexer transformer can be a newtype of IndexTransformer, reuse some of its instances with deriving via, and specify its own instances when it wants to add logic to them.
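The "newtype plus deriving via" pattern described above can be sketched with a toy class, independent of Marconi. All names here (`Describe`, `Wrapper`, `Plain`, `Relayed`, `Loud`) are hypothetical: `Wrapper` plays the role of a generic relaying wrapper, `Relayed` reuses its instance unchanged, and `Loud` writes its own instance to add behaviour.

```haskell
{-# LANGUAGE DerivingVia #-}

-- | Hypothetical one-method class standing in for an indexer typeclass.
class Describe a where
  describe :: a -> String

-- | Generic wrapper whose instance only relays to the wrapped value.
newtype Wrapper a = Wrapper a

instance Describe a => Describe (Wrapper a) where
  describe (Wrapper x) = describe x

-- | The underlying "indexer" of this toy example.
newtype Plain = Plain Int

instance Describe Plain where
  describe (Plain n) = "plain " ++ show n

-- | A transformer that adds nothing for this class: it reuses the relaying
-- instance of 'Wrapper' through deriving via.
newtype Relayed = Relayed Plain
  deriving Describe via (Wrapper Plain)

-- | A transformer that adds logic: it declares its own instance and calls
-- the underlying one explicitly.
newtype Loud = Loud Plain

instance Describe Loud where
  describe (Loud p) = describe p ++ "!"
```

With these definitions, `describe (Relayed (Plain 1))` behaves exactly like the wrapped value, while `describe (Loud (Plain 1))` decorates the result of the wrapped value.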
Derive via machinery
data IndexTransformer config indexer event #
This datatype is meant to be used inside a newtype by any indexer transformer. It wraps an indexer and attaches to it a "config" (which may be stateful) used in the logic added by the transformer.
Constructors
IndexTransformer (config event) (indexer event) |
Instances
Queryable m event query indexer => Queryable m event query (IndexTransformer config indexer) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods query :: Point event -> query -> IndexTransformer config indexer event -> m (Result query) # queryLatest :: query -> IndexTransformer config indexer event -> m (Result query) # | |
IsIndex m event indexer => IsIndex m event (IndexTransformer config indexer) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods index :: Timed (Point event) (Maybe event) -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # indexAll :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # indexAllDescending :: (Eq (Point event), Traversable f) => f (Timed (Point event) (Maybe event)) -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # rollback :: Point event -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # setLastStablePoint :: Point event -> IndexTransformer config indexer event -> m (IndexTransformer config indexer event) # | |
IsSync event m index => IsSync event m (IndexTransformer config index) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods lastStablePoint :: IndexTransformer config index m -> event (Point m) # lastSyncPoint :: IndexTransformer config index m -> event (Point m) # | |
Closeable m index => Closeable m (IndexTransformer config index) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods close :: IndexTransformer config index event -> m () # | |
IndexerTrans (IndexTransformer config) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods unwrap :: forall (indexer :: Type -> Type) event. Lens' (IndexTransformer config indexer event) (indexer event) # | |
HasDatabasePath index => HasDatabasePath (IndexTransformer config index) # | |
Defined in Marconi.Core.Transformer.IndexTransformer Methods getDatabasePath :: IndexTransformer config index event -> SQLiteDBLocation # |
wrappedIndexer :: forall config indexer event indexer. Lens (IndexTransformer config indexer event) (IndexTransformer config indexer event) (indexer event) (indexer event) #
wrapperConfig :: forall config indexer event config. Lens (IndexTransformer config indexer event) (IndexTransformer config indexer event) (config event) (config event) #
Helpers
Via methods have two major uses.
- In some cases, we can't use deriving via to derive a typeclass; via methods can then help in the declaration of our typeclass instances.
- Often, when an indexer transformer modifies the default behaviour of an indexer typeclass, we need to call some methods of the underlying indexer; via methods make this easier.
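As a rough illustration of how a lens-based via helper can work, here is a self-contained sketch that defines its own van Laarhoven `Lens'` (the same shape as the one in the lens library) instead of importing Marconi. `Counter`, `Logged`, `wrappedL`, `resetCounter` and `viaLens` are hypothetical names; this is not necessarily how Marconi implements its helpers.

```haskell
{-# LANGUAGE RankNTypes #-}

-- | Van Laarhoven lens, defined locally to keep the sketch self-contained.
type Lens' s a = forall f. Functor f => (a -> f a) -> s -> f s

-- | Toy stand-in for a wrapped indexer.
newtype Counter = Counter Int
  deriving (Eq, Show)

-- | Toy effectful operation on the inner value (stands in for something
-- like a reset). It fails on negative counters to show the effect.
resetCounter :: Counter -> Maybe Counter
resetCounter (Counter n)
  | n >= 0    = Just (Counter 0)
  | otherwise = Nothing

-- | Toy wrapper carrying extra information next to the inner value.
data Logged = Logged
  { label   :: String
  , wrapped :: Counter
  }
  deriving (Eq, Show)

-- | Lens focusing on the wrapped value.
wrappedL :: Lens' Logged Counter
wrappedL f (Logged l c) = Logged l <$> f c

-- | Sketch of a via helper: run an effectful update on the focused value
-- and rebuild the wrapper around the result.
viaLens :: Functor m => Lens' s a -> (a -> m a) -> s -> m s
viaLens l op = l op
```

Applying the lens to an effectful function is what requires a `Functor` instance for `m`: the wrapper is rebuilt by mapping over the result of the inner operation. For example, `viaLens wrappedL resetCounter (Logged "x" (Counter 5))` resets the inner counter and keeps the label.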
pruneVia :: (Functor m, Prunable m event indexer, Ord (Point event)) => Lens' s (indexer event) -> Point event -> s -> m s #
Helper to implement the prune function of Prunable when we use a wrapper.
Unfortunately, as m must have a functor instance, we can't use deriving via directly.
pruningPointVia :: Prunable m event indexer => Getter s (indexer event) -> s -> m (Maybe (Point event)) #
Helper to implement the pruningPoint function of Prunable when we use a wrapper.
Unfortunately, as m must have a functor instance, we can't use deriving via directly.
rollbackVia :: (IsIndex m event indexer, Ord (Point event)) => Lens' s (indexer event) -> Point event -> s -> m s #
Helper to implement the rollback function of IsIndex when we use a wrapper.
Unfortunately, as m must have a functor instance, we can't use deriving via directly.
resetVia :: (Functor m, Resetable m event indexer) => Lens' s (indexer event) -> s -> m s #
Helper to implement the reset function of Resetable when we use a wrapper.
Unfortunately, as m must have a functor instance, we can't use deriving via directly.
indexVia :: (IsIndex m event indexer, Eq (Point event)) => Lens' s (indexer event) -> Timed (Point event) (Maybe event) -> s -> m s #
Helper to implement the index function of IsIndex when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
indexAllDescendingVia :: (Eq (Point event), IsIndex m event indexer, Traversable f) => Lens' s (indexer event) -> f (Timed (Point event) (Maybe event)) -> s -> m s #
Helper to implement the indexAllDescending function of IsIndex when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
setLastStablePointVia :: (Ord (Point event), IsIndex m event indexer) => Lens' s (indexer event) -> Point event -> s -> m s #
Helper to implement the setLastStablePoint function of IsIndex when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
lastStablePointVia :: IsSync m event indexer => Getter s (indexer event) -> s -> m (Point event) #
Helper to implement the lastStablePoint function of IsSync when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
lastSyncPointVia :: IsSync m event indexer => Getter s (indexer event) -> s -> m (Point event) #
Helper to implement the lastSyncPoint function of IsSync when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
closeVia :: Closeable m indexer => Getter s (indexer event) -> s -> m () #
Helper to implement the close function of Closeable when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
queryVia :: (Queryable m event query indexer, Ord (Point event)) => Getter s (indexer event) -> Point event -> query -> s -> m (Result query) #
Helper to implement the query function of Queryable when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.
queryLatestVia :: (Queryable m event query indexer, Monad m, Ord (Point event), IsSync m event indexer) => Getter s (indexer event) -> query -> s -> m (Result query) #
Helper to implement the queryLatest function of Queryable when we use a wrapper.
If you don't want to perform any other side logic, use deriving via instead.