1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
use super::{logs::Logs, pool::internal::Pool};
use crate::{
    blockcfg::{ApplyBlockLedger, Contents, ContentsBuilder},
    fragment::{Fragment, FragmentId},
};
use async_trait::async_trait;
use chain_core::property::Serialize;
use futures::{channel::oneshot::Receiver, future::Shared, prelude::*};
use jormungandr_lib::interfaces::{BlockDate, FragmentStatus};
use std::{error::Error, iter};
use tracing::{debug_span, Instrument};

/// Possible verdicts for a single fragment considered for inclusion in a block.
///
/// NOTE(review): this type is not referenced anywhere in the visible part of
/// this file; the variant descriptions below are inferred from their names and
/// should be confirmed against the actual consumer.
pub enum SelectionOutput {
    /// The fragment identified by `fragment_id` should be committed.
    Commit { fragment_id: FragmentId },
    /// Presumably asks for a fragment with a smaller fee — TODO confirm.
    RequestSmallerFee,
    /// Presumably asks for a fragment with a smaller serialized size — TODO confirm.
    RequestSmallerSize,
    /// The fragment is rejected; `reason` carries a human-readable explanation.
    Reject { reason: String },
}

/// Strategy for choosing which pooled fragments go into the contents of a block.
#[async_trait]
pub trait FragmentSelectionAlgorithm {
    /// Selects fragments from `pool` and applies them on top of `ledger`.
    ///
    /// * `ledger` - ledger state the selected fragments are applied to.
    /// * `logs` - fragment logs; implementations may update fragment statuses
    ///   here (the `OldestFirst` implementation in this file records rejections).
    /// * `pool` - pool the candidate fragments are taken from.
    /// * `soft_deadline_future` - signal after which no further fragment
    ///   processing should be started or continued, as interpreted by the
    ///   implementation.
    /// * `hard_deadline_future` - final time limit signal for the selection.
    ///
    /// Returns the block contents built from the selected fragments together
    /// with the resulting ledger state and the number of rejected fragments.
    async fn select(
        &mut self,
        ledger: ApplyBlockLedger,
        logs: &mut Logs,
        pool: &mut Pool,
        soft_deadline_future: futures::channel::oneshot::Receiver<()>,
        hard_deadline_future: futures::channel::oneshot::Receiver<()>,
    ) -> FragmentSelectionResult;
}

/// Outcome of a fragment selection run.
pub struct FragmentSelectionResult {
    /// Block contents assembled from the fragments that were committed.
    pub contents: Contents,
    /// Ledger state after applying every committed fragment.
    pub ledger: ApplyBlockLedger,
    /// Number of fragments that were rejected during the selection.
    pub rejected_fragments_cnt: usize,
}

/// Identifies which fragment selection algorithm to use.
///
/// NOTE(review): not consumed in the visible part of this file; presumably
/// read by the code that instantiates the selection algorithm — confirm.
#[derive(Debug)]
pub enum FragmentSelectionAlgorithmParams {
    /// Select fragments starting from the oldest one in the pool
    /// (see [`OldestFirst`]).
    OldestFirst,
}

/// Fragment selection strategy that walks the pool starting from its oldest
/// entry. The selector itself carries no state.
pub struct OldestFirst;

impl OldestFirst {
    /// Creates a new selector.
    pub fn new() -> Self {
        Self
    }
}

impl Default for OldestFirst {
    /// Equivalent to [`OldestFirst::new`].
    fn default() -> Self {
        OldestFirst
    }
}

/// Reasons why `try_apply_fragment` did not produce a new ledger state.
enum ApplyFragmentError {
    /// The fragment is larger than the space still available in the block.
    DoesNotFit,
    /// The soft deadline fired while the fragment was being applied and the
    /// application was abandoned.
    SoftDeadlineReached,
    /// The fragment cannot be included; the payload is a human-readable reason
    /// (oversized fragment, hard deadline exceeded, or a ledger error).
    Rejected(String),
}

/// Successful result of applying a single fragment.
struct NewLedgerState {
    /// Ledger state after the fragment has been applied.
    ledger: ApplyBlockLedger,
    /// Block content space remaining once the fragment's size has been deducted.
    space_left: u32,
}

/// Tries to apply `fragment` on top of `ledger` within the given deadlines.
///
/// The ledger application runs on the blocking thread pool and is raced
/// against the deadlines:
///
/// * If it finishes before the soft deadline, its result is used.
/// * If the soft deadline fires first and the block already holds other
///   fragments, the attempt is abandoned with
///   [`ApplyFragmentError::SoftDeadlineReached`].
/// * If the soft deadline fires first and this would be the first fragment of
///   the block, processing continues until the hard deadline so that a block
///   is not left empty just because its first fragment is slow.
///
/// # Parameters
///
/// * `fragment` - fragment to apply.
/// * `ledger` - ledger state to apply the fragment to (consumed).
/// * `soft_deadline_future` / `hard_deadline_future` - shared deadline signals.
/// * `space_left` - block content space still available before this fragment.
///
/// # Errors
///
/// * [`ApplyFragmentError::Rejected`] when the fragment exceeds the maximum
///   block content size, when a lone fragment misses the hard deadline, or
///   when the ledger refuses the fragment (the message contains the full
///   error source chain).
/// * [`ApplyFragmentError::DoesNotFit`] when the fragment is larger than
///   `space_left`.
/// * [`ApplyFragmentError::SoftDeadlineReached`] as described above.
///
/// # Panics
///
/// Panics if the blocking task applying the fragment panicked or was cancelled.
async fn try_apply_fragment(
    fragment: Fragment,
    ledger: ApplyBlockLedger,
    soft_deadline_future: Shared<Receiver<()>>,
    hard_deadline_future: Shared<Receiver<()>>,
    mut space_left: u32,
) -> Result<NewLedgerState, ApplyFragmentError> {
    use futures::future::{select, Either};

    let raw_fragment_size = fragment.serialized_size();
    let block_content_max_size = ledger.settings().block_content_max_size;
    let fragment_size = match u32::try_from(raw_fragment_size) {
        Ok(size) if size <= block_content_max_size => size,
        _ => {
            let reason = format!(
                "fragment size {} exceeds maximum block content size {}",
                raw_fragment_size, block_content_max_size
            );
            return Err(ApplyFragmentError::Rejected(reason));
        }
    };

    if fragment_size > space_left {
        // return a fragment to the pool later if does not fit the contents size limit
        tracing::trace!("discarding fragment that does not fit in block");
        return Err(ApplyFragmentError::DoesNotFit);
    }

    // Remember whether the block is still empty BEFORE reserving room for this
    // fragment. Checking after the subtraction would always report a non-empty
    // block (any real fragment has a non-zero size), which would make the
    // hard-deadline grace period below unreachable.
    let block_is_empty = space_left >= block_content_max_size;

    space_left -= fragment_size;

    tracing::debug!("applying fragment in simulation");

    let fragment_future = tokio::task::spawn_blocking(move || ledger.apply_fragment(&fragment));

    let ledger_res = match select(fragment_future, soft_deadline_future.clone()).await {
        Either::Left((join_result, _)) => {
            join_result.expect("fragment application task panicked or was cancelled")
        }
        Either::Right((_, fragment_future)) => {
            if !block_is_empty {
                tracing::debug!(
                    "aborting processing of the current fragment to satisfy the soft deadline"
                );
                return Err(ApplyFragmentError::SoftDeadlineReached);
            }

            tracing::debug!(
                "only one fragment in progress: continuing until meeting the hard deadline"
            );

            match select(fragment_future, hard_deadline_future.clone()).await {
                Either::Left((join_result, _)) => {
                    join_result.expect("fragment application task panicked or was cancelled")
                }
                Either::Right(_) => return Err(ApplyFragmentError::Rejected(
                    "cannot process a single fragment within the given time bounds (hard deadline)"
                        .into(),
                )),
            }
        }
    };

    match ledger_res {
        Ok(ledger) => Ok(NewLedgerState { ledger, space_left }),
        Err(err) => {
            // Flatten the whole error source chain into a single message.
            let mut msg = err.to_string();
            for e in iter::successors(err.source(), |&e| e.source()) {
                msg.push_str(": ");
                msg.push_str(&e.to_string());
            }
            Err(ApplyFragmentError::Rejected(msg))
        }
    }
}

#[async_trait]
impl FragmentSelectionAlgorithm for OldestFirst {
    /// Fills a block with fragments taken from `pool`, oldest first.
    ///
    /// Each fragment removed from the pool is tentatively applied to a clone
    /// of the current ledger:
    ///
    /// * on success it is added to the block contents and the ledger and the
    ///   remaining space are updated;
    /// * if it does not fit in the remaining space it is set aside and put
    ///   back into the pool at the end;
    /// * if the soft deadline is reached it is set aside as well and the
    ///   selection stops, leaving the remaining fragments in the pool;
    /// * if it is rejected, the rejection is recorded in `logs` and counted.
    ///
    /// Selection also stops when the pool is exhausted or the block has no
    /// space left.
    async fn select(
        &mut self,
        mut ledger: ApplyBlockLedger,
        logs: &mut Logs,
        pool: &mut Pool,
        soft_deadline_future: futures::channel::oneshot::Receiver<()>,
        hard_deadline_future: futures::channel::oneshot::Receiver<()>,
    ) -> FragmentSelectionResult {
        let date: BlockDate = ledger.block_date().into();
        let mut space_left = ledger.settings().block_content_max_size;
        let mut contents_builder = ContentsBuilder::new();
        let mut return_to_pool = Vec::new();
        let mut rejected_fragments_cnt = 0;

        // The deadlines are consulted once per fragment, so they must be cloneable.
        let soft_deadline_future = soft_deadline_future.shared();
        let hard_deadline_future = hard_deadline_future.shared();
        while let Some((fragment, id)) = pool.remove_oldest() {
            let span = debug_span!("fragment", hash=%id.to_string());

            // The block evaluates to `true` when the soft deadline was hit.
            let soft_deadline_reached = async {
                let result = try_apply_fragment(
                    fragment.clone(),
                    ledger.clone(),
                    soft_deadline_future.clone(),
                    hard_deadline_future.clone(),
                    space_left,
                )
                .await;
                match result {
                    Ok(NewLedgerState {
                        ledger: ledger_new,
                        space_left: space_left_new,
                    }) => {
                        contents_builder.push(fragment);
                        ledger = ledger_new;
                        tracing::debug!("successfully applied and committed the fragment");
                        space_left = space_left_new;
                        false
                    }
                    Err(ApplyFragmentError::DoesNotFit) => {
                        // A smaller fragment further down the pool may still fit.
                        return_to_pool.push((fragment, id));
                        false
                    }
                    Err(ApplyFragmentError::SoftDeadlineReached) => {
                        return_to_pool.push((fragment, id));
                        true
                    }
                    Err(ApplyFragmentError::Rejected(reason)) => {
                        tracing::debug!(%reason, "fragment is rejected");
                        logs.modify(id, FragmentStatus::Rejected { reason }, date);
                        rejected_fragments_cnt += 1;
                        false
                    }
                }
            }
            .instrument(span)
            .await;

            if soft_deadline_reached {
                // Once the soft deadline has fired no further fragment can be
                // committed; continuing would only drain the pool and spawn
                // ledger applications whose results are thrown away.
                tracing::debug!("soft deadline reached, stopping fragment selection");
                break;
            }

            if space_left == 0 {
                tracing::debug!("block has reached max total size, exiting");
                break;
            }
        }

        tracing::debug!(
            "finished block creation with {} fragments left in the pool",
            pool.len()
        );
        pool.insert_all(return_to_pool);

        FragmentSelectionResult {
            contents: contents_builder.into(),
            ledger,
            rejected_fragments_cnt,
        }
    }
}