1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use super::{JormungandrServerImpl, MockController, MockLogger, MockServerData, ProtocolVersion};
use crate::jormungandr::{configuration::get_available_port, grpc::server::NodeServer};
use chain_core::property::Serialize;
use chain_impl_mockchain::{block::Block, key::Hash};
use chain_storage::BlockStore;
use futures::FutureExt;
use std::{
    io::{Result, Write},
    net::SocketAddr,
    sync::{
        mpsc::{sync_channel, SyncSender},
        Arc, RwLock,
    },
};
use tokio::sync::oneshot;
use tonic::transport::Server;

pub struct MockBuilder {
    mock_port: Option<u16>,
    genesis_block: Option<Block>,
    protocol_version: ProtocolVersion,
    invalid_block0_hash: bool,
}

impl Default for MockBuilder {
    fn default() -> Self {
        Self {
            mock_port: None,
            genesis_block: None,
            protocol_version: ProtocolVersion::GenesisPraos,
            invalid_block0_hash: false,
        }
    }
}

impl MockBuilder {
    pub fn with_port(&mut self, mock_port: u16) -> &mut Self {
        self.mock_port = Some(mock_port);
        self
    }

    pub fn with_genesis_block(&mut self, block: Block) -> &mut Self {
        self.genesis_block = Some(block);
        self
    }

    pub fn with_protocol_version(&mut self, protocol_version: ProtocolVersion) -> &mut Self {
        self.protocol_version = protocol_version;
        self
    }

    pub fn with_invalid_block0_hash(&mut self, invalid_block0_hash: bool) -> &mut Self {
        self.invalid_block0_hash = invalid_block0_hash;
        self
    }

    pub fn build_data(&self) -> Arc<RwLock<MockServerData>> {
        let storage = BlockStore::memory(Hash::zero_hash().as_bytes().to_owned()).unwrap();
        let block0 = if let Some(block) = self.genesis_block.clone().take() {
            block
        } else {
            // Block contents do not really matter.
            // A full block is used just to make the storage consistent and reuse code
            super::data::block0()
        };

        let data = MockServerData::new(
            block0.header().hash(),
            self.protocol_version.clone(),
            format!(
                "127.0.0.1:{}",
                self.mock_port.unwrap_or_else(get_available_port)
            )
            .parse::<SocketAddr>()
            .unwrap(),
            storage,
            self.invalid_block0_hash,
        );

        data.put_block(&block0).unwrap();
        data.set_tip(block0.header().hash().serialize_as_vec().unwrap().as_ref())
            .unwrap();

        Arc::new(RwLock::new(data))
    }

    pub fn build(&self) -> MockController {
        let data = self.build_data();
        start_thread(data)
    }
}

struct ChannelWriter(SyncSender<Vec<u8>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        self.0
            .try_send(buf.to_vec())
            .expect("receiver hanged up or channel is full");
        Ok(buf.len())
    }

    fn flush(&mut self) -> Result<()> {
        Ok(())
    }
}

pub fn start_thread(data: Arc<RwLock<MockServerData>>) -> MockController {
    let (tx, rx) = sync_channel(100);
    let logger = MockLogger::new(rx);
    let (shutdown_signal, rx) = oneshot::channel::<()>();
    let data_clone = data.clone();
    let addr = data.read().unwrap().profile().address();

    std::thread::spawn(move || {
        let subscriber = tracing_subscriber::fmt()
            .json()
            .with_writer(move || ChannelWriter(tx.clone()))
            .finish();
        tracing::subscriber::with_default(subscriber, || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async move {
                let mock = JormungandrServerImpl::new(data_clone);
                Server::builder()
                    .add_service(NodeServer::new(mock))
                    .serve_with_shutdown(addr, rx.map(drop))
                    .await
                    .unwrap();
            })
        });
    });

    MockController::new(logger, shutdown_signal, data, addr.port())
}