partner_chains_db_sync_data_sources/candidates/
mod.rs

1//! Db-Sync data source used by Partner Chain committee selection
2use crate::DataSourceError::*;
3use crate::db_model::{
4	self, Address, Asset, BlockNumber, DbSyncConfigurationProvider, EpochNumber, MainchainTxOutput,
5	StakePoolEntry,
6};
7use crate::metrics::{McFollowerMetrics, observed_async_trait};
8use authority_selection_inherents::*;
9use itertools::Itertools;
10use log::error;
11use partner_chains_plutus_data::{
12	d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
13	registered_candidates::RegisterValidatorDatum,
14};
15use sidechain_domain::*;
16use sqlx::PgPool;
17use std::collections::HashMap;
18use std::error::Error;
19
20mod cached;
21
22#[cfg(test)]
23mod tests;
24
25#[derive(Clone, Debug)]
26struct ParsedCandidate {
27	utxo_info: UtxoInfo,
28	datum: RegisterValidatorDatum,
29	tx_inputs: Vec<UtxoId>,
30}
31
32#[derive(Debug)]
33struct RegisteredCandidate {
34	stake_pool_pub_key: StakePoolPublicKey,
35	registration_utxo: UtxoId,
36	tx_inputs: Vec<UtxoId>,
37	sidechain_signature: SidechainSignature,
38	mainchain_signature: MainchainSignature,
39	cross_chain_signature: CrossChainSignature,
40	sidechain_pub_key: SidechainPublicKey,
41	cross_chain_pub_key: CrossChainPublicKey,
42	keys: CandidateKeys,
43	utxo_info: UtxoInfo,
44}
45
46/// Db-Sync data source serving data for Partner Chain committee selection
47pub struct CandidatesDataSourceImpl {
48	/// Postgres connection pool
49	pool: PgPool,
50	/// Prometheus metrics client
51	metrics_opt: Option<McFollowerMetrics>,
52	/// Configuration used by Db-Sync
53	db_sync_config: DbSyncConfigurationProvider,
54}
55
56observed_async_trait!(
57impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
58	async fn get_ariadne_parameters(
59			&self,
60			epoch: McEpochNumber,
61			d_parameter_policy: PolicyId,
62			permissioned_candidate_policy: PolicyId
63	) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
64		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
65		let d_parameter_asset = Asset::new(d_parameter_policy);
66		let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
67
68		let (candidates_output_opt, d_output_opt) = tokio::try_join!(
69			db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
70			db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
71		)?;
72
73		let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
74
75		let d_datum = d_output
76			.datum
77			.map(|d| d.0)
78			.ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
79
80		let d_parameter = DParamDatum::try_from(d_datum)?.into();
81
82		let permissioned_candidates = match candidates_output_opt {
83			None => None,
84			Some(candidates_output) => {
85				let candidates_datum = candidates_output.datum.ok_or(
86					ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
87				)?;
88				Some(PermissionedCandidateDatums::try_from(candidates_datum.0)?.into())
89			},
90		};
91
92		Ok(AriadneParameters { d_parameter, permissioned_candidates })
93	}
94
95	async fn get_candidates(
96			&self,
97			epoch: McEpochNumber,
98			committee_candidate_address: MainchainAddress
99	)-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
100		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
101		let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
102		let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
103		Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
104			CandidateRegistrations {
105				stake_pool_public_key: mainchain_pub_key.clone(),
106				registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
107				stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
108			}
109		}).collect())
110	}
111
112	async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
113		let epoch = self.get_epoch_of_data_storage(epoch)?;
114		let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
115		Ok(nonce.map(|n| EpochNonce(n.0)))
116	}
117
118	async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
119		self.get_epoch_of_data_storage(for_epoch)
120	}
121});
122
123impl CandidatesDataSourceImpl {
124	/// Creates new instance of the data source
125	pub async fn new(
126		pool: PgPool,
127		metrics_opt: Option<McFollowerMetrics>,
128	) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
129		db_model::create_idx_ma_tx_out_ident(&pool).await?;
130		db_model::create_idx_tx_out_address(&pool).await?;
131		Ok(Self {
132			pool: pool.clone(),
133			metrics_opt,
134			db_sync_config: DbSyncConfigurationProvider::new(pool),
135		})
136	}
137
138	/// Creates a new caching instance of the data source
139	pub fn cached(
140		self,
141		candidates_for_epoch_cache_size: usize,
142	) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
143		cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
144	}
145
146	/// Registrations state up to this block are considered as "active", after it - as "pending".
147	async fn get_last_block_for_epoch(
148		&self,
149		epoch: EpochNumber,
150	) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
151		let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
152		Ok(block_option.map(|b| b.block_no))
153	}
154
155	async fn get_registered_candidates(
156		&self,
157		epoch: EpochNumber,
158		committee_candidate_address: MainchainAddress,
159	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
160		let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
161		let address: Address = Address(committee_candidate_address.to_string());
162		let active_utxos = match registrations_block_for_epoch {
163			Some(block) => {
164				db_model::get_utxos_for_address(
165					&self.pool,
166					&address,
167					block,
168					self.db_sync_config.get_tx_in_config().await?,
169				)
170				.await?
171			},
172			None => vec![],
173		};
174		self.convert_utxos_to_candidates(&active_utxos)
175	}
176
177	fn group_candidates_by_mc_pub_key(
178		candidates: Vec<RegisteredCandidate>,
179	) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
180		candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
181	}
182
183	fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
184		RegistrationData {
185			registration_utxo: c.registration_utxo,
186			sidechain_signature: c.sidechain_signature,
187			mainchain_signature: c.mainchain_signature,
188			cross_chain_signature: c.cross_chain_signature,
189			sidechain_pub_key: c.sidechain_pub_key,
190			cross_chain_pub_key: c.cross_chain_pub_key,
191			keys: c.keys,
192			utxo_info: c.utxo_info,
193			tx_inputs: c.tx_inputs,
194		}
195	}
196
197	fn make_stake_map(
198		stake_pool_entries: Vec<StakePoolEntry>,
199	) -> HashMap<MainchainKeyHash, StakeDelegation> {
200		stake_pool_entries
201			.into_iter()
202			.map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
203			.collect()
204	}
205
206	fn get_stake_delegation(
207		stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
208		stake_pool_pub_key: &StakePoolPublicKey,
209	) -> Option<StakeDelegation> {
210		if stake_map.is_empty() {
211			None
212		} else {
213			Some(
214				stake_map
215					.get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
216					.cloned()
217					.unwrap_or(StakeDelegation(0)),
218			)
219		}
220	}
221
222	// Converters
223	fn convert_utxos_to_candidates(
224		&self,
225		outputs: &[MainchainTxOutput],
226	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
227		Self::parse_candidates(outputs)
228			.into_iter()
229			.map(|c| {
230				match c.datum {
231					RegisterValidatorDatum::V0 {
232						stake_ownership,
233						sidechain_pub_key,
234						sidechain_signature,
235						registration_utxo,
236						own_pkh: _own_pkh,
237						aura_pub_key,
238						grandpa_pub_key,
239					} => Ok(RegisteredCandidate {
240						stake_pool_pub_key: stake_ownership.pub_key,
241						mainchain_signature: stake_ownership.signature,
242						// For now we use the same key for both cross chain and sidechain actions
243						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
244						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
245						sidechain_signature,
246						sidechain_pub_key,
247						keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
248						registration_utxo,
249						tx_inputs: c.tx_inputs,
250						utxo_info: c.utxo_info,
251					}),
252					RegisterValidatorDatum::V1 {
253						stake_ownership,
254						sidechain_pub_key,
255						sidechain_signature,
256						registration_utxo,
257						own_pkh: _own_pkh,
258						keys,
259					} => Ok(RegisteredCandidate {
260						stake_pool_pub_key: stake_ownership.pub_key,
261						mainchain_signature: stake_ownership.signature,
262						// For now we use the same key for both cross chain and sidechain actions
263						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
264						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
265						sidechain_signature,
266						sidechain_pub_key,
267						keys,
268						registration_utxo,
269						tx_inputs: c.tx_inputs,
270						utxo_info: c.utxo_info,
271					}),
272				}
273			})
274			.collect()
275	}
276
277	fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
278		let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
279			.iter()
280			.map(|output| {
281				let datum = output.datum.clone().ok_or(format!(
282					"Missing registration datum for {:?}",
283					output.clone().utxo_id
284				))?;
285				let register_validator_datum =
286					RegisterValidatorDatum::try_from(datum).map_err(|_| {
287						format!("Invalid registration datum for {:?}", output.clone().utxo_id)
288					})?;
289				Ok(ParsedCandidate {
290					utxo_info: UtxoInfo {
291						utxo_id: output.utxo_id,
292						epoch_number: output.tx_epoch_no.into(),
293						block_number: output.tx_block_no.into(),
294						slot_number: output.tx_slot_no.into(),
295						tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
296					},
297					datum: register_validator_datum,
298					tx_inputs: output.tx_inputs.clone(),
299				})
300			})
301			.collect();
302		results
303			.into_iter()
304			.filter_map(|r| match r {
305				Ok(candidate) => Some(candidate.clone()),
306				Err(msg) => {
307					error!("{msg}");
308					None
309				},
310			})
311			.collect()
312	}
313
314	fn get_epoch_of_data_storage(
315		&self,
316		epoch_of_data_usage: McEpochNumber,
317	) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
318		offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
319			BadRequest(format!(
320				"Minimum supported epoch of data usage is {offset}, but {} was provided",
321				epoch_of_data_usage.0
322			))
323			.into()
324		})
325	}
326}