partner_chains_demo_node/service.rs

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use crate::data_sources::DataSources;
use crate::inherent_data::{CreateInherentDataConfig, ProposalCIDP, VerifierCIDP};
use crate::rpc::GrandpaDeps;
use authority_selection_inherents::AuthoritySelectionDataSource;
use partner_chains_db_sync_data_sources::McFollowerMetrics;
use partner_chains_db_sync_data_sources::register_metrics_warn_errors;
use partner_chains_demo_runtime::{self, RuntimeApi, opaque::Block};
use sc_client_api::BlockBackend;
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_grandpa::SharedVoterState;
pub use sc_executor::WasmExecutor;
use sc_partner_chains_consensus_aura::import_queue as partner_chains_aura_import_queue;
use sc_service::{Configuration, TaskManager, WarpSyncConfig, error::Error as ServiceError};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sidechain_domain::mainchain_epoch::MainchainEpochConfig;
use sidechain_mc_hash::McHashInherentDigest;
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
use sp_partner_chains_consensus_aura::block_proposal::PartnerChainsProposerFactory;
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};
use time_source::SystemTimeSource;
use tokio::task;

type HostFunctions = sp_io::SubstrateHostFunctions;

pub(crate) type FullClient =
	sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type FullBackend = sc_service::TFullBackend<Block>;
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

/// The minimum period of blocks on which justifications will be
/// imported and generated.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;

/// Provides the dependencies of [partner_chains_node_commands::PartnerChainsSubcommand].
/// A dedicated function like this is not mandatory ([new_partial] would also work), but a
/// specialized one reduces the number of possible failures and the wiring time.
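///
/// A minimal, hypothetical usage sketch (the surrounding CLI wiring is assumed and not shown):
/// ```ignore
/// let (client, task_manager, authority_selection_data_source) = new_pc_command_deps(&config)?;
/// // `client` and the data source are handed to the chosen Partner Chains subcommand, while
/// // `task_manager` keeps the spawned tasks alive for the duration of the command.
/// ```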
pub fn new_pc_command_deps(
	config: &Configuration,
) -> Result<
	(Arc<FullClient>, TaskManager, Arc<dyn AuthoritySelectionDataSource + Send + Sync>),
	ServiceError,
> {
	let data_sources = task::block_in_place(|| {
		config
			.tokio_handle
			.block_on(crate::data_sources::create_cached_data_sources(None))
	})?;
	let executor = sc_service::new_wasm_executor(&config.executor);
	let (client, _, _, task_manager) =
		sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?;
	let client = Arc::new(client);
	Ok((client, task_manager, data_sources.authority_selection))
}

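/// Assembles the [sc_service::PartialComponents] of the node: client, backend, keystore
/// container, task manager, select chain, transaction pool, the AURA import queue (verifying
/// inherent data via [VerifierCIDP]), and, in `other`, the GRANDPA block import and link,
/// telemetry, the [DataSources] and their optional metrics.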
#[allow(clippy::type_complexity)]
pub fn new_partial(
	config: &Configuration,
) -> Result<
	sc_service::PartialComponents<
		FullClient,
		FullBackend,
		FullSelectChain,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
		(
			sc_consensus_grandpa::GrandpaBlockImport<
				FullBackend,
				Block,
				FullClient,
				FullSelectChain,
			>,
			sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
			Option<Telemetry>,
			DataSources,
			Option<McFollowerMetrics>,
		),
	>,
	ServiceError,
> {
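	// Register the main chain follower metrics and build the cached data sources;
	// `task::block_in_place` bridges their async constructors into this synchronous function.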
	let mc_follower_metrics = register_metrics_warn_errors(config.prometheus_registry());
	let data_sources = task::block_in_place(|| {
		config
			.tokio_handle
			.block_on(crate::data_sources::create_cached_data_sources(mc_follower_metrics.clone()))
	})?;

	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let executor = sc_service::new_wasm_executor(&config.executor);

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;
	let client = Arc::new(client);

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
		telemetry
	});

	let select_chain = sc_consensus::LongestChain::new(backend.clone());

	let transaction_pool = Arc::from(
		sc_transaction_pool::Builder::new(
			task_manager.spawn_essential_handle(),
			client.clone(),
			config.role.is_authority().into(),
		)
		.with_options(config.transaction_pool.clone())
		.with_prometheus(config.prometheus_registry())
		.build(),
	);

	let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
		client.clone(),
		GRANDPA_JUSTIFICATION_PERIOD,
		&client,
		select_chain.clone(),
		telemetry.as_ref().map(|x| x.handle()),
	)?;

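	// Slot configuration is read from the runtime API and the main chain epoch configuration
	// from the environment; together with the system time source they make up the
	// configuration shared by the inherent data providers.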
	let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
		.map_err(sp_blockchain::Error::from)?;

	let time_source = Arc::new(SystemTimeSource);
	let epoch_config = MainchainEpochConfig::read_from_env()
		.map_err(|err| ServiceError::Application(err.into()))?;
	let inherent_config = CreateInherentDataConfig::new(epoch_config, sc_slot_config, time_source);

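	// The import queue checks incoming blocks with the Partner Chains AURA verifier;
	// [VerifierCIDP] recreates the expected inherent data (main chain hash, authority
	// selection, native token, block participation, governed map, bridge) for verification.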
	let import_queue = partner_chains_aura_import_queue::import_queue::<
		AuraPair,
		_,
		_,
		_,
		_,
		_,
		McHashInherentDigest,
	>(ImportQueueParams {
		block_import: grandpa_block_import.clone(),
		justification_import: Some(Box::new(grandpa_block_import.clone())),
		client: client.clone(),
		create_inherent_data_providers: VerifierCIDP::new(
			inherent_config,
			client.clone(),
			data_sources.mc_hash.clone(),
			data_sources.authority_selection.clone(),
			data_sources.native_token.clone(),
			data_sources.block_participation.clone(),
			data_sources.governed_map.clone(),
			data_sources.bridge.clone(),
		),
		spawner: &task_manager.spawn_essential_handle(),
		registry: config.prometheus_registry(),
		check_for_equivocation: Default::default(),
		telemetry: telemetry.as_ref().map(|x| x.handle()),
		compatibility_mode: Default::default(),
	})?;

	Ok(sc_service::PartialComponents {
		client,
		backend,
		task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (grandpa_block_import, grandpa_link, telemetry, data_sources, mc_follower_metrics),
	})
}

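/// Builds and runs a full node, picking the libp2p or litep2p network backend according to the
/// node configuration.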
pub async fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
	let task_manager = match config.network.network_backend {
		sc_network::config::NetworkBackendType::Libp2p => {
			new_full_base::<sc_network::NetworkWorker<_, _>>(config).await?
		},
		sc_network::config::NetworkBackendType::Litep2p => {
			new_full_base::<sc_network::Litep2pNetworkBackend>(config).await?
		},
	};

	Ok(task_manager)
}

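/// Builds the full node service for the chosen network backend: partial components, networking
/// with GRANDPA warp sync, the RPC server tasks, and, for authority nodes, the Partner Chains
/// AURA authoring task and the GRANDPA voter.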
pub async fn new_full_base<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
	config: Configuration,
) -> Result<TaskManager, ServiceError> {
	if let Some(git_hash) = std::option_env!("EARTHLY_GIT_HASH") {
		log::info!("🌱 Running version: {}", git_hash);
	}

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (block_import, grandpa_link, mut telemetry, data_sources, _),
	} = new_partial(&config)?;

	let metrics = Network::register_notification_metrics(config.prometheus_registry());
	let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
		&config.network,
		config.prometheus_registry().cloned(),
	);

	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
		&config.chain_spec,
	);
	let peer_store_handle = net_config.peer_store_handle();
	let (grandpa_protocol_config, grandpa_notification_service) =
		sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
			grandpa_protocol_name.clone(),
			metrics.clone(),
			Arc::clone(&peer_store_handle),
		);
	net_config.add_notification_protocol(grandpa_protocol_config);

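	// The warp sync provider serves GRANDPA finality proofs so that new nodes can sync by
	// following authority set changes instead of importing every block.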
	let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
		backend.clone(),
		grandpa_link.shared_authority_set().clone(),
		Vec::default(),
	));

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			net_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
			block_relay: None,
			metrics,
		})?;

	let role = config.role;
	let force_authoring = config.force_authoring;
	let backoff_authoring_blocks: Option<()> = None;
	let name = config.network.node_name.clone();
	let enable_grandpa = !config.disable_grandpa;
	let prometheus_registry = config.prometheus_registry().cloned();
	let shared_voter_state = SharedVoterState::empty();

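	// The RPC builder captures clones of everything the handlers need (client, pool, backend,
	// GRANDPA shared state, data sources) and assembles the full RPC module in crate::rpc.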
	let rpc_extensions_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let shared_voter_state = shared_voter_state.clone();
		let shared_authority_set = grandpa_link.shared_authority_set().clone();
		let justification_stream = grandpa_link.justification_stream();
		let data_sources = data_sources.clone();

		move |subscription_executor| {
			let grandpa = GrandpaDeps {
				shared_voter_state: shared_voter_state.clone(),
				shared_authority_set: shared_authority_set.clone(),
				justification_stream: justification_stream.clone(),
				subscription_executor,
				finality_provider: sc_consensus_grandpa::FinalityProofProvider::new_for_service(
					backend.clone(),
					Some(shared_authority_set.clone()),
				),
			};
			let deps = crate::rpc::FullDeps {
				client: client.clone(),
				pool: pool.clone(),
				grandpa,
				data_sources: data_sources.clone(),
				time_source: Arc::new(SystemTimeSource),
			};
			crate::rpc::create_full(deps).map_err(Into::into)
		}
	};

	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network: network.clone(),
		client: client.clone(),
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool: transaction_pool.clone(),
		rpc_builder: Box::new(rpc_extensions_builder),
		backend,
		system_rpc_tx,
		tx_handler_controller,
		sync_service: sync_service.clone(),
		config,
		telemetry: telemetry.as_mut(),
	})?;

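	// Only authority nodes author blocks. The basic-authorship proposer is wrapped in
	// `PartnerChainsProposerFactory` (parameterized with `McHashInherentDigest`), and
	// `ProposalCIDP` supplies the inherent data for block proposals.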
	if role.is_authority() {
		let basic_authorship_proposer_factory = sc_basic_authorship::ProposerFactory::new(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		let proposer_factory: PartnerChainsProposerFactory<_, _, McHashInherentDigest> =
			PartnerChainsProposerFactory::new(basic_authorship_proposer_factory);

		let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
			.map_err(sp_blockchain::Error::from)?;
		let time_source = Arc::new(SystemTimeSource);
		let mc_epoch_config = MainchainEpochConfig::read_from_env()
			.map_err(|err| ServiceError::Application(err.into()))?;
		let inherent_config =
			CreateInherentDataConfig::new(mc_epoch_config, sc_slot_config.clone(), time_source);
		let aura = sc_partner_chains_consensus_aura::start_aura::<
			AuraPair,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			McHashInherentDigest,
		>(StartAuraParams {
			slot_duration: sc_slot_config.slot_duration,
			client: client.clone(),
			select_chain,
			block_import,
			proposer_factory,
			create_inherent_data_providers: ProposalCIDP::new(
				inherent_config,
				client.clone(),
				data_sources.mc_hash.clone(),
				data_sources.authority_selection.clone(),
				data_sources.native_token.clone(),
				data_sources.block_participation,
				data_sources.governed_map,
				data_sources.bridge.clone(),
			),
			force_authoring,
			backoff_authoring_blocks,
			keystore: keystore_container.keystore(),
			sync_oracle: sync_service.clone(),
			justification_sync_link: sync_service.clone(),
			block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
			max_block_proposal_slot_portion: None,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			compatibility_mode: Default::default(),
		})?;

		// the AURA authoring task is considered essential, i.e. if it
		// fails we take down the service with it.
		task_manager
			.spawn_essential_handle()
			.spawn_blocking("aura", Some("block-authoring"), aura);
	}

	if enable_grandpa {
		// if the node isn't actively participating in consensus then it doesn't
		// need a keystore, regardless of which protocol we use below.
		let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None };

		let grandpa_config = sc_consensus_grandpa::Config {
			// FIXME #1578 make this available through chainspec
			gossip_duration: Duration::from_millis(333),
			justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
			name: Some(name),
			observer_enabled: false,
			keystore,
			local_role: role,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			protocol_name: grandpa_protocol_name,
		};

		// start the full GRANDPA voter
		// NOTE: non-authorities could run the GRANDPA observer protocol, but at
		// this point the full voter should provide better guarantees of block
		// and vote data availability than the observer. The observer has not
		// been tested extensively yet and having most nodes in a network run it
		// could lead to finality stalls.
		let grandpa_config = sc_consensus_grandpa::GrandpaParams {
			config: grandpa_config,
			link: grandpa_link,
			network,
			sync: Arc::new(sync_service),
			notification_service: grandpa_notification_service,
			voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
			prometheus_registry,
			shared_voter_state,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
		};

		// the GRANDPA voter task is considered essential, i.e.
		// if it fails we take down the service with it.
		task_manager.spawn_essential_handle().spawn_blocking(
			"grandpa-voter",
			None,
			sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
		);
	}

	Ok(task_manager)
}