// partner_chains_demo_node/service.rs
1//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2
3use crate::data_sources::DataSources;
4use crate::inherent_data::{CreateInherentDataConfig, ProposalCIDP, VerifierCIDP};
5use crate::rpc::GrandpaDeps;
6use authority_selection_inherents::AuthoritySelectionDataSource;
7use partner_chains_data_source_metrics::{McFollowerMetrics, register_metrics_warn_errors};
8use partner_chains_demo_runtime::{self, RuntimeApi, opaque::Block};
9use sc_client_api::BlockBackend;
10use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
11use sc_consensus_grandpa::SharedVoterState;
12pub use sc_executor::WasmExecutor;
13use sc_partner_chains_consensus_aura::import_queue as partner_chains_aura_import_queue;
14use sc_service::{Configuration, TaskManager, WarpSyncConfig, error::Error as ServiceError};
15use sc_telemetry::{Telemetry, TelemetryWorker};
16use sc_transaction_pool_api::OffchainTransactionPoolFactory;
17use sidechain_domain::mainchain_epoch::MainchainEpochConfig;
18use sidechain_mc_hash::McHashInherentDigest;
19use sp_api::ProvideRuntimeApi;
20use sp_blockchain::HeaderBackend;
21use sp_consensus_aura::AuraApi;
22use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
23use sp_partner_chains_consensus_aura::block_proposal::PartnerChainsProposerFactory;
24use sp_runtime::traits::Block as BlockT;
25use sp_sidechain::GetEpochDurationApi;
26use std::{sync::Arc, time::Duration};
27use time_source::SystemTimeSource;
28use tokio::task;
29
/// Host functions exposed to the WASM executor (the standard Substrate set only).
type HostFunctions = sp_io::SubstrateHostFunctions;

/// Fully-typed Substrate client for this node's runtime, backed by the WASM executor.
pub(crate) type FullClient =
	sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Full (archive-capable) backend for the opaque block type.
type FullBackend = sc_service::TFullBackend<Block>;
/// Fork-choice rule used by this node: always extend the longest chain.
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

/// The minimum period of blocks on which justifications will be
/// imported and generated.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;
40
41/// This function provides dependencies of [partner_chains_node_commands::PartnerChainsSubcommand].
42/// It is not mandatory to have such a dedicated function, [new_partial] could be enough,
43/// however using such a specialized function decreases number of possible failures and wiring time.
44pub fn new_pc_command_deps(
45	config: &Configuration,
46) -> Result<
47	(Arc<FullClient>, TaskManager, Arc<dyn AuthoritySelectionDataSource + Send + Sync>),
48	ServiceError,
49> {
50	let data_sources = task::block_in_place(|| {
51		config
52			.tokio_handle
53			.block_on(crate::data_sources::create_cached_data_sources(None))
54	})?;
55	let executor = sc_service::new_wasm_executor(&config.executor);
56	let (client, _, _, task_manager) =
57		sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?;
58	let client = Arc::new(client);
59	Ok((client, task_manager, data_sources.authority_selection))
60}
61
/// Builds the "partial" set of node components shared by the full node service
/// and by subcommands: client, backend, keystore, task manager, fork-choice,
/// transaction pool, import queue, plus the extras returned in `other`
/// (GRANDPA block import + link half, optional telemetry, Partner Chains data
/// sources, optional main-chain follower metrics, and the inherent-data config).
///
/// # Errors
/// Fails if data-source creation, client construction, telemetry setup,
/// GRANDPA block-import setup, reading [MainchainEpochConfig] from the
/// environment, or import-queue construction fails.
#[allow(clippy::type_complexity)]
pub fn new_partial(
	config: &Configuration,
) -> Result<
	sc_service::PartialComponents<
		FullClient,
		FullBackend,
		FullSelectChain,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
		(
			sc_consensus_grandpa::GrandpaBlockImport<
				FullBackend,
				Block,
				FullClient,
				FullSelectChain,
			>,
			sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
			Option<Telemetry>,
			DataSources,
			Option<McFollowerMetrics>,
			CreateInherentDataConfig,
		),
	>,
	ServiceError,
> {
	// Register main-chain follower metrics against the node's Prometheus
	// registry (None when Prometheus is disabled).
	let mc_follower_metrics = register_metrics_warn_errors(config.prometheus_registry());
	// Data-source creation is async; block on the node's tokio runtime from
	// this synchronous context.
	let data_sources = task::block_in_place(|| {
		config
			.tokio_handle
			.block_on(crate::data_sources::create_cached_data_sources(mc_follower_metrics.clone()))
	})?;

	// Set up telemetry only if endpoints were configured and non-empty.
	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let executor = sc_service::new_wasm_executor(&config.executor);

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;
	let client = Arc::new(client);

	// Spawn the telemetry worker on the task manager and keep only the handle.
	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
		telemetry
	});

	let select_chain = sc_consensus::LongestChain::new(backend.clone());

	let transaction_pool = Arc::from(
		sc_transaction_pool::Builder::new(
			task_manager.spawn_essential_handle(),
			client.clone(),
			config.role.is_authority().into(),
		)
		.with_options(config.transaction_pool.clone())
		.with_prometheus(config.prometheus_registry())
		.build(),
	);

	// GRANDPA wraps the client's block import; the returned link half is later
	// consumed by the voter task in `new_full_base`.
	let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
		client.clone(),
		GRANDPA_JUSTIFICATION_PERIOD,
		&client,
		select_chain.clone(),
		telemetry.as_ref().map(|x| x.handle()),
	)?;

	// Both values below are read from the runtime at the current best block;
	// a misconfigured runtime is a fatal, non-recoverable error.
	let slot_duration = client
		.runtime_api()
		.slot_duration(client.info().best_hash)
		.expect("Aura slot duration must be configured in the runtime");
	let sc_epoch_duration_millis = client
		.runtime_api()
		.get_epoch_duration_millis(client.info().best_hash)
		.expect("Epoch duration must be configured in the runtime");

	let time_source = Arc::new(SystemTimeSource);
	// Main-chain epoch configuration comes from environment variables.
	let epoch_config = MainchainEpochConfig::read_from_env()
		.map_err(|err| ServiceError::Application(err.into()))?;
	let inherent_config = CreateInherentDataConfig::new(
		epoch_config,
		slot_duration,
		sc_epoch_duration_millis,
		time_source,
	);

	// Partner Chains variant of the Aura import queue; `McHashInherentDigest`
	// carries the main-chain hash in the block's digest, and `VerifierCIDP`
	// recreates the inherent data providers used to verify imported blocks.
	let import_queue = partner_chains_aura_import_queue::import_queue::<
		AuraPair,
		_,
		_,
		_,
		_,
		_,
		McHashInherentDigest,
	>(ImportQueueParams {
		block_import: grandpa_block_import.clone(),
		justification_import: Some(Box::new(grandpa_block_import.clone())),
		client: client.clone(),
		create_inherent_data_providers: VerifierCIDP::new(
			inherent_config.clone(),
			client.clone(),
			data_sources.mc_hash.clone(),
			data_sources.authority_selection.clone(),
			data_sources.block_participation.clone(),
			data_sources.governed_map.clone(),
			data_sources.bridge.clone(),
		),
		spawner: &task_manager.spawn_essential_handle(),
		registry: config.prometheus_registry(),
		check_for_equivocation: Default::default(),
		telemetry: telemetry.as_ref().map(|x| x.handle()),
		compatibility_mode: Default::default(),
	})?;

	Ok(sc_service::PartialComponents {
		client,
		backend,
		task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (
			grandpa_block_import,
			grandpa_link,
			telemetry,
			data_sources,
			mc_follower_metrics,
			inherent_config,
		),
	})
}
207
208pub async fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
209	let task_manager = match config.network.network_backend {
210		sc_network::config::NetworkBackendType::Libp2p => {
211			new_full_base::<sc_network::NetworkWorker<_, _>>(config).await?
212		},
213		sc_network::config::NetworkBackendType::Litep2p => {
214			new_full_base::<sc_network::Litep2pNetworkBackend>(config).await?
215		},
216	};
217
218	Ok(task_manager)
219}
220
/// Builds a new full node service for the given network backend: constructs
/// the partial components, wires up networking (including GRANDPA's
/// notification protocol and warp sync), spawns the RPC tasks, and — depending
/// on the node's role and flags — starts the Aura block-authoring task and the
/// GRANDPA voter task on the returned [TaskManager].
pub async fn new_full_base<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
	config: Configuration,
) -> Result<TaskManager, ServiceError> {
	// Build-time git hash, injected by the CI build (compile-time env var).
	if let Some(git_hash) = std::option_env!("EARTHLY_GIT_HASH") {
		log::info!("🌱 Running version: {}", git_hash);
	}

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (block_import, grandpa_link, mut telemetry, data_sources, _, inherent_config),
	} = new_partial(&config)?;

	let metrics = Network::register_notification_metrics(config.prometheus_registry());
	let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
		&config.network,
		config.prometheus_registry().cloned(),
	);

	// GRANDPA's protocol name is derived from the genesis hash and chain spec.
	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
		&config.chain_spec,
	);
	let peer_store_handle = net_config.peer_store_handle();
	// The notification service is consumed by the GRANDPA voter below.
	let (grandpa_protocol_config, grandpa_notification_service) =
		sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
			grandpa_protocol_name.clone(),
			metrics.clone(),
			Arc::clone(&peer_store_handle),
		);
	net_config.add_notification_protocol(grandpa_protocol_config);

	// Warp sync serves GRANDPA finality proofs to fast-syncing peers.
	let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
		backend.clone(),
		grandpa_link.shared_authority_set().clone(),
		Vec::default(),
	));

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			net_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
			block_relay: None,
			metrics,
		})?;

	// Capture config values needed after `config` is moved into `spawn_tasks`.
	let role = config.role;
	let force_authoring = config.force_authoring;
	let backoff_authoring_blocks: Option<()> = None;
	let name = config.network.node_name.clone();
	let enable_grandpa = !config.disable_grandpa;
	let prometheus_registry = config.prometheus_registry().cloned();
	let shared_voter_state = SharedVoterState::empty();

	// Closure invoked per RPC connection; clones are captured so the closure
	// is self-contained and `'static`.
	let rpc_extensions_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let shared_voter_state = shared_voter_state.clone();
		let shared_authority_set = grandpa_link.shared_authority_set().clone();
		let justification_stream = grandpa_link.justification_stream();
		let data_sources = data_sources.clone();

		move |subscription_executor| {
			let grandpa = GrandpaDeps {
				shared_voter_state: shared_voter_state.clone(),
				shared_authority_set: shared_authority_set.clone(),
				justification_stream: justification_stream.clone(),
				subscription_executor,
				finality_provider: sc_consensus_grandpa::FinalityProofProvider::new_for_service(
					backend.clone(),
					Some(shared_authority_set.clone()),
				),
			};
			let deps = crate::rpc::FullDeps {
				client: client.clone(),
				pool: pool.clone(),
				grandpa,
				data_sources: data_sources.clone(),
				time_source: Arc::new(SystemTimeSource),
			};
			crate::rpc::create_full(deps).map_err(Into::into)
		}
	};

	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network: network.clone(),
		client: client.clone(),
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool: transaction_pool.clone(),
		rpc_builder: Box::new(rpc_extensions_builder),
		backend,
		system_rpc_tx,
		tx_handler_controller,
		sync_service: sync_service.clone(),
		config,
		telemetry: telemetry.as_mut(),
	})?;

	// Block authoring runs only on authority nodes.
	if role.is_authority() {
		let basic_authorship_proposer_factory = sc_basic_authorship::ProposerFactory::new(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		// Wrap the standard proposer so produced blocks carry the
		// main-chain-hash inherent digest.
		let proposer_factory: PartnerChainsProposerFactory<_, _, McHashInherentDigest> =
			PartnerChainsProposerFactory::new(basic_authorship_proposer_factory);

		let aura = sc_partner_chains_consensus_aura::start_aura::<
			AuraPair,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			McHashInherentDigest,
		>(StartAuraParams {
			slot_duration: inherent_config.slot_duration,
			client: client.clone(),
			select_chain,
			block_import,
			proposer_factory,
			// ProposalCIDP supplies the inherent data providers used when
			// authoring blocks (the verification-side counterpart is the
			// VerifierCIDP in the import queue).
			create_inherent_data_providers: ProposalCIDP::new(
				inherent_config,
				client.clone(),
				data_sources.mc_hash.clone(),
				data_sources.authority_selection.clone(),
				data_sources.block_participation,
				data_sources.governed_map,
				data_sources.bridge.clone(),
			),
			force_authoring,
			backoff_authoring_blocks,
			keystore: keystore_container.keystore(),
			sync_oracle: sync_service.clone(),
			justification_sync_link: sync_service.clone(),
			// Reserve the first 2/3 of each slot for proposing the block.
			block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
			max_block_proposal_slot_portion: None,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			compatibility_mode: Default::default(),
		})?;

		// the AURA authoring task is considered essential, i.e. if it
		// fails we take down the service with it.
		task_manager
			.spawn_essential_handle()
			.spawn_blocking("aura", Some("block-authoring"), aura);
	}

	if enable_grandpa {
		// if the node isn't actively participating in consensus then it doesn't
		// need a keystore, regardless of which protocol we use below.
		let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None };

		let grandpa_config = sc_consensus_grandpa::Config {
			// FIXME #1578 make this available through chainspec
			gossip_duration: Duration::from_millis(333),
			justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
			name: Some(name),
			observer_enabled: false,
			keystore,
			local_role: role,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			protocol_name: grandpa_protocol_name,
		};

		// start the full GRANDPA voter
		// NOTE: non-authorities could run the GRANDPA observer protocol, but at
		// this point the full voter should provide better guarantees of block
		// and vote data availability than the observer. The observer has not
		// been tested extensively yet and having most nodes in a network run it
		// could lead to finality stalls.
		let grandpa_config = sc_consensus_grandpa::GrandpaParams {
			config: grandpa_config,
			link: grandpa_link,
			network,
			sync: Arc::new(sync_service),
			notification_service: grandpa_notification_service,
			voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
			prometheus_registry,
			shared_voter_state,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
		};

		// the GRANDPA voter task is considered infallible, i.e.
		// if it fails we take down the service with it.
		task_manager.spawn_essential_handle().spawn_blocking(
			"grandpa-voter",
			None,
			sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
		);
	}

	Ok(task_manager)
}