partner_chains_db_sync_data_sources/candidates/
mod.rs

1//! Db-Sync data source used by Partner Chain committee selection
2use crate::DataSourceError::*;
3use crate::db_model::{
4	self, Address, Asset, BlockNumber, EpochNumber, MainchainTxOutput, StakePoolEntry,
5};
6use crate::metrics::McFollowerMetrics;
7use crate::observed_async_trait;
8use authority_selection_inherents::authority_selection_inputs::*;
9use datum::raw_permissioned_candidate_data_vec_from;
10use itertools::Itertools;
11use log::error;
12use partner_chains_plutus_data::{
13	d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
14	registered_candidates::RegisterValidatorDatum,
15};
16use sidechain_domain::*;
17use sqlx::PgPool;
18use std::collections::HashMap;
19use std::error::Error;
20
21mod cached;
22mod datum;
23
24#[cfg(test)]
25mod tests;
26
27#[derive(Clone, Debug)]
28struct ParsedCandidate {
29	utxo_info: UtxoInfo,
30	datum: RegisterValidatorDatum,
31	tx_inputs: Vec<UtxoId>,
32}
33
34#[derive(Debug)]
35struct RegisteredCandidate {
36	stake_pool_pub_key: StakePoolPublicKey,
37	registration_utxo: UtxoId,
38	tx_inputs: Vec<UtxoId>,
39	sidechain_signature: SidechainSignature,
40	mainchain_signature: MainchainSignature,
41	cross_chain_signature: CrossChainSignature,
42	sidechain_pub_key: SidechainPublicKey,
43	cross_chain_pub_key: CrossChainPublicKey,
44	aura_pub_key: AuraPublicKey,
45	grandpa_pub_key: GrandpaPublicKey,
46	utxo_info: UtxoInfo,
47}
48
49/// Db-Sync data source serving data for Partner Chain committee selection
50pub struct CandidatesDataSourceImpl {
51	/// Postgres connection pool
52	pool: PgPool,
53	/// Prometheus metrics client
54	metrics_opt: Option<McFollowerMetrics>,
55}
56
57observed_async_trait!(
58impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
59	async fn get_ariadne_parameters(
60			&self,
61			epoch: McEpochNumber,
62			d_parameter_policy: PolicyId,
63			permissioned_candidate_policy: PolicyId
64	) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
65		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
66		let d_parameter_asset = Asset::new(d_parameter_policy);
67		let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
68
69		let (candidates_output_opt, d_output_opt) = tokio::try_join!(
70			db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
71			db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
72		)?;
73
74		let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
75
76		let d_datum = d_output
77			.datum
78			.map(|d| d.0)
79			.ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
80
81		let d_parameter = DParamDatum::try_from(d_datum)?.into();
82
83		let permissioned_candidates = match candidates_output_opt {
84			None => None,
85			Some(candidates_output) => {
86				let candidates_datum = candidates_output.datum.ok_or(
87					ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
88				)?;
89
90				Some(raw_permissioned_candidate_data_vec_from(
91					PermissionedCandidateDatums::try_from(candidates_datum.0)?
92				))
93			},
94		};
95
96		Ok(AriadneParameters { d_parameter, permissioned_candidates })
97	}
98
99	async fn get_candidates(
100			&self,
101			epoch: McEpochNumber,
102			committee_candidate_address: MainchainAddress
103	)-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
104		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
105		let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
106		let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
107		Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
108			CandidateRegistrations {
109				stake_pool_public_key: mainchain_pub_key.clone(),
110				registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
111				stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
112			}
113		}).collect())
114	}
115
116	async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
117		let epoch = self.get_epoch_of_data_storage(epoch)?;
118		let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
119		Ok(nonce.map(|n| EpochNonce(n.0)))
120	}
121
122	async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
123		self.get_epoch_of_data_storage(for_epoch)
124	}
125});
126
127impl CandidatesDataSourceImpl {
128	/// Creates new instance of the data source
129	pub async fn new(
130		pool: PgPool,
131		metrics_opt: Option<McFollowerMetrics>,
132	) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
133		db_model::create_idx_ma_tx_out_ident(&pool).await?;
134		db_model::create_idx_tx_out_address(&pool).await?;
135		Ok(Self { pool, metrics_opt })
136	}
137
138	/// Creates a new caching instance of the data source
139	pub fn cached(
140		self,
141		candidates_for_epoch_cache_size: usize,
142	) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
143		cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
144	}
145
146	/// Registrations state up to this block are considered as "active", after it - as "pending".
147	async fn get_last_block_for_epoch(
148		&self,
149		epoch: EpochNumber,
150	) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
151		let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
152		Ok(block_option.map(|b| b.block_no))
153	}
154
155	async fn get_registered_candidates(
156		&self,
157		epoch: EpochNumber,
158		committee_candidate_address: MainchainAddress,
159	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
160		let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
161		let address: Address = Address(committee_candidate_address.to_string());
162		let active_utxos = match registrations_block_for_epoch {
163			Some(block) => db_model::get_utxos_for_address(&self.pool, &address, block).await?,
164			None => vec![],
165		};
166		self.convert_utxos_to_candidates(&active_utxos)
167	}
168
169	fn group_candidates_by_mc_pub_key(
170		candidates: Vec<RegisteredCandidate>,
171	) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
172		candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
173	}
174
175	fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
176		RegistrationData {
177			registration_utxo: c.registration_utxo,
178			sidechain_signature: c.sidechain_signature,
179			mainchain_signature: c.mainchain_signature,
180			cross_chain_signature: c.cross_chain_signature,
181			sidechain_pub_key: c.sidechain_pub_key,
182			cross_chain_pub_key: c.cross_chain_pub_key,
183			aura_pub_key: c.aura_pub_key,
184			grandpa_pub_key: c.grandpa_pub_key,
185			utxo_info: c.utxo_info,
186			tx_inputs: c.tx_inputs,
187		}
188	}
189
190	fn make_stake_map(
191		stake_pool_entries: Vec<StakePoolEntry>,
192	) -> HashMap<MainchainKeyHash, StakeDelegation> {
193		stake_pool_entries
194			.into_iter()
195			.map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
196			.collect()
197	}
198
199	fn get_stake_delegation(
200		stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
201		stake_pool_pub_key: &StakePoolPublicKey,
202	) -> Option<StakeDelegation> {
203		if stake_map.is_empty() {
204			None
205		} else {
206			Some(
207				stake_map
208					.get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
209					.cloned()
210					.unwrap_or(StakeDelegation(0)),
211			)
212		}
213	}
214
215	// Converters
216	fn convert_utxos_to_candidates(
217		&self,
218		outputs: &[MainchainTxOutput],
219	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
220		Self::parse_candidates(outputs)
221			.into_iter()
222			.map(|c| {
223				let RegisterValidatorDatum::V0 {
224					stake_ownership,
225					sidechain_pub_key,
226					sidechain_signature,
227					registration_utxo,
228					own_pkh: _own_pkh,
229					aura_pub_key,
230					grandpa_pub_key,
231				} = c.datum;
232				Ok(RegisteredCandidate {
233					stake_pool_pub_key: stake_ownership.pub_key,
234					mainchain_signature: stake_ownership.signature,
235					// For now we use the same key for both cross chain and sidechain actions
236					cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
237					cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
238					sidechain_signature,
239					sidechain_pub_key,
240					aura_pub_key,
241					grandpa_pub_key,
242					registration_utxo,
243					tx_inputs: c.tx_inputs,
244					utxo_info: c.utxo_info,
245				})
246			})
247			.collect()
248	}
249
250	fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
251		let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
252			.iter()
253			.map(|output| {
254				let datum = output.datum.clone().ok_or(format!(
255					"Missing registration datum for {:?}",
256					output.clone().utxo_id
257				))?;
258				let register_validator_datum =
259					RegisterValidatorDatum::try_from(datum).map_err(|_| {
260						format!("Invalid registration datum for {:?}", output.clone().utxo_id)
261					})?;
262				Ok(ParsedCandidate {
263					utxo_info: UtxoInfo {
264						utxo_id: output.utxo_id,
265						epoch_number: output.tx_epoch_no.into(),
266						block_number: output.tx_block_no.into(),
267						slot_number: output.tx_slot_no.into(),
268						tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
269					},
270					datum: register_validator_datum,
271					tx_inputs: output.tx_inputs.clone(),
272				})
273			})
274			.collect();
275		results
276			.into_iter()
277			.filter_map(|r| match r {
278				Ok(candidate) => Some(candidate.clone()),
279				Err(msg) => {
280					error!("{msg}");
281					None
282				},
283			})
284			.collect()
285	}
286
287	fn get_epoch_of_data_storage(
288		&self,
289		epoch_of_data_usage: McEpochNumber,
290	) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
291		offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
292			BadRequest(format!(
293				"Minimum supported epoch of data usage is {offset}, but {} was provided",
294				epoch_of_data_usage.0
295			))
296			.into()
297		})
298	}
299}