// partner_chains_demo_node/service.rs

1//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2
3use crate::data_sources::DataSources;
4use crate::inherent_data::{CreateInherentDataConfig, ProposalCIDP, VerifierCIDP};
5use crate::rpc::GrandpaDeps;
6use authority_selection_inherents::AuthoritySelectionDataSource;
7use partner_chains_db_sync_data_sources::McFollowerMetrics;
8use partner_chains_db_sync_data_sources::register_metrics_warn_errors;
9use partner_chains_demo_runtime::{self, RuntimeApi, opaque::Block};
10use sc_client_api::BlockBackend;
11use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
12use sc_consensus_grandpa::SharedVoterState;
13pub use sc_executor::WasmExecutor;
14use sc_partner_chains_consensus_aura::import_queue as partner_chains_aura_import_queue;
15use sc_service::{Configuration, TaskManager, WarpSyncConfig, error::Error as ServiceError};
16use sc_telemetry::{Telemetry, TelemetryWorker};
17use sc_transaction_pool_api::OffchainTransactionPoolFactory;
18use sidechain_domain::mainchain_epoch::MainchainEpochConfig;
19use sidechain_mc_hash::McHashInherentDigest;
20use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
21use sp_partner_chains_consensus_aura::block_proposal::PartnerChainsProposerFactory;
22use sp_runtime::traits::Block as BlockT;
23use std::{sync::Arc, time::Duration};
24use time_source::SystemTimeSource;
25use tokio::task;
26
/// Host functions exposed to the WASM executor (standard Substrate set).
type HostFunctions = sp_io::SubstrateHostFunctions;

/// Full client type used throughout this service, backed by the WASM executor.
pub(crate) type FullClient =
	sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Full (non-light) backend for the opaque runtime block type.
type FullBackend = sc_service::TFullBackend<Block>;
/// Chain-selection strategy: always follow the longest chain.
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

/// The minimum period of blocks on which justifications will be
/// imported and generated.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;
37
38/// This function provides dependencies of [partner_chains_node_commands::PartnerChainsSubcommand].
39/// It is not mandatory to have such a dedicated function, [new_partial] could be enough,
40/// however using such a specialized function decreases number of possible failures and wiring time.
41pub fn new_pc_command_deps(
42	config: &Configuration,
43) -> Result<
44	(Arc<FullClient>, TaskManager, Arc<dyn AuthoritySelectionDataSource + Send + Sync>),
45	ServiceError,
46> {
47	let data_sources = task::block_in_place(|| {
48		config
49			.tokio_handle
50			.block_on(crate::data_sources::create_cached_data_sources(None))
51	})?;
52	let executor = sc_service::new_wasm_executor(&config.executor);
53	let (client, _, _, task_manager) =
54		sc_service::new_full_parts::<Block, RuntimeApi, _>(config, None, executor)?;
55	let client = Arc::new(client);
56	Ok((client, task_manager, data_sources.authority_selection))
57}
58
/// Builds the partial set of node components: client, backend, select-chain,
/// transaction pool and the Aura import queue, plus (in `other`) the GRANDPA
/// block import/link, optional telemetry, the Partner Chains data sources and
/// the optional main-chain follower metrics.
///
/// Errors if data-source creation, telemetry setup, client construction, the
/// GRANDPA import wiring, runtime slot configuration, or the main-chain epoch
/// configuration (read from environment variables) fails.
#[allow(clippy::type_complexity)]
pub fn new_partial(
	config: &Configuration,
) -> Result<
	sc_service::PartialComponents<
		FullClient,
		FullBackend,
		FullSelectChain,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
		(
			sc_consensus_grandpa::GrandpaBlockImport<
				FullBackend,
				Block,
				FullClient,
				FullSelectChain,
			>,
			sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
			Option<Telemetry>,
			DataSources,
			Option<McFollowerMetrics>,
		),
	>,
	ServiceError,
> {
	// Optional Prometheus metrics for the main-chain follower data sources.
	let mc_follower_metrics = register_metrics_warn_errors(config.prometheus_registry());
	// Data-source creation is async; run it to completion on the node's tokio
	// handle. `block_in_place` keeps the blocking wait off the async worker pool.
	let data_sources = task::block_in_place(|| {
		config
			.tokio_handle
			.block_on(crate::data_sources::create_cached_data_sources(mc_follower_metrics.clone()))
	})?;

	// Start a telemetry worker only when endpoints are configured and non-empty.
	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let executor = sc_service::new_wasm_executor(&config.executor);

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;
	let client = Arc::new(client);

	// Spawn the telemetry worker on the task manager; keep only the handle.
	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
		telemetry
	});

	let select_chain = sc_consensus::LongestChain::new(backend.clone());

	// The pool is role-aware: `is_authority()` selects the validator pool profile.
	let transaction_pool = Arc::from(
		sc_transaction_pool::Builder::new(
			task_manager.spawn_essential_handle(),
			client.clone(),
			config.role.is_authority().into(),
		)
		.with_options(config.transaction_pool.clone())
		.with_prometheus(config.prometheus_registry())
		.build(),
	);

	let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
		client.clone(),
		GRANDPA_JUSTIFICATION_PERIOD,
		&client,
		select_chain.clone(),
		telemetry.as_ref().map(|x| x.handle()),
	)?;

	// Slot configuration is queried from the runtime API; the main-chain epoch
	// configuration is read from environment variables.
	let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
		.map_err(sp_blockchain::Error::from)?;

	let time_source = Arc::new(SystemTimeSource);
	let epoch_config = MainchainEpochConfig::read_from_env()
		.map_err(|err| ServiceError::Application(err.into()))?;
	let inherent_config = CreateInherentDataConfig::new(epoch_config, sc_slot_config, time_source);

	// Aura import queue with Partner Chains inherent-data verification. The
	// GRANDPA block import wraps the inner import so justifications are both
	// imported and verified alongside blocks.
	let import_queue = partner_chains_aura_import_queue::import_queue::<
		AuraPair,
		_,
		_,
		_,
		_,
		_,
		McHashInherentDigest,
	>(ImportQueueParams {
		block_import: grandpa_block_import.clone(),
		justification_import: Some(Box::new(grandpa_block_import.clone())),
		client: client.clone(),
		create_inherent_data_providers: VerifierCIDP::new(
			inherent_config,
			client.clone(),
			data_sources.mc_hash.clone(),
			data_sources.authority_selection.clone(),
			data_sources.native_token.clone(),
			data_sources.block_participation.clone(),
			data_sources.governed_map.clone(),
		),
		spawner: &task_manager.spawn_essential_handle(),
		registry: config.prometheus_registry(),
		check_for_equivocation: Default::default(),
		telemetry: telemetry.as_ref().map(|x| x.handle()),
		compatibility_mode: Default::default(),
	})?;

	Ok(sc_service::PartialComponents {
		client,
		backend,
		task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (grandpa_block_import, grandpa_link, telemetry, data_sources, mc_follower_metrics),
	})
}
185
186pub async fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
187	let task_manager = match config.network.network_backend {
188		sc_network::config::NetworkBackendType::Libp2p => {
189			new_full_base::<sc_network::NetworkWorker<_, _>>(config).await?
190		},
191		sc_network::config::NetworkBackendType::Litep2p => {
192			new_full_base::<sc_network::Litep2pNetworkBackend>(config).await?
193		},
194	};
195
196	Ok(task_manager)
197}
198
/// Builds and starts a full node: networking, RPC server and — when the node
/// runs as an authority — the Aura block-authoring task and the GRANDPA voter.
///
/// Generic over the network backend implementation; see [new_full] for the
/// libp2p/litep2p dispatch. Consumes the configuration and returns the task
/// manager that owns all spawned service tasks.
pub async fn new_full_base<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
	config: Configuration,
) -> Result<TaskManager, ServiceError> {
	// Log the git hash when it was baked in at compile time by the CI build.
	if let Some(git_hash) = std::option_env!("EARTHLY_GIT_HASH") {
		log::info!("🌱 Running version: {}", git_hash);
	}

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain,
		transaction_pool,
		other: (block_import, grandpa_link, mut telemetry, data_sources, _),
	} = new_partial(&config)?;

	let metrics = Network::register_notification_metrics(config.prometheus_registry());
	let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
		&config.network,
		config.prometheus_registry().cloned(),
	);

	// The GRANDPA protocol name is derived from the genesis hash; the genesis
	// block is guaranteed to exist at this point.
	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
		&config.chain_spec,
	);
	let peer_store_handle = net_config.peer_store_handle();
	let (grandpa_protocol_config, grandpa_notification_service) =
		sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
			grandpa_protocol_name.clone(),
			metrics.clone(),
			Arc::clone(&peer_store_handle),
		);
	net_config.add_notification_protocol(grandpa_protocol_config);

	// Warp sync serves GRANDPA finality proofs to fast-syncing peers.
	let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
		backend.clone(),
		grandpa_link.shared_authority_set().clone(),
		Vec::default(),
	));

	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			net_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
			block_relay: None,
			metrics,
		})?;

	// Copy out config values needed after `config` is moved into `spawn_tasks`.
	let role = config.role;
	let force_authoring = config.force_authoring;
	let backoff_authoring_blocks: Option<()> = None;
	let name = config.network.node_name.clone();
	let enable_grandpa = !config.disable_grandpa;
	let prometheus_registry = config.prometheus_registry().cloned();
	let shared_voter_state = SharedVoterState::empty();

	// RPC builder closure: clones captured handles per RPC connection.
	let rpc_extensions_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let shared_voter_state = shared_voter_state.clone();
		let shared_authority_set = grandpa_link.shared_authority_set().clone();
		let justification_stream = grandpa_link.justification_stream();
		let data_sources = data_sources.clone();

		move |subscription_executor| {
			let grandpa = GrandpaDeps {
				shared_voter_state: shared_voter_state.clone(),
				shared_authority_set: shared_authority_set.clone(),
				justification_stream: justification_stream.clone(),
				subscription_executor,
				finality_provider: sc_consensus_grandpa::FinalityProofProvider::new_for_service(
					backend.clone(),
					Some(shared_authority_set.clone()),
				),
			};
			let deps = crate::rpc::FullDeps {
				client: client.clone(),
				pool: pool.clone(),
				grandpa,
				data_sources: data_sources.clone(),
				time_source: Arc::new(SystemTimeSource),
			};
			crate::rpc::create_full(deps).map_err(Into::into)
		}
	};

	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network: network.clone(),
		client: client.clone(),
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool: transaction_pool.clone(),
		rpc_builder: Box::new(rpc_extensions_builder),
		backend,
		system_rpc_tx,
		tx_handler_controller,
		sync_service: sync_service.clone(),
		config,
		telemetry: telemetry.as_mut(),
	})?;

	// Authoring is only started for authority nodes.
	if role.is_authority() {
		let basic_authorship_proposer_factory = sc_basic_authorship::ProposerFactory::new(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		// Wrap the basic proposer so produced blocks carry the MC hash digest.
		let proposer_factory: PartnerChainsProposerFactory<_, _, McHashInherentDigest> =
			PartnerChainsProposerFactory::new(basic_authorship_proposer_factory);

		// Same slot/epoch configuration sources as in `new_partial`: runtime API
		// for slots, environment variables for main-chain epochs.
		let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
			.map_err(sp_blockchain::Error::from)?;
		let time_source = Arc::new(SystemTimeSource);
		let mc_epoch_config = MainchainEpochConfig::read_from_env()
			.map_err(|err| ServiceError::Application(err.into()))?;
		let inherent_config =
			CreateInherentDataConfig::new(mc_epoch_config, sc_slot_config.clone(), time_source);
		let aura = sc_partner_chains_consensus_aura::start_aura::<
			AuraPair,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			_,
			McHashInherentDigest,
		>(StartAuraParams {
			slot_duration: sc_slot_config.slot_duration,
			client: client.clone(),
			select_chain,
			block_import,
			proposer_factory,
			create_inherent_data_providers: ProposalCIDP::new(
				inherent_config,
				client.clone(),
				data_sources.mc_hash.clone(),
				data_sources.authority_selection.clone(),
				data_sources.native_token.clone(),
				data_sources.block_participation,
				data_sources.governed_map,
			),
			force_authoring,
			backoff_authoring_blocks,
			keystore: keystore_container.keystore(),
			sync_oracle: sync_service.clone(),
			justification_sync_link: sync_service.clone(),
			// Use at most 2/3 of the slot for proposing, leaving time to propagate.
			block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
			max_block_proposal_slot_portion: None,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			compatibility_mode: Default::default(),
		})?;

		// the AURA authoring task is considered essential, i.e. if it
		// fails we take down the service with it.
		task_manager
			.spawn_essential_handle()
			.spawn_blocking("aura", Some("block-authoring"), aura);
	}

	if enable_grandpa {
		// if the node isn't actively participating in consensus then it doesn't
		// need a keystore, regardless of which protocol we use below.
		let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None };

		let grandpa_config = sc_consensus_grandpa::Config {
			// FIXME #1578 make this available through chainspec
			gossip_duration: Duration::from_millis(333),
			justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
			name: Some(name),
			observer_enabled: false,
			keystore,
			local_role: role,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			protocol_name: grandpa_protocol_name,
		};

		// start the full GRANDPA voter
		// NOTE: non-authorities could run the GRANDPA observer protocol, but at
		// this point the full voter should provide better guarantees of block
		// and vote data availability than the observer. The observer has not
		// been tested extensively yet and having most nodes in a network run it
		// could lead to finality stalls.
		let grandpa_config = sc_consensus_grandpa::GrandpaParams {
			config: grandpa_config,
			link: grandpa_link,
			network,
			sync: Arc::new(sync_service),
			notification_service: grandpa_notification_service,
			voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
			prometheus_registry,
			shared_voter_state,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
		};

		// the GRANDPA voter task is considered infallible, i.e.
		// if it fails we take down the service with it.
		task_manager.spawn_essential_handle().spawn_blocking(
			"grandpa-voter",
			None,
			sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
		);
	}

	Ok(task_manager)
}