{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NamedFieldPuns             #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE StandaloneDeriving         #-}
{-# LANGUAGE TypeFamilies               #-}

-- | Block-fetch helpers built on top of per-peer 'PeerGSV' measurements:
-- ordering peers by their @g@ component, deriving in-flight byte limits, and
-- estimating how long a response will take.
module Ouroboros.Network.BlockFetch.DeltaQ
  ( GSV
  , Distribution
  , DeltaQ
  , PeerGSV (..)
  , SizeInBytes
  , PeerFetchInFlightLimits (..)
  , calculatePeerFetchInFlightLimits
  , estimateResponseDeadlineProbability
  , estimateExpectedResponseDuration
  , comparePeerGSV
  , comparePeerGSV'
  ) where

import           Control.Monad.Class.MonadTime
import           Data.Fixed as Fixed (Pico)
import           Data.Hashable
import           Data.Set (Set)
import qualified Data.Set as Set

import           Ouroboros.Network.DeltaQ


-- | The high and low watermarks, in bytes, for the amount of data that may be
-- in flight with a single peer. See 'calculatePeerFetchInFlightLimits' for how
-- they are derived.
data PeerFetchInFlightLimits = PeerFetchInFlightLimits
  { inFlightBytesHighWatermark :: SizeInBytes
  , inFlightBytesLowWatermark  :: SizeInBytes
  }
  deriving Show

-- | Order two peers based on the @g@ component of their 'PeerGSV' (the sum of
-- the outbound and inbound @g@); the peer with the smaller @g@ sorts first.
--
-- Peers that are members of the active set have their @g@ scaled by @0.8@
-- before comparing, which avoids switching back and forth between peers with
-- similar @g@.
--
-- In case the (scaled) @g@ values differ by less than 5% of the larger one,
-- the salted hash of @peer@ is used as a tie breaker. The salt is unique per
-- running node, which avoids all nodes preferring the same peer in case of a
-- tie.
comparePeerGSV
  :: forall peer.
     ( Hashable peer
     , Ord peer
     )
  => Set peer         -- ^ currently active peers
  -> Int              -- ^ salt for the tie-breaking hash
  -> (PeerGSV, peer)
  -> (PeerGSV, peer)
  -> Ordering
comparePeerGSV activePeers salt (a, a_p) (b, b_p)
    | abs (gs_a - gs_b) < 0.05 * max gs_a gs_b
    = compare (hashWithSalt salt a_p) (hashWithSalt salt b_p)
    | otherwise
    = compare gs_a gs_b
  where
    gs_a :: DiffTime
    gs_a = if isActive a_p then activeAdvantage * gs a
                           else gs a

    gs_b :: DiffTime
    gs_b = if isActive b_p then activeAdvantage * gs b
                           else gs b

    -- In order to avoid switching between peers with similar g we give
    -- active peers a slight advantage.
    activeAdvantage :: DiffTime
    activeAdvantage = 0.8

    isActive :: peer -> Bool
    isActive p = Set.member p activePeers

    -- Total g: outbound plus inbound; the s and v components are ignored.
    gs :: PeerGSV -> DiffTime
    gs PeerGSV { outboundGSV = GSV g_out _s_out _v_out
               , inboundGSV  = GSV g_in  _s_in  _v_in
               } = g_out + g_in

-- | Order two peers based on @g@.
--
-- Like 'comparePeerGSV' but doesn't take active status into account: it is
-- 'comparePeerGSV' applied to an empty set of active peers.
comparePeerGSV'
  :: forall peer.
     ( Hashable peer
     , Ord peer
     )
  => Int              -- ^ salt for the tie-breaking hash
  -> (PeerGSV, peer)
  -> (PeerGSV, peer)
  -> Ordering
comparePeerGSV' = comparePeerGSV Set.empty

-- | Derive the in-flight byte limits for a peer from its 'PeerGSV'.
--
-- The low watermark is the number of bytes that can be inbound during
-- @g_out + g_in + d@ seconds (with @d@ a 20ms scheduling margin), but never
-- less than three 64k blocks. The high watermark is twice the low watermark.
calculatePeerFetchInFlightLimits :: PeerGSV -> PeerFetchInFlightLimits
calculatePeerFetchInFlightLimits PeerGSV
    { outboundGSV = GSV g_out _s_out _v_out
    , inboundGSV  = GSV g_in   s_in  _v_in
    } =
    PeerFetchInFlightLimits
      { inFlightBytesLowWatermark
      , inFlightBytesHighWatermark
      }
  where
    -- To keep the remote end busy at all times, we want to make sure that by
    -- the time it finishes sending its last response that there's another
    -- request at the remote end that's ready to go. So we must initiate another
    -- request g_out seconds before the remote end becomes idle.
    --
    -- Now it turns out to be more convenient to measure this not in time, but
    -- based on the number of bytes of requests that are in-flight. This of
    -- course directly corresponds to time, via S_in.
    --
    -- The units of S_in is seconds / octet. We need to calculate the number
    -- of bytes that can be in flight during a time interval t. So we need
    -- octets / second * seconds-in-flight = octets.
    --
    -- > (1/s_in) * t   or equivalently  t/s_in
    --
    -- So for a remote peer, g_out seconds before it becomes idle, it will have
    -- g_in / s_in bytes inbound. Our request will arrive after g_out seconds,
    -- so we should request at minimum g_in / s_in bytes.
    --
    -- We should also account for the fact that we do not have perfect
    -- scheduling and cannot initiate the request at exactly the right time, so
    -- we should request it slightly early (and thus request correspondingly
    -- more). Let's say our maximum schedule delay is @d@ seconds.
    --
    -- NOTE(review): the divisor is the time @s_in@ reports for a single byte;
    -- if that can ever be zero the division below fails. Confirm against the
    -- producers of 'PeerGSV'.
    inFlightBytesLowWatermark :: SizeInBytes
    inFlightBytesLowWatermark =
        max minLowWaterMark
            (ceiling (seconds (g_out + g_in + d) / seconds (s_in 1)))
      where
        -- To ensure that blockfetch can do pipelining we enforce a minimal
        -- low water mark of at least 3 64k blocks
        minLowWaterMark :: SizeInBytes
        minLowWaterMark = 3 * 64 * 1024

        seconds :: DiffTime -> Fixed.Pico
        seconds = realToFrac
      -- FIXME: s is now a function of bytes, not unit seconds / octet

    d :: DiffTime
    d = 2e-2 -- 20 milliseconds, we desire to make a new decision every 10ms.
             -- This gives us some margin.

    -- But note that the minimum here is based on the assumption that we can
    -- react as the /leading/ edge of the low watermark arrives, but in fact
    -- we can only react when the /trailing/ edge arrives. So when we
    -- (NOTE(review): the sentence above was left unfinished in the original.)

    -- The high watermark is a bit arbitrary. It's just about making sure we
    -- have a bit of a buffer so we can ask for more in one go, rather than
    -- asking for lots of small requests very frequently.
    inFlightBytesHighWatermark :: SizeInBytes
    inFlightBytesHighWatermark = inFlightBytesLowWatermark * 2


-- | Given the 'PeerGSV', the bytes already in flight and the size of new
-- blocks to download, estimate the probability of the download completing
-- within the deadline.
--
-- This is an appropriate estimator to use in a situation where meeting a
-- known deadline is the goal.
--
estimateResponseDeadlineProbability
  :: PeerGSV
  -> SizeInBytes  -- ^ Bytes already in flight
  -> SizeInBytes  -- ^ Size of the new blocks to request
  -> DiffTime     -- ^ Deadline
  -> Double
estimateResponseDeadlineProbability gsvs bytesInFlight bytesRequested deadline =
    deltaqProbabilityMassBeforeDeadline deadline $
      fetchResponseDeltaQ gsvs bytesInFlight bytesRequested


-- | Given the 'PeerGSV', the bytes already in flight and the size of new
-- blocks to download, estimate the expected time to complete the download.
--
-- This is an appropriate estimator to use when trying to minimise the
-- expected overall download time in the long run (rather than optimising
-- for the worst case in the short term).
--
-- NOTE(review): earlier documentation described this as the mean, but the
-- implementation uses 'deltaqQ50thPercentile', which by its name is the 50th
-- percentile (median) of the distribution.
--
estimateExpectedResponseDuration
  :: PeerGSV
  -> SizeInBytes  -- ^ Bytes already in flight
  -> SizeInBytes  -- ^ Size of the new blocks to request
  -> DiffTime
estimateExpectedResponseDuration gsvs bytesInFlight bytesRequested =
    deltaqQ50thPercentile $
      fetchResponseDeltaQ gsvs bytesInFlight bytesRequested


-- | The 'DeltaQ' for a request\/response exchange with a peer: the trailing
-- edge of the request arriving over the outbound 'GSV', followed by the
-- trailing edge of the response arriving over the inbound 'GSV'.
--
-- The response size is the bytes already in flight plus the bytes newly
-- requested. Shared by 'estimateResponseDeadlineProbability' and
-- 'estimateExpectedResponseDuration'; not exported.
fetchResponseDeltaQ
  :: PeerGSV
  -> SizeInBytes  -- ^ Bytes already in flight
  -> SizeInBytes  -- ^ Size of the new blocks to request
  -> DeltaQ
fetchResponseDeltaQ PeerGSV{outboundGSV, inboundGSV}
                    bytesInFlight bytesRequested =
    gsvTrailingEdgeArrive outboundGSV reqSize
 <> gsvTrailingEdgeArrive inboundGSV  respSize
  where
    reqSize :: SizeInBytes
    reqSize = 100 -- TODO not exact, but it's small

    respSize :: SizeInBytes
    respSize = bytesInFlight + bytesRequested


{-
estimateBlockFetchResponse :: PeerGSV
                           -> PeerFetchInFlight header
                           -> [SizeInBytes]
                           -> DiffTime
estimateBlockFetchResponse gsvs
                           PeerFetchInFlight{peerFetchBytesInFlight}
                           blockSizes =
    gsvRequestResponseDuration gsvs reqSize respSize
  where
    reqSize  = 100 -- not exact, but it's small
    respSize = peerFetchBytesInFlight + sum blockSizes


-- | The /trailing/ edge arrival schedule for a bunch of blocks.
--
blockArrivalShedule :: PeerGSV
                    -> PeerFetchInFlight header
                    -> [SizeInBytes]
                    -> [DiffTime]
blockArrivalShedule gsvs
                    PeerFetchInFlight{peerFetchBytesInFlight}
                    blockSizes =
    [ gsvRequestResponseDuration gsvs reqSize respSize
    | respSize <- cumulativeSumFrom peerFetchBytesInFlight blockSizes
    ]
  where
    reqSize = 100 -- not exact, but it's small

    cumulativeSumFrom n = tail . scanl (+) n
-}