{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving  #-}
module Ouroboros.Consensus.Storage.VolatileDB.Impl.Parser (
    ParseError (..)
  , ParsedBlockInfo (..)
  , parseBlockFile
    -- * Auxiliary
  , extractBlockInfo
  ) where

import           Data.Bifunctor (bimap)
import qualified Data.ByteString.Lazy as Lazy
import           Data.Word (Word64)
import qualified Streaming.Prelude as S
import           Streaming.Prelude (Of (..), Stream)

import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Util.CBOR (ReadIncrementalErr (..),
                     withStreamIncrementalOffsets)
import           Ouroboros.Consensus.Util.IOLike

import           Ouroboros.Consensus.Storage.FS.API (HasFS)
import           Ouroboros.Consensus.Storage.FS.API.Types (FsPath)
import           Ouroboros.Consensus.Storage.Serialisation
import           Ouroboros.Consensus.Storage.VolatileDB.API (BlockInfo (..))
import           Ouroboros.Consensus.Storage.VolatileDB.Impl.Types

-- | Information returned by the parser about a single block.
--
-- For each block, the parser returns its offset, its size, its 'BlockInfo',
-- and the nested context of its header.
--
-- The fields of this record are strict to make sure that by evaluating this
-- record to WHNF, we no longer hold on to the entire block. Otherwise, we might
-- accidentally keep all blocks in a single file in memory during parsing.
data ParsedBlockInfo blk = ParsedBlockInfo {
      pbiBlockOffset :: !BlockOffset
      -- ^ Offset of the block, as reported by the parser.
    , pbiBlockSize   :: !BlockSize
      -- ^ Size of the block, as reported by the parser.
    , pbiBlockInfo   :: !(BlockInfo blk)
      -- ^ Summary information extracted from the block.
    , pbiNestedCtxt  :: !(SomeSecond (NestedCtxt Header) blk)
      -- ^ Nested context of the block's header.
    }

-- | Parse the given file containing blocks.
--
-- Return the 'ParsedBlockInfo' for all the valid blocks in the file, in the
-- order in which they were read. Stop when encountering an error and include
-- the offset to truncate to.
--
-- Two kinds of errors stop the parse:
--
-- * The underlying incremental reader finishes with an error, which is
--   reported as 'BlockReadErr' together with the offset the reader returned.
--
-- * Unless the validation policy is 'NoValidation', a block for which the
--   given predicate returns 'False' is reported as 'BlockCorruptedErr'
--   together with the offset of that block.
parseBlockFile ::
     forall m blk h.
     ( IOLike m
     , GetPrevHash blk
     , HasBinaryBlockInfo blk
     , HasNestedContent Header blk
     , DecodeDisk blk (Lazy.ByteString -> blk)
     )
  => CodecConfig blk
     -- ^ Codec configuration used to obtain the on-disk block decoder
  -> HasFS m h
     -- ^ File system handle passed to the incremental reader
  -> (blk -> Bool)
     -- ^ Integrity check: 'True' means the block is /not/ corrupt
  -> BlockValidationPolicy
     -- ^ When equal to 'NoValidation', the integrity check is skipped
  -> FsPath
     -- ^ Path of the file to parse
  -> m ( [ParsedBlockInfo blk]
       , Maybe (ParseError blk, BlockOffset)
       )
parseBlockFile ccfg hasFS isNotCorrupt validationPolicy fsPath =
    withStreamIncrementalOffsets hasFS (decodeDisk ccfg) fsPath $
      checkEntries []
  where
    -- Whether the integrity check on each block should be skipped.
    noValidation :: Bool
    noValidation = validationPolicy == NoValidation

    -- Consume the stream one entry at a time. Each entry is matched as
    -- @(offset, (size, blk))@. Successfully parsed entries are accumulated in
    -- reverse order in the first argument and reversed once when returning.
    checkEntries ::
         [ParsedBlockInfo blk]
      -> Stream (Of (Word64, (Word64, blk)))
                m
                (Maybe (ReadIncrementalErr, Word64))
      -> m ( [ParsedBlockInfo blk]
           , Maybe (ParseError blk, BlockOffset)
           )
    checkEntries parsed stream = S.next stream >>= \case
      -- End of the stream: either a clean end ('Nothing') or a read error
      -- paired with an offset, which we wrap in our own types.
      Left mbErr
        -> return (reverse parsed, bimap BlockReadErr BlockOffset <$> mbErr)
      Right ((offset, (size, blk)), stream')
        | noValidation || isNotCorrupt blk
        -> -- Both bindings are forced before recursing; presumably so that
           -- the accumulated list does not retain a thunk referring to @blk@.
           let !blockInfo = extractBlockInfo blk
               !newParsed = ParsedBlockInfo {
                   pbiBlockOffset = BlockOffset offset
                   -- NOTE(review): 'fromIntegral' narrows 'Word64' to the
                   -- 'Word32' wrapped by 'BlockSize'; a larger size would
                   -- wrap around silently.
                 , pbiBlockSize   = BlockSize $ fromIntegral size
                 , pbiBlockInfo   = blockInfo
                 , pbiNestedCtxt  = case unnest (getHeader blk) of
                                      DepPair nestedCtxt _ -> SomeSecond nestedCtxt
                 }
           in checkEntries (newParsed : parsed) stream'
        | otherwise  -- The block was invalid
        -> -- Stop at the first corrupt block; its offset is reported
           -- alongside the error.
           let !hash = blockHash blk
           in return ( reverse parsed
                     , Just (BlockCorruptedErr hash, BlockOffset offset)
                     )

{-------------------------------------------------------------------------------
  Auxiliary
-------------------------------------------------------------------------------}

-- | Build the 'BlockInfo' of a block.
--
-- The hash, slot number, block number, EBB flag and previous hash are read
-- from the block itself; the header offset and header size are taken from
-- the block's 'BinaryBlockInfo'.
extractBlockInfo ::
     (GetPrevHash blk, HasBinaryBlockInfo blk)
  => blk
  -> BlockInfo blk
extractBlockInfo blk = BlockInfo {
      biHash         = blockHash     blk
    , biSlotNo       = blockSlot     blk
    , biBlockNo      = blockNo       blk
    , biIsEBB        = blockToIsEBB  blk
    , biPrevHash     = blockPrevHash blk
    , biHeaderOffset = headerOffset
    , biHeaderSize   = headerSize
    }
  where
    -- Field puns bring 'headerOffset' and 'headerSize' into scope.
    BinaryBlockInfo { headerOffset, headerSize } = getBinaryBlockInfo blk