// partner_chains_demo_node/service.rs

1//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2
3use crate::data_sources::DataSources;
4use crate::inherent_data::{CreateInherentDataConfig, ProposalCIDP, VerifierCIDP};
5use crate::rpc::GrandpaDeps;
6use authority_selection_inherents::AuthoritySelectionDataSource;
7use partner_chains_data_source_metrics::{McFollowerMetrics, register_metrics_warn_errors};
8use partner_chains_demo_runtime::{self, RuntimeApi, opaque::Block};
9use sc_client_api::BlockBackend;
10use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
11use sc_consensus_grandpa::SharedVoterState;
12pub use sc_executor::WasmExecutor;
13use sc_partner_chains_consensus_aura::import_queue as partner_chains_aura_import_queue;
14use sc_service::{Configuration, TaskManager, WarpSyncConfig, error::Error as ServiceError};
15use sc_telemetry::{Telemetry, TelemetryWorker};
16use sc_transaction_pool_api::OffchainTransactionPoolFactory;
17use sidechain_domain::mainchain_epoch::MainchainEpochConfig;
18use sidechain_mc_hash::McHashInherentDigest;
19use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
20use sp_partner_chains_consensus_aura::block_proposal::PartnerChainsProposerFactory;
21use sp_runtime::traits::Block as BlockT;
22use std::{sync::Arc, time::Duration};
23use time_source::SystemTimeSource;
24use tokio::task;
25
/// Host functions made available to the WASM runtime executor.
type HostFunctions = sp_io::SubstrateHostFunctions;

/// Fully-typed Substrate client for this runtime, backed by the WASM executor.
pub(crate) type FullClient =
	sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Full (non-light) database backend for [`Block`].
type FullBackend = sc_service::TFullBackend<Block>;
/// Chain-selection strategy used by this node: always prefer the longest chain.
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

/// The minimum period of blocks on which justifications will be
/// imported and generated.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;
36
37/// This function provides dependencies of [partner_chains_node_commands::PartnerChainsSubcommand].
38/// It is not mandatory to have such a dedicated function, [new_partial] could be enough,
39/// however using such a specialized function decreases number of possible failures and wiring time.
40pub fn new_pc_command_deps(
41	config: &Configuration,
42) -> Result<
43	(Arc<FullClient>, TaskManager, Arc<dyn AuthoritySelectionDataSource + Send + Sync>),
44	ServiceError,
45> {
46	let data_sources = task::block_in_place(|| {
47		config
48			.tokio_handle
49			.block_on(crate::data_sources::create_cached_data_sources(None))
50	})?;
51	let executor = sc_service::new_wasm_executor(&config.executor);
52	let (client, _, _, task_manager) =
53		sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?;
54	let client = Arc::new(client);
55	Ok((client, task_manager, data_sources.authority_selection))
56}
57
/// Builds the "partial" set of node components shared by the full node and by auxiliary
/// commands: client, backend, keystore container, task manager, chain selection,
/// transaction pool, the GRANDPA-wrapped block import, the verifying import queue,
/// optional telemetry and the main-chain data sources (with optional follower metrics).
#[allow(clippy::type_complexity)]
pub fn new_partial(
	config: &Configuration,
) -> Result<
	sc_service::PartialComponents<
		FullClient,
		FullBackend,
		FullSelectChain,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
		(
			sc_consensus_grandpa::GrandpaBlockImport<
				FullBackend,
				Block,
				FullClient,
				FullSelectChain,
			>,
			sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
			Option<Telemetry>,
			DataSources,
			Option<McFollowerMetrics>,
		),
	>,
	ServiceError,
> {
	let mc_follower_metrics = register_metrics_warn_errors(config.prometheus_registry());
	// Data-source construction is async; block in place so the surrounding tokio worker
	// thread is not starved while we wait on it.
	let data_sources = task::block_in_place(|| {
		config
			.tokio_handle
			.block_on(crate::data_sources::create_cached_data_sources(mc_follower_metrics.clone()))
	})?;

	// Only set up telemetry when at least one endpoint is configured.
	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let executor = sc_service::new_wasm_executor(&config.executor);

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;
	let client = Arc::new(client);

	// Spawn the telemetry worker and keep only the handle used by the other components.
	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
		telemetry
	});

	let select_chain = sc_consensus::LongestChain::new(backend.clone());

	let transaction_pool = Arc::from(
		sc_transaction_pool::Builder::new(
			task_manager.spawn_essential_handle(),
			client.clone(),
			config.role.is_authority().into(),
		)
		.with_options(config.transaction_pool.clone())
		.with_prometheus(config.prometheus_registry())
		.build(),
	);

	// Wrap the client's block import with GRANDPA so justifications are handled
	// during import; the returned link half is later consumed by the voter task.
	let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
		client.clone(),
		GRANDPA_JUSTIFICATION_PERIOD,
		&client,
		select_chain.clone(),
		telemetry.as_ref().map(|x| x.handle()),
	)?;

	// Slot configuration is obtained from the runtime via a runtime API call.
	let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
		.map_err(sp_blockchain::Error::from)?;

	let time_source = Arc::new(SystemTimeSource);
	let epoch_config = MainchainEpochConfig::read_from_env()
		.map_err(|err| ServiceError::Application(err.into()))?;
	let inherent_config = CreateInherentDataConfig::new(epoch_config, sc_slot_config, time_source);

	// Import queue for incoming blocks; `VerifierCIDP` supplies the inherent data
	// providers used when verifying them against the configured data sources.
	let import_queue = partner_chains_aura_import_queue::import_queue::<
		AuraPair,
		_,
		_,
		_,
		_,
		_,
		McHashInherentDigest,
	>(ImportQueueParams {
		block_import: grandpa_block_import.clone(),
		justification_import: Some(Box::new(grandpa_block_import.clone())),
		client: client.clone(),
		create_inherent_data_providers: VerifierCIDP::new(
			inherent_config,
			client.clone(),
			data_sources.mc_hash.clone(),
			data_sources.authority_selection.clone(),
			data_sources.block_participation.clone(),
			data_sources.governed_map.clone(),
			data_sources.bridge.clone(),
		),
		spawner: &task_manager.spawn_essential_handle(),
		registry: config.prometheus_registry(),
		check_for_equivocation: Default::default(),
		telemetry: telemetry.as_ref().map(|x| x.handle()),
		compatibility_mode: Default::default(),
	})?;

	Ok(sc_service::PartialComponents {
		client,
		backend,
		task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (grandpa_block_import, grandpa_link, telemetry, data_sources, mc_follower_metrics),
	})
}
184
185pub async fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
186	let task_manager = match config.network.network_backend {
187		sc_network::config::NetworkBackendType::Libp2p => {
188			new_full_base::<sc_network::NetworkWorker<_, _>>(config).await?
189		},
190		sc_network::config::NetworkBackendType::Litep2p => {
191			new_full_base::<sc_network::Litep2pNetworkBackend>(config).await?
192		},
193	};
194
195	Ok(task_manager)
196}
197
/// Builds and starts all components of a full node: networking, RPC, the transaction
/// pool, Aura block authoring (when running as an authority) and the GRANDPA finality
/// voter. Generic over the network backend implementation; see [new_full] for the
/// dispatching wrapper. Returns the [TaskManager] driving all spawned tasks.
pub async fn new_full_base<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
	config: Configuration,
) -> Result<TaskManager, ServiceError> {
	// Build identifier embedded at compile time, when the build pipeline provided one.
	if let Some(git_hash) = std::option_env!("EARTHLY_GIT_HASH") {
		log::info!("🌱 Running version: {}", git_hash);
	}

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (block_import, grandpa_link, mut telemetry, data_sources, _),
	} = new_partial(&config)?;

	let metrics = Network::register_notification_metrics(config.prometheus_registry());
	let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
		&config.network,
		config.prometheus_registry().cloned(),
	);

	// The GRANDPA protocol name is derived from the genesis hash and the chain spec,
	// so nodes of different chains do not gossip with each other.
	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
		&config.chain_spec,
	);
	let peer_store_handle = net_config.peer_store_handle();
	let (grandpa_protocol_config, grandpa_notification_service) =
		sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
			grandpa_protocol_name.clone(),
			metrics.clone(),
			Arc::clone(&peer_store_handle),
		);
	net_config.add_notification_protocol(grandpa_protocol_config);

	// Warp-sync provider backed by the GRANDPA authority set, used for fast initial sync.
	let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
		backend.clone(),
		grandpa_link.shared_authority_set().clone(),
		Vec::default(),
	));

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			net_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
			block_relay: None,
			metrics,
		})?;

	// Capture the pieces of `config` needed later, before `config` is moved into
	// `spawn_tasks` below.
	let role = config.role;
	let force_authoring = config.force_authoring;
	let backoff_authoring_blocks: Option<()> = None;
	let name = config.network.node_name.clone();
	let enable_grandpa = !config.disable_grandpa;
	let prometheus_registry = config.prometheus_registry().cloned();
	let shared_voter_state = SharedVoterState::empty();

	// RPC builder closure: clones everything it needs up front so it can be invoked to
	// build a fresh RPC module for each connection.
	let rpc_extensions_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let shared_voter_state = shared_voter_state.clone();
		let shared_authority_set = grandpa_link.shared_authority_set().clone();
		let justification_stream = grandpa_link.justification_stream();
		let data_sources = data_sources.clone();

		move |subscription_executor| {
			let grandpa = GrandpaDeps {
				shared_voter_state: shared_voter_state.clone(),
				shared_authority_set: shared_authority_set.clone(),
				justification_stream: justification_stream.clone(),
				subscription_executor,
				finality_provider: sc_consensus_grandpa::FinalityProofProvider::new_for_service(
					backend.clone(),
					Some(shared_authority_set.clone()),
				),
			};
			let deps = crate::rpc::FullDeps {
				client: client.clone(),
				pool: pool.clone(),
				grandpa,
				data_sources: data_sources.clone(),
				time_source: Arc::new(SystemTimeSource),
			};
			crate::rpc::create_full(deps).map_err(Into::into)
		}
	};

	// Spawns the RPC servers and background service tasks; consumes `config`.
	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network: network.clone(),
		client: client.clone(),
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool: transaction_pool.clone(),
		rpc_builder: Box::new(rpc_extensions_builder),
		backend,
		system_rpc_tx,
		tx_handler_controller,
		sync_service: sync_service.clone(),
		config,
		telemetry: telemetry.as_mut(),
	})?;

	// Block authoring runs only on authority nodes.
	if role.is_authority() {
		let basic_authorship_proposer_factory = sc_basic_authorship::ProposerFactory::new(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		// Wrap the standard proposer factory with the partner-chains variant,
		// parameterized by `McHashInherentDigest`.
		let proposer_factory: PartnerChainsProposerFactory<_, _, McHashInherentDigest> =
			PartnerChainsProposerFactory::new(basic_authorship_proposer_factory);

		let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
			.map_err(sp_blockchain::Error::from)?;
		let time_source = Arc::new(SystemTimeSource);
		let mc_epoch_config = MainchainEpochConfig::read_from_env()
			.map_err(|err| ServiceError::Application(err.into()))?;
		let inherent_config =
			CreateInherentDataConfig::new(mc_epoch_config, sc_slot_config.clone(), time_source);
		// `ProposalCIDP` supplies the inherent data providers for blocks this node authors
		// (the import queue in `new_partial` uses `VerifierCIDP` for verification instead).
		let aura = sc_partner_chains_consensus_aura::start_aura::<
			AuraPair,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			McHashInherentDigest,
		>(StartAuraParams {
			slot_duration: sc_slot_config.slot_duration,
			client: client.clone(),
			select_chain,
			block_import,
			proposer_factory,
			create_inherent_data_providers: ProposalCIDP::new(
				inherent_config,
				client.clone(),
				data_sources.mc_hash.clone(),
				data_sources.authority_selection.clone(),
				data_sources.block_participation,
				data_sources.governed_map,
				data_sources.bridge.clone(),
			),
			force_authoring,
			backoff_authoring_blocks,
			keystore: keystore_container.keystore(),
			sync_oracle: sync_service.clone(),
			justification_sync_link: sync_service.clone(),
			// Leave one third of the slot for block propagation.
			block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
			max_block_proposal_slot_portion: None,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			compatibility_mode: Default::default(),
		})?;

		// the AURA authoring task is considered essential, i.e. if it
		// fails we take down the service with it.
		task_manager
			.spawn_essential_handle()
			.spawn_blocking("aura", Some("block-authoring"), aura);
	}

	if enable_grandpa {
		// if the node isn't actively participating in consensus then it doesn't
		// need a keystore, regardless of which protocol we use below.
		let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None };

		let grandpa_config = sc_consensus_grandpa::Config {
			// FIXME #1578 make this available through chainspec
			gossip_duration: Duration::from_millis(333),
			justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
			name: Some(name),
			observer_enabled: false,
			keystore,
			local_role: role,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			protocol_name: grandpa_protocol_name,
		};

		// start the full GRANDPA voter
		// NOTE: non-authorities could run the GRANDPA observer protocol, but at
		// this point the full voter should provide better guarantees of block
		// and vote data availability than the observer. The observer has not
		// been tested extensively yet and having most nodes in a network run it
		// could lead to finality stalls.
		let grandpa_config = sc_consensus_grandpa::GrandpaParams {
			config: grandpa_config,
			link: grandpa_link,
			network,
			sync: Arc::new(sync_service),
			notification_service: grandpa_notification_service,
			voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
			prometheus_registry,
			shared_voter_state,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
		};

		// the GRANDPA voter task is considered infallible, i.e.
		// if it fails we take down the service with it.
		task_manager.spawn_essential_handle().spawn_blocking(
			"grandpa-voter",
			None,
			sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
		);
	}

	Ok(task_manager)
}