partner_chains_db_sync_data_sources/candidates/
mod.rs

1//! Db-Sync data source used by Partner Chain committee selection
2use crate::DataSourceError::*;
3use crate::db_model::{
4	self, Address, Asset, BlockNumber, EpochNumber, MainchainTxOutput, StakePoolEntry,
5};
6use crate::metrics::McFollowerMetrics;
7use crate::observed_async_trait;
8use authority_selection_inherents::*;
9use itertools::Itertools;
10use log::error;
11use partner_chains_plutus_data::{
12	d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
13	registered_candidates::RegisterValidatorDatum,
14};
15use sidechain_domain::*;
16use sqlx::PgPool;
17use std::collections::HashMap;
18use std::error::Error;
19
20mod cached;
21
22#[cfg(test)]
23mod tests;
24
/// A registration UTXO whose datum was successfully decoded, before it is flattened
/// into a [`RegisteredCandidate`].
#[derive(Clone, Debug)]
struct ParsedCandidate {
	/// Id of the UTXO plus the epoch, block, slot and in-block transaction index it was read with
	utxo_info: UtxoInfo,
	/// Registration datum decoded from the UTXO's datum
	datum: RegisterValidatorDatum,
	/// `tx_inputs` copied from the queried output
	tx_inputs: Vec<UtxoId>,
}
31
/// Flat view of a single candidate registration, built from a [`ParsedCandidate`]
/// regardless of the datum version it was stored with.
#[derive(Debug)]
struct RegisteredCandidate {
	/// Public key from the datum's stake ownership data; registrations are grouped by it
	stake_pool_pub_key: StakePoolPublicKey,
	/// Registration UTXO named in the datum
	registration_utxo: UtxoId,
	/// `tx_inputs` carried over from the parsed output
	tx_inputs: Vec<UtxoId>,
	/// Sidechain signature taken from the datum
	sidechain_signature: SidechainSignature,
	/// Signature from the datum's stake ownership data
	mainchain_signature: MainchainSignature,
	/// Currently a copy of the bytes of `sidechain_signature`
	cross_chain_signature: CrossChainSignature,
	/// Sidechain public key taken from the datum
	sidechain_pub_key: SidechainPublicKey,
	/// Currently a copy of the bytes of `sidechain_pub_key`
	cross_chain_pub_key: CrossChainPublicKey,
	/// Candidate keys: Aura and Grandpa keys for V0 datums, the datum's own `keys` for V1
	keys: CandidateKeys,
	/// Id of the UTXO plus the epoch, block, slot and in-block transaction index it was read with
	utxo_info: UtxoInfo,
}
45
/// Db-Sync data source serving data for Partner Chain committee selection
///
/// Implements `AuthoritySelectionDataSource` by running `db_model` queries against the
/// Postgres connection pool it holds. Construct it with [`CandidatesDataSourceImpl::new`].
pub struct CandidatesDataSourceImpl {
	/// Postgres connection pool
	pool: PgPool,
	/// Prometheus metrics client
	// NOTE(review): not read directly in this file; presumably consumed by the code that
	// `observed_async_trait!` generates - confirm against the macro definition.
	metrics_opt: Option<McFollowerMetrics>,
}
53
54observed_async_trait!(
55impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
56	async fn get_ariadne_parameters(
57			&self,
58			epoch: McEpochNumber,
59			d_parameter_policy: PolicyId,
60			permissioned_candidate_policy: PolicyId
61	) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
62		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
63		let d_parameter_asset = Asset::new(d_parameter_policy);
64		let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
65
66		let (candidates_output_opt, d_output_opt) = tokio::try_join!(
67			db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
68			db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
69		)?;
70
71		let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
72
73		let d_datum = d_output
74			.datum
75			.map(|d| d.0)
76			.ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
77
78		let d_parameter = DParamDatum::try_from(d_datum)?.into();
79
80		let permissioned_candidates = match candidates_output_opt {
81			None => None,
82			Some(candidates_output) => {
83				let candidates_datum = candidates_output.datum.ok_or(
84					ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
85				)?;
86				Some(PermissionedCandidateDatums::try_from(candidates_datum.0)?.into())
87			},
88		};
89
90		Ok(AriadneParameters { d_parameter, permissioned_candidates })
91	}
92
93	async fn get_candidates(
94			&self,
95			epoch: McEpochNumber,
96			committee_candidate_address: MainchainAddress
97	)-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
98		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
99		let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
100		let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
101		Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
102			CandidateRegistrations {
103				stake_pool_public_key: mainchain_pub_key.clone(),
104				registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
105				stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
106			}
107		}).collect())
108	}
109
110	async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
111		let epoch = self.get_epoch_of_data_storage(epoch)?;
112		let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
113		Ok(nonce.map(|n| EpochNonce(n.0)))
114	}
115
	// Returns the epoch whose data is used for `for_epoch`; a thin wrapper that exposes
	// `get_epoch_of_data_storage` through the trait, including its `BadRequest` error.
	async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
		self.get_epoch_of_data_storage(for_epoch)
	}
119});
120
121impl CandidatesDataSourceImpl {
122	/// Creates new instance of the data source
123	pub async fn new(
124		pool: PgPool,
125		metrics_opt: Option<McFollowerMetrics>,
126	) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
127		db_model::create_idx_ma_tx_out_ident(&pool).await?;
128		db_model::create_idx_tx_out_address(&pool).await?;
129		Ok(Self { pool, metrics_opt })
130	}
131
	/// Creates a new caching instance of the data source
	///
	/// Consumes `self` and passes it, together with `candidates_for_epoch_cache_size`, to
	/// `cached::CandidateDataSourceCached::new_from_env`; any error from that call is returned
	/// unchanged.
	// NOTE(review): the `_from_env` suffix suggests further settings are read from the
	// environment - confirm in the `cached` module.
	pub fn cached(
		self,
		candidates_for_epoch_cache_size: usize,
	) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
		cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
	}
139
140	/// Registrations state up to this block are considered as "active", after it - as "pending".
141	async fn get_last_block_for_epoch(
142		&self,
143		epoch: EpochNumber,
144	) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
145		let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
146		Ok(block_option.map(|b| b.block_no))
147	}
148
149	async fn get_registered_candidates(
150		&self,
151		epoch: EpochNumber,
152		committee_candidate_address: MainchainAddress,
153	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
154		let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
155		let address: Address = Address(committee_candidate_address.to_string());
156		let active_utxos = match registrations_block_for_epoch {
157			Some(block) => db_model::get_utxos_for_address(&self.pool, &address, block).await?,
158			None => vec![],
159		};
160		self.convert_utxos_to_candidates(&active_utxos)
161	}
162
163	fn group_candidates_by_mc_pub_key(
164		candidates: Vec<RegisteredCandidate>,
165	) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
166		candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
167	}
168
169	fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
170		RegistrationData {
171			registration_utxo: c.registration_utxo,
172			sidechain_signature: c.sidechain_signature,
173			mainchain_signature: c.mainchain_signature,
174			cross_chain_signature: c.cross_chain_signature,
175			sidechain_pub_key: c.sidechain_pub_key,
176			cross_chain_pub_key: c.cross_chain_pub_key,
177			keys: c.keys,
178			utxo_info: c.utxo_info,
179			tx_inputs: c.tx_inputs,
180		}
181	}
182
183	fn make_stake_map(
184		stake_pool_entries: Vec<StakePoolEntry>,
185	) -> HashMap<MainchainKeyHash, StakeDelegation> {
186		stake_pool_entries
187			.into_iter()
188			.map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
189			.collect()
190	}
191
192	fn get_stake_delegation(
193		stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
194		stake_pool_pub_key: &StakePoolPublicKey,
195	) -> Option<StakeDelegation> {
196		if stake_map.is_empty() {
197			None
198		} else {
199			Some(
200				stake_map
201					.get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
202					.cloned()
203					.unwrap_or(StakeDelegation(0)),
204			)
205		}
206	}
207
208	// Converters
209	fn convert_utxos_to_candidates(
210		&self,
211		outputs: &[MainchainTxOutput],
212	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
213		Self::parse_candidates(outputs)
214			.into_iter()
215			.map(|c| {
216				match c.datum {
217					RegisterValidatorDatum::V0 {
218						stake_ownership,
219						sidechain_pub_key,
220						sidechain_signature,
221						registration_utxo,
222						own_pkh: _own_pkh,
223						aura_pub_key,
224						grandpa_pub_key,
225					} => Ok(RegisteredCandidate {
226						stake_pool_pub_key: stake_ownership.pub_key,
227						mainchain_signature: stake_ownership.signature,
228						// For now we use the same key for both cross chain and sidechain actions
229						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
230						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
231						sidechain_signature,
232						sidechain_pub_key,
233						keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
234						registration_utxo,
235						tx_inputs: c.tx_inputs,
236						utxo_info: c.utxo_info,
237					}),
238					RegisterValidatorDatum::V1 {
239						stake_ownership,
240						sidechain_pub_key,
241						sidechain_signature,
242						registration_utxo,
243						own_pkh: _own_pkh,
244						keys,
245					} => Ok(RegisteredCandidate {
246						stake_pool_pub_key: stake_ownership.pub_key,
247						mainchain_signature: stake_ownership.signature,
248						// For now we use the same key for both cross chain and sidechain actions
249						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
250						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
251						sidechain_signature,
252						sidechain_pub_key,
253						keys,
254						registration_utxo,
255						tx_inputs: c.tx_inputs,
256						utxo_info: c.utxo_info,
257					}),
258				}
259			})
260			.collect()
261	}
262
263	fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
264		let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
265			.iter()
266			.map(|output| {
267				let datum = output.datum.clone().ok_or(format!(
268					"Missing registration datum for {:?}",
269					output.clone().utxo_id
270				))?;
271				let register_validator_datum =
272					RegisterValidatorDatum::try_from(datum).map_err(|_| {
273						format!("Invalid registration datum for {:?}", output.clone().utxo_id)
274					})?;
275				Ok(ParsedCandidate {
276					utxo_info: UtxoInfo {
277						utxo_id: output.utxo_id,
278						epoch_number: output.tx_epoch_no.into(),
279						block_number: output.tx_block_no.into(),
280						slot_number: output.tx_slot_no.into(),
281						tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
282					},
283					datum: register_validator_datum,
284					tx_inputs: output.tx_inputs.clone(),
285				})
286			})
287			.collect();
288		results
289			.into_iter()
290			.filter_map(|r| match r {
291				Ok(candidate) => Some(candidate.clone()),
292				Err(msg) => {
293					error!("{msg}");
294					None
295				},
296			})
297			.collect()
298	}
299
300	fn get_epoch_of_data_storage(
301		&self,
302		epoch_of_data_usage: McEpochNumber,
303	) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
304		offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
305			BadRequest(format!(
306				"Minimum supported epoch of data usage is {offset}, but {} was provided",
307				epoch_of_data_usage.0
308			))
309			.into()
310		})
311	}
312}