partner_chains_demo_node/
service.rs

1//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2
3use crate::data_sources::DataSources;
4use crate::inherent_data::{CreateInherentDataConfig, ProposalCIDP, VerifierCIDP};
5use crate::rpc::GrandpaDeps;
6use partner_chains_db_sync_data_sources::McFollowerMetrics;
7use partner_chains_db_sync_data_sources::register_metrics_warn_errors;
8use partner_chains_demo_runtime::{self, RuntimeApi, opaque::Block};
9use sc_client_api::BlockBackend;
10use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
11use sc_consensus_grandpa::SharedVoterState;
12pub use sc_executor::WasmExecutor;
13use sc_partner_chains_consensus_aura::import_queue as partner_chains_aura_import_queue;
14use sc_service::{Configuration, TaskManager, WarpSyncConfig, error::Error as ServiceError};
15use sc_telemetry::{Telemetry, TelemetryWorker};
16use sc_transaction_pool_api::OffchainTransactionPoolFactory;
17use sidechain_domain::mainchain_epoch::MainchainEpochConfig;
18use sidechain_mc_hash::McHashInherentDigest;
19use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
20use sp_partner_chains_consensus_aura::block_proposal::PartnerChainsProposerFactory;
21use sp_runtime::traits::Block as BlockT;
22use std::{sync::Arc, time::Duration};
23use time_source::SystemTimeSource;
24use tokio::task;
25
/// Host function set used to parameterize the [`WasmExecutor`] inside [`FullClient`].
26type HostFunctions = sp_io::SubstrateHostFunctions;
27
/// Concrete client type of this node: `sc_service::TFullClient` over the demo
/// runtime's `Block` and `RuntimeApi`, executing through a [`WasmExecutor`].
28pub(crate) type FullClient =
29	sc_service::TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
/// Concrete backend type of this node (`sc_service::TFullBackend` over `Block`).
30type FullBackend = sc_service::TFullBackend<Block>;
/// Select-chain implementation used by this node (`sc_consensus::LongestChain`
/// over [`FullBackend`]).
31type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
32
33/// The minimum period of blocks on which justifications will be
34/// imported and generated.
///
/// The same value is passed to `sc_consensus_grandpa::block_import` in
/// `new_partial` and used as `justification_generation_period` of the GRANDPA
/// voter configuration in `new_full`.
35const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;
36
/// Builds the node components that `new_full` starts from: client, backend,
/// keystore container, task manager, select chain, transaction pool and the
/// block import queue.
///
/// The `other` tuple of the returned `PartialComponents` carries, in order:
/// 1. the GRANDPA block import,
/// 2. the GRANDPA link half,
/// 3. the optional telemetry handle,
/// 4. the main chain [`DataSources`],
/// 5. the optional [`McFollowerMetrics`].
///
/// # Errors
///
/// Returns a [`ServiceError`] when any of the following fails: creating the
/// data sources, creating the telemetry worker, `sc_service::new_full_parts`,
/// `sc_consensus_grandpa::block_import`, reading the slot configuration from
/// the runtime, reading [`MainchainEpochConfig`] from the environment, or
/// building the import queue.
///
/// NOTE(review): this function blocks on an async call through
/// `tokio::task::block_in_place`, which presumably requires being called from
/// a multi-threaded Tokio runtime - confirm against the Tokio documentation.
37#[allow(clippy::type_complexity)]
38pub fn new_partial(
39	config: &Configuration,
40) -> Result<
41	sc_service::PartialComponents<
42		FullClient,
43		FullBackend,
44		FullSelectChain,
45		sc_consensus::DefaultImportQueue<Block>,
46		sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
47		(
48			sc_consensus_grandpa::GrandpaBlockImport<
49				FullBackend,
50				Block,
51				FullClient,
52				FullSelectChain,
53			>,
54			sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
55			Option<Telemetry>,
56			DataSources,
57			Option<McFollowerMetrics>,
58		),
59	>,
60	ServiceError,
61> {
	// Main chain follower metrics, registered against the node's Prometheus
	// registry; the result is optional and is handed back to the caller as-is.
62	let mc_follower_metrics = register_metrics_warn_errors(config.prometheus_registry());
	// Creating the data sources is async, so block on it using the node's
	// Tokio handle. A clone of the metrics goes to the data sources.
63	let data_sources = task::block_in_place(|| {
64		config
65			.tokio_handle
66			.block_on(crate::data_sources::create_cached_data_sources(mc_follower_metrics.clone()))
67	})?;
68
	// A telemetry worker is only created when non-empty telemetry endpoints
	// are configured; otherwise `telemetry` stays `None`.
69	let telemetry = config
70		.telemetry_endpoints
71		.clone()
72		.filter(|x| !x.is_empty())
73		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
74			let worker = TelemetryWorker::new(16)?;
75			let telemetry = worker.handle().new_telemetry(endpoints);
76			Ok((worker, telemetry))
77		})
78		.transpose()?;
79
80	let executor = sc_service::new_wasm_executor(&config.executor);
81
82	let (client, backend, keystore_container, task_manager) =
83		sc_service::new_full_parts::<Block, RuntimeApi, _>(
84			config,
85			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
86			executor,
87		)?;
88	let client = Arc::new(client);
89
	// Run the telemetry worker as a background task and keep only the
	// telemetry object from here on.
90	let telemetry = telemetry.map(|(worker, telemetry)| {
91		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
92		telemetry
93	});
94
95	let select_chain = sc_consensus::LongestChain::new(backend.clone());
96
97	let transaction_pool = Arc::from(
98		sc_transaction_pool::Builder::new(
99			task_manager.spawn_essential_handle(),
100			client.clone(),
101			config.role.is_authority().into(),
102		)
103		.with_options(config.transaction_pool.clone())
104		.with_prometheus(config.prometheus_registry())
105		.build(),
106	);
107
	// GRANDPA wraps the client as a block import. The link half is returned to
	// the caller; `new_full` passes it on to the GRANDPA voter.
108	let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
109		client.clone(),
110		GRANDPA_JUSTIFICATION_PERIOD,
111		&client,
112		select_chain.clone(),
113		telemetry.as_ref().map(|x| x.handle()),
114	)?;
115
	// The slot configuration is read from the runtime through the client.
116	let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
117		.map_err(sp_blockchain::Error::from)?;
118
119	let time_source = Arc::new(SystemTimeSource);
	// The main chain epoch configuration comes from environment variables.
120	let epoch_config = MainchainEpochConfig::read_from_env()
121		.map_err(|err| ServiceError::Application(err.into()))?;
122	let inherent_config = CreateInherentDataConfig::new(epoch_config, sc_slot_config, time_source);
123
	// Partner Chains variant of the Aura import queue. Blocks are imported
	// through the GRANDPA block import (which also serves as justification
	// import) and the inherent data providers are created by `VerifierCIDP`.
124	let import_queue = partner_chains_aura_import_queue::import_queue::<
125		AuraPair,
126		_,
127		_,
128		_,
129		_,
130		_,
131		McHashInherentDigest,
132	>(ImportQueueParams {
133		block_import: grandpa_block_import.clone(),
134		justification_import: Some(Box::new(grandpa_block_import.clone())),
135		client: client.clone(),
136		create_inherent_data_providers: VerifierCIDP::new(
137			inherent_config,
138			client.clone(),
139			data_sources.mc_hash.clone(),
140			data_sources.authority_selection.clone(),
141			data_sources.native_token.clone(),
142			data_sources.block_participation.clone(),
143			data_sources.governed_map.clone(),
144		),
145		spawner: &task_manager.spawn_essential_handle(),
146		registry: config.prometheus_registry(),
147		check_for_equivocation: Default::default(),
148		telemetry: telemetry.as_ref().map(|x| x.handle()),
149		compatibility_mode: Default::default(),
150	})?;
151
152	Ok(sc_service::PartialComponents {
153		client,
154		backend,
155		task_manager,
156		import_queue,
157		keystore_container,
158		select_chain,
159		transaction_pool,
160		other: (grandpa_block_import, grandpa_link, telemetry, data_sources, mc_follower_metrics),
161	})
162}
163
/// Builds and starts the full node service and returns its [`TaskManager`].
///
/// Starting from the components produced by `new_partial`, this sets up the
/// network (including the GRANDPA notification protocol and a warp sync
/// provider), spawns the common service tasks together with the RPC builder,
/// starts Aura block authoring when the node's role is an authority, and starts
/// the GRANDPA voter unless `config.disable_grandpa` is set.
///
/// `Network` selects the network backend implementation used for the node.
///
/// # Errors
///
/// Returns a [`ServiceError`] when `new_partial`, `sc_service::build_network`,
/// `sc_service::spawn_tasks`, reading the slot configuration from the runtime,
/// reading [`MainchainEpochConfig`] from the environment, starting Aura, or
/// creating the GRANDPA voter fails.
///
/// # Panics
///
/// Panics if the hash of block `0` cannot be obtained from the client.
164pub async fn new_full<Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>>(
165	config: Configuration,
166) -> Result<TaskManager, ServiceError> {
	// Log the git hash of the build when `EARTHLY_GIT_HASH` was set at
	// compile time.
167	if let Some(git_hash) = std::option_env!("EARTHLY_GIT_HASH") {
168		log::info!("🌱 Running version: {}", git_hash);
169	}
170
	// The last element of `other` (the main chain follower metrics) is not
	// used by this function.
171	let sc_service::PartialComponents {
172		client,
173		backend,
174		mut task_manager,
175		import_queue,
176		keystore_container,
177		select_chain,
178		transaction_pool,
179		other: (block_import, grandpa_link, mut telemetry, data_sources, _),
180	} = new_partial(&config)?;
181
182	let metrics = Network::register_notification_metrics(config.prometheus_registry());
183	let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
184		&config.network,
185		config.prometheus_registry().cloned(),
186	);
187
	// The GRANDPA protocol name is derived from the genesis hash and the
	// chain spec; it is reused below in the GRANDPA voter configuration.
188	let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(
189		&client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"),
190		&config.chain_spec,
191	);
192	let peer_store_handle = net_config.peer_store_handle();
193	let (grandpa_protocol_config, grandpa_notification_service) =
194		sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>(
195			grandpa_protocol_name.clone(),
196			metrics.clone(),
197			Arc::clone(&peer_store_handle),
198		);
199	net_config.add_notification_protocol(grandpa_protocol_config);
200
	// Warp sync provider built from the backend and the GRANDPA shared
	// authority set.
	// NOTE(review): the empty `Vec` is presumably the list of authority set
	// hard forks - confirm against `warp_proof::NetworkProvider::new`.
201	let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
202		backend.clone(),
203		grandpa_link.shared_authority_set().clone(),
204		Vec::default(),
205	));
206
207	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
208		sc_service::build_network(sc_service::BuildNetworkParams {
209			config: &config,
210			net_config,
211			client: client.clone(),
212			transaction_pool: transaction_pool.clone(),
213			spawn_handle: task_manager.spawn_handle(),
214			import_queue,
215			block_announce_validator_builder: None,
216			warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
217			block_relay: None,
218			metrics,
219		})?;
220
	// `config` is moved into `spawn_tasks` below, so everything that is still
	// needed afterwards is copied out of it here.
221	let role = config.role;
222	let force_authoring = config.force_authoring;
	// No authoring backoff strategy is configured.
223	let backoff_authoring_blocks: Option<()> = None;
224	let name = config.network.node_name.clone();
225	let enable_grandpa = !config.disable_grandpa;
226	let prometheus_registry = config.prometheus_registry().cloned();
227	let shared_voter_state = SharedVoterState::empty();
228
	// Closure that builds the RPC extensions. It owns clones of the handles it
	// needs and clones them again on each invocation, so it does not consume
	// its captured state.
229	let rpc_extensions_builder = {
230		let client = client.clone();
231		let pool = transaction_pool.clone();
232		let backend = backend.clone();
233		let shared_voter_state = shared_voter_state.clone();
234		let shared_authority_set = grandpa_link.shared_authority_set().clone();
235		let justification_stream = grandpa_link.justification_stream();
236		let data_sources = data_sources.clone();
237
238		move |subscription_executor| {
239			let grandpa = GrandpaDeps {
240				shared_voter_state: shared_voter_state.clone(),
241				shared_authority_set: shared_authority_set.clone(),
242				justification_stream: justification_stream.clone(),
243				subscription_executor,
244				finality_provider: sc_consensus_grandpa::FinalityProofProvider::new_for_service(
245					backend.clone(),
246					Some(shared_authority_set.clone()),
247				),
248			};
249			let deps = crate::rpc::FullDeps {
250				client: client.clone(),
251				pool: pool.clone(),
252				grandpa,
253				data_sources: data_sources.clone(),
254				time_source: Arc::new(SystemTimeSource),
255			};
256			crate::rpc::create_full(deps).map_err(Into::into)
257		}
258	};
259
	// `backend` and `config` are moved here; neither is used afterwards.
260	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
261		network: network.clone(),
262		client: client.clone(),
263		keystore: keystore_container.keystore(),
264		task_manager: &mut task_manager,
265		transaction_pool: transaction_pool.clone(),
266		rpc_builder: Box::new(rpc_extensions_builder),
267		backend,
268		system_rpc_tx,
269		tx_handler_controller,
270		sync_service: sync_service.clone(),
271		config,
272		telemetry: telemetry.as_mut(),
273	})?;
274
	// Block authoring is only started on authority nodes.
275	if role.is_authority() {
		// The basic authorship proposer factory is wrapped in the Partner
		// Chains proposer factory, parameterized by `McHashInherentDigest`.
276		let basic_authorship_proposer_factory = sc_basic_authorship::ProposerFactory::new(
277			task_manager.spawn_handle(),
278			client.clone(),
279			transaction_pool.clone(),
280			prometheus_registry.as_ref(),
281			telemetry.as_ref().map(|x| x.handle()),
282		);
283		let proposer_factory: PartnerChainsProposerFactory<_, _, McHashInherentDigest> =
284			PartnerChainsProposerFactory::new(basic_authorship_proposer_factory);
285
		// Slot and main chain epoch configuration are read again here (they
		// are also read in `new_partial` for the import queue).
286		let sc_slot_config = sidechain_slots::runtime_api_client::slot_config(&*client)
287			.map_err(sp_blockchain::Error::from)?;
288		let time_source = Arc::new(SystemTimeSource);
289		let mc_epoch_config = MainchainEpochConfig::read_from_env()
290			.map_err(|err| ServiceError::Application(err.into()))?;
291		let inherent_config =
292			CreateInherentDataConfig::new(mc_epoch_config, sc_slot_config.clone(), time_source);
293		let aura = sc_partner_chains_consensus_aura::start_aura::<
294			AuraPair,
295			_,
296			_,
297			_,
298			_,
299			_,
300			_,
301			_,
302			_,
303			_,
304			_,
305			McHashInherentDigest,
306		>(StartAuraParams {
307			slot_duration: sc_slot_config.slot_duration,
308			client: client.clone(),
309			select_chain,
310			block_import,
311			proposer_factory,
			// Inherent data for proposed blocks is produced by `ProposalCIDP`;
			// the last two data sources are moved in rather than cloned.
312			create_inherent_data_providers: ProposalCIDP::new(
313				inherent_config,
314				client.clone(),
315				data_sources.mc_hash.clone(),
316				data_sources.authority_selection.clone(),
317				data_sources.native_token.clone(),
318				data_sources.block_participation,
319				data_sources.governed_map,
320			),
321			force_authoring,
322			backoff_authoring_blocks,
323			keystore: keystore_container.keystore(),
324			sync_oracle: sync_service.clone(),
325			justification_sync_link: sync_service.clone(),
			// Block proposal slot portion is set to two thirds.
326			block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
327			max_block_proposal_slot_portion: None,
328			telemetry: telemetry.as_ref().map(|x| x.handle()),
329			compatibility_mode: Default::default(),
330		})?;
331
332		// the AURA authoring task is considered essential, i.e. if it
333		// fails we take down the service with it.
334		task_manager
335			.spawn_essential_handle()
336			.spawn_blocking("aura", Some("block-authoring"), aura);
337	}
338
339	if enable_grandpa {
340		// if the node isn't actively participating in consensus then it doesn't
341		// need a keystore, regardless of which protocol we use below.
342		let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None };
343
344		let grandpa_config = sc_consensus_grandpa::Config {
345			// FIXME #1578 make this available through chainspec
346			gossip_duration: Duration::from_millis(333),
347			justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
348			name: Some(name),
349			observer_enabled: false,
350			keystore,
351			local_role: role,
352			telemetry: telemetry.as_ref().map(|x| x.handle()),
353			protocol_name: grandpa_protocol_name,
354		};
355
356		// start the full GRANDPA voter
357		// NOTE: non-authorities could run the GRANDPA observer protocol, but at
358		// this point the full voter should provide better guarantees of block
359		// and vote data availability than the observer. The observer has not
360		// been tested extensively yet and having most nodes in a network run it
361		// could lead to finality stalls.
		// (This rebinds `grandpa_config` from the `Config` above to the full
		// `GrandpaParams` passed to the voter.)
362		let grandpa_config = sc_consensus_grandpa::GrandpaParams {
363			config: grandpa_config,
364			link: grandpa_link,
365			network,
366			sync: Arc::new(sync_service),
367			notification_service: grandpa_notification_service,
368			voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
369			prometheus_registry,
370			shared_voter_state,
371			telemetry: telemetry.as_ref().map(|x| x.handle()),
372			offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
373		};
374
375		// the GRANDPA voter task is considered infallible, i.e.
376		// if it fails we take down the service with it.
377		task_manager.spawn_essential_handle().spawn_blocking(
378			"grandpa-voter",
379			None,
380			sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
381		);
382	}
383
384	Ok(task_manager)
385}