1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use super::{Gossip, Gossips, P2pTopology, Peer};
use crate::{
    intercom::{NetworkMsg, PropagateMsg, TopologyMsg},
    metrics::Metrics,
    settings::start::network::Configuration,
    utils::async_msg::{MessageBox, MessageQueue},
};
use std::time::Duration;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tokio_stream::StreamExt;

/// Default threshold (5 minutes) without accepted gossip after which the p2p
/// network is considered stuck. Presumably the default for the configuration's
/// `network_stuck_check` value — confirm against the settings module.
pub const DEFAULT_NETWORK_STUCK_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Period of the quarantine check run by the topology task's event loop.
const QUARANTINE_CHECK: Duration = Duration::from_secs(60);
/// Cap on how many nodes one quarantine check contacts when the network looks
/// stuck: nodes lifted from quarantine are always contacted, and known nodes
/// only fill whatever slots remain below this limit.
const MAX_GOSSIP_SIZE: usize = 10;

/// State owned by the running topology task; driven by `Process::handle_input`.
struct Process {
    /// Incoming topology requests: gossip to accept, peer promotions/demotions,
    /// and view/list queries that are answered through a reply handle.
    input: MessageQueue<TopologyMsg>,
    /// Mailbox of the network task; outgoing gossip is sent here as
    /// `NetworkMsg::Propagate`.
    network_msgbox: MessageBox<NetworkMsg>,
    /// Ticker that triggers a gossip round with the peers in the current view.
    gossip_interval: Interval,
    /// How long without accepted gossip before the network is considered stuck
    /// and known nodes are contacted proactively.
    network_stuck_check: Duration,
    /// Peer topology maintained by this task.
    topology: P2pTopology,
}

/// Everything the topology task needs to start; consumed by [`start`].
pub struct TaskData {
    /// Mailbox of the network task, used to send out gossip.
    pub network_msgbox: MessageBox<NetworkMsg>,
    /// Queue from which the topology task receives its `TopologyMsg` requests.
    pub topology_queue: MessageQueue<TopologyMsg>,
    /// Peers converted to gossip and accepted into the topology before the
    /// event loop starts.
    pub initial_peers: Vec<Peer>,
    /// Network configuration. The whole value is passed to `P2pTopology::new`;
    /// `gossip_interval` and `network_stuck_check` are also read by [`start`].
    pub config: Configuration,
    /// Metrics handle handed to the topology.
    pub stats_counter: Metrics,
}

pub async fn start(task_data: TaskData) {
    let TaskData {
        network_msgbox,
        topology_queue,
        initial_peers,
        config,
        stats_counter,
    } = task_data;

    let mut topology = P2pTopology::new(&config, stats_counter);

    topology.accept_gossips(Gossips::from(
        initial_peers
            .into_iter()
            .map(Gossip::from)
            .collect::<Vec<_>>(),
    ));

    let mut gossip_interval = tokio::time::interval(config.gossip_interval);
    gossip_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut process = Process {
        input: topology_queue,
        gossip_interval,
        network_stuck_check: config.network_stuck_check,
        network_msgbox,
        topology,
    };
    process.handle_input().await;
}

impl Process {
    async fn handle_input(&mut self) {
        let mut last_update = Instant::now();
        let mut quarantine_check = tokio::time::interval(QUARANTINE_CHECK);

        loop {
            tokio::select! {
                Some(input) = self.input.next() => {
                    tracing::trace!("handling new topology task item");
                    match input {
                        TopologyMsg::AcceptGossip(gossip) => {
                            self.topology.accept_gossips(gossip);
                            last_update = Instant::now();
                        },
                        TopologyMsg::DemotePeer(id) => self.topology.report_node(&id),
                        TopologyMsg::PromotePeer(id) => self.topology.promote_node(&id),
                        TopologyMsg::View(selection, handle) => {
                            handle.reply_ok(self.topology.view(selection))
                        }
                        TopologyMsg::ListAvailable(handle) => {
                            handle.reply_ok(self.topology.list_available().map(Into::into).collect::<Vec<_>>())
                        }
                        TopologyMsg::ListNonPublic(handle) => {
                            handle.reply_ok(self.topology.list_non_public().map(Into::into).collect::<Vec<_>>())
                        }
                        TopologyMsg::ListQuarantined(handle) => {
                            handle.reply_ok(self.topology.list_quarantined())
                        }
                    }
                    tracing::trace!("item handling finished");
                },
                _ = self.gossip_interval.tick() => {
                        let span = tracing::debug_span!("generating_gossip", task = "topology");
                        let _guard = span.enter();
                        self.topology.update_gossip();
                        let view = self.topology.view(poldercast::layer::Selection::Any);
                        if view.peers.is_empty() {
                            tracing::warn!("no peers to gossip with found, check your connection");
                        }
                        tracing::trace!("gossiping with peers");
                        self.send_gossip_messages(view.peers)
                    }
                _ = quarantine_check.tick() => {
                    let span = tracing::debug_span!("quarantine_check", task = "topology");
                    let _guard = span.enter();
                    // Even if lifted from quarantine, peers will be re-added to the topology
                    // only after we receive a gossip about them.
                    let mut nodes_to_contact = self.topology.lift_reports();

                    // If we did not receive any incoming gossip recently let's try to contact known (but not active) nodes.
                    if last_update.elapsed() > self.network_stuck_check {
                        last_update = Instant::now();
                        tracing::warn!("p2p network have been too quiet for some time, will try to contanct known nodes");
                        nodes_to_contact.extend(
                            self.topology
                                .list_available()
                                .take(MAX_GOSSIP_SIZE.saturating_sub(nodes_to_contact.len()))
                        );
                    }

                    self.send_gossip_messages(nodes_to_contact);
                }
            }
        }
    }

    fn send_gossip_messages(&mut self, peers: Vec<Peer>) {
        for peer in peers {
            let gossip = self.topology.initiate_gossips(&peer.id());
            self.network_msgbox
                // do not block the current thread to avoid deadlocks
                .try_send(NetworkMsg::Propagate(Box::new(PropagateMsg::Gossip(
                    peer, gossip,
                ))))
                .unwrap_or_else(|e| {
                    tracing::error!(
                        reason = ?e,
                        "cannot send PropagateMsg request to network"
                    )
                });
        }
    }
}