1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use super::grpc;
use crate::{
    blockchain::{self, Blockchain, BootstrapError, Error as BlockchainError, Tip},
    network::convert::Decode,
    settings::start::network::Peer,
    topology,
};
use chain_core::property::ReadError;
use chain_network::{data as net_data, error::Error as NetworkError};
use futures::prelude::*;

use std::fmt::Debug;
use tokio_util::sync::CancellationToken;

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("failed to connect to bootstrap peer")]
    Connect(#[source] grpc::ConnectError),
    #[error("connection broken")]
    ClientNotReady(#[source] NetworkError),
    #[error("peers not available")]
    PeersNotAvailable(#[source] NetworkError),
    #[error("bootstrap pull request failed")]
    PullRequestFailed(#[source] NetworkError),
    #[error("could not get the blockchain tip from a peer")]
    TipFailed(#[source] NetworkError),
    #[error(transparent)]
    PeerDecodingFailed(NetworkError),
    #[error("decoding of a block failed")]
    BlockDecodingFailed(#[source] ReadError),
    #[error(transparent)]
    Blockchain(#[from] Box<BootstrapError>),
    #[error("failed to collect garbage and flush blocks to the permanent storage")]
    GcFailed(#[source] Box<BlockchainError>),
    #[error("bootstrap pull stream failed")]
    PullStreamFailed(#[source] NetworkError),
    #[error("Trusted peers cannot be empty. To avoid bootstrap use `skip_bootstrap: true`")]
    EmptyTrustedPeers,
    #[error("the bootstrap process was interrupted")]
    Interrupted,
}

const MAX_BOOTSTRAP_PEERS: u32 = 32;

pub async fn peers_from_trusted_peer(peer: &Peer) -> Result<Vec<topology::Peer>, Error> {
    tracing::info!("getting peers from bootstrap peer {}", peer.connection);

    let mut client = grpc::connect(peer).await.map_err(Error::Connect)?;
    let gossip = client
        .peers(MAX_BOOTSTRAP_PEERS)
        .await
        .map_err(Error::PeersNotAvailable)?;
    let peers = gossip
        .nodes
        .decode()
        .map_err(Error::PeerDecodingFailed)?
        .into_iter()
        .map(topology::Peer::from)
        .collect::<Vec<_>>();

    tracing::info!("peer {} : peers known : {}", peer.connection, peers.len());
    Ok(peers)
}

pub async fn bootstrap_from_peer(
    peer: &Peer,
    blockchain: Blockchain,
    tip: Tip,
    cancellation_token: CancellationToken,
) -> Result<(), Error> {
    use chain_network::data::BlockId;

    async fn with_cancellation_token<T>(
        future: impl Future<Output = T> + Unpin,
        token: &CancellationToken,
    ) -> Result<T, Error> {
        use futures::future::{select, Either};

        match select(future, token.cancelled().boxed()).await {
            Either::Left((result, _)) => Ok(result),
            Either::Right(((), _)) => Err(Error::Interrupted),
        }
    }

    tracing::debug!("connecting to bootstrap peer {}", peer.connection);

    let mut client = with_cancellation_token(grpc::connect(peer).boxed(), &cancellation_token)
        .await?
        .map_err(Error::Connect)?;

    loop {
        let remote_tip = with_cancellation_token(client.tip().boxed(), &cancellation_token)
            .await?
            .and_then(|header| header.decode())
            .map_err(Error::TipFailed)?
            .id();

        if remote_tip == tip.get_ref().await.hash() {
            break Ok(());
        }

        let checkpoints = blockchain.get_checkpoints(&tip.branch().await);
        let checkpoints = net_data::block::try_ids_from_iter(checkpoints).unwrap();

        let remote_tip = BlockId::try_from(remote_tip.as_ref()).unwrap();

        tracing::info!(
            "pulling blocks starting from checkpoints: {:?}; to tip {:?}",
            checkpoints,
            remote_tip,
        );

        let stream = with_cancellation_token(
            client.pull_blocks(checkpoints, remote_tip).boxed(),
            &cancellation_token,
        )
        .await?
        .map_err(Error::PullRequestFailed)?;

        blockchain::bootstrap_from_stream(
            blockchain.clone(),
            tip.clone(),
            stream,
            cancellation_token.clone(),
        )
        .await
        .map_err(Box::new)?;
    }
}