partner_chains_db_sync_data_sources/candidates/
mod.rs

1//! Db-Sync data source used by Partner Chain committee selection
2use crate::DataSourceError::*;
3use crate::db_model::{
4	self, Address, Asset, BlockNumber, DbSyncConfigurationProvider, EpochNumber, MainchainTxOutput,
5	StakePoolEntry,
6};
7use crate::metrics::McFollowerMetrics;
8use crate::observed_async_trait;
9use authority_selection_inherents::*;
10use itertools::Itertools;
11use log::error;
12use partner_chains_plutus_data::{
13	d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
14	registered_candidates::RegisterValidatorDatum,
15};
16use sidechain_domain::*;
17use sqlx::PgPool;
18use std::collections::HashMap;
19use std::error::Error;
20
21mod cached;
22
23#[cfg(test)]
24mod tests;
25
26#[derive(Clone, Debug)]
27struct ParsedCandidate {
28	utxo_info: UtxoInfo,
29	datum: RegisterValidatorDatum,
30	tx_inputs: Vec<UtxoId>,
31}
32
33#[derive(Debug)]
34struct RegisteredCandidate {
35	stake_pool_pub_key: StakePoolPublicKey,
36	registration_utxo: UtxoId,
37	tx_inputs: Vec<UtxoId>,
38	sidechain_signature: SidechainSignature,
39	mainchain_signature: MainchainSignature,
40	cross_chain_signature: CrossChainSignature,
41	sidechain_pub_key: SidechainPublicKey,
42	cross_chain_pub_key: CrossChainPublicKey,
43	keys: CandidateKeys,
44	utxo_info: UtxoInfo,
45}
46
47/// Db-Sync data source serving data for Partner Chain committee selection
48pub struct CandidatesDataSourceImpl {
49	/// Postgres connection pool
50	pool: PgPool,
51	/// Prometheus metrics client
52	metrics_opt: Option<McFollowerMetrics>,
53	/// Configuration used by Db-Sync
54	db_sync_config: DbSyncConfigurationProvider,
55}
56
57observed_async_trait!(
58impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
59	async fn get_ariadne_parameters(
60			&self,
61			epoch: McEpochNumber,
62			d_parameter_policy: PolicyId,
63			permissioned_candidate_policy: PolicyId
64	) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
65		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
66		let d_parameter_asset = Asset::new(d_parameter_policy);
67		let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
68
69		let (candidates_output_opt, d_output_opt) = tokio::try_join!(
70			db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
71			db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
72		)?;
73
74		let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
75
76		let d_datum = d_output
77			.datum
78			.map(|d| d.0)
79			.ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
80
81		let d_parameter = DParamDatum::try_from(d_datum)?.into();
82
83		let permissioned_candidates = match candidates_output_opt {
84			None => None,
85			Some(candidates_output) => {
86				let candidates_datum = candidates_output.datum.ok_or(
87					ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
88				)?;
89				Some(PermissionedCandidateDatums::try_from(candidates_datum.0)?.into())
90			},
91		};
92
93		Ok(AriadneParameters { d_parameter, permissioned_candidates })
94	}
95
96	async fn get_candidates(
97			&self,
98			epoch: McEpochNumber,
99			committee_candidate_address: MainchainAddress
100	)-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
101		let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
102		let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
103		let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
104		Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
105			CandidateRegistrations {
106				stake_pool_public_key: mainchain_pub_key.clone(),
107				registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
108				stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
109			}
110		}).collect())
111	}
112
113	async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
114		let epoch = self.get_epoch_of_data_storage(epoch)?;
115		let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
116		Ok(nonce.map(|n| EpochNonce(n.0)))
117	}
118
119	async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
120		self.get_epoch_of_data_storage(for_epoch)
121	}
122});
123
124impl CandidatesDataSourceImpl {
125	/// Creates new instance of the data source
126	pub async fn new(
127		pool: PgPool,
128		metrics_opt: Option<McFollowerMetrics>,
129	) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
130		db_model::create_idx_ma_tx_out_ident(&pool).await?;
131		db_model::create_idx_tx_out_address(&pool).await?;
132		Ok(Self {
133			pool: pool.clone(),
134			metrics_opt,
135			db_sync_config: DbSyncConfigurationProvider::new(pool),
136		})
137	}
138
139	/// Creates a new caching instance of the data source
140	pub fn cached(
141		self,
142		candidates_for_epoch_cache_size: usize,
143	) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
144		cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
145	}
146
147	/// Registrations state up to this block are considered as "active", after it - as "pending".
148	async fn get_last_block_for_epoch(
149		&self,
150		epoch: EpochNumber,
151	) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
152		let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
153		Ok(block_option.map(|b| b.block_no))
154	}
155
156	async fn get_registered_candidates(
157		&self,
158		epoch: EpochNumber,
159		committee_candidate_address: MainchainAddress,
160	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
161		let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
162		let address: Address = Address(committee_candidate_address.to_string());
163		let active_utxos = match registrations_block_for_epoch {
164			Some(block) => {
165				db_model::get_utxos_for_address(
166					&self.pool,
167					&address,
168					block,
169					self.db_sync_config.get_tx_in_config().await?,
170				)
171				.await?
172			},
173			None => vec![],
174		};
175		self.convert_utxos_to_candidates(&active_utxos)
176	}
177
178	fn group_candidates_by_mc_pub_key(
179		candidates: Vec<RegisteredCandidate>,
180	) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
181		candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
182	}
183
184	fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
185		RegistrationData {
186			registration_utxo: c.registration_utxo,
187			sidechain_signature: c.sidechain_signature,
188			mainchain_signature: c.mainchain_signature,
189			cross_chain_signature: c.cross_chain_signature,
190			sidechain_pub_key: c.sidechain_pub_key,
191			cross_chain_pub_key: c.cross_chain_pub_key,
192			keys: c.keys,
193			utxo_info: c.utxo_info,
194			tx_inputs: c.tx_inputs,
195		}
196	}
197
198	fn make_stake_map(
199		stake_pool_entries: Vec<StakePoolEntry>,
200	) -> HashMap<MainchainKeyHash, StakeDelegation> {
201		stake_pool_entries
202			.into_iter()
203			.map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
204			.collect()
205	}
206
207	fn get_stake_delegation(
208		stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
209		stake_pool_pub_key: &StakePoolPublicKey,
210	) -> Option<StakeDelegation> {
211		if stake_map.is_empty() {
212			None
213		} else {
214			Some(
215				stake_map
216					.get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
217					.cloned()
218					.unwrap_or(StakeDelegation(0)),
219			)
220		}
221	}
222
223	// Converters
224	fn convert_utxos_to_candidates(
225		&self,
226		outputs: &[MainchainTxOutput],
227	) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
228		Self::parse_candidates(outputs)
229			.into_iter()
230			.map(|c| {
231				match c.datum {
232					RegisterValidatorDatum::V0 {
233						stake_ownership,
234						sidechain_pub_key,
235						sidechain_signature,
236						registration_utxo,
237						own_pkh: _own_pkh,
238						aura_pub_key,
239						grandpa_pub_key,
240					} => Ok(RegisteredCandidate {
241						stake_pool_pub_key: stake_ownership.pub_key,
242						mainchain_signature: stake_ownership.signature,
243						// For now we use the same key for both cross chain and sidechain actions
244						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
245						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
246						sidechain_signature,
247						sidechain_pub_key,
248						keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
249						registration_utxo,
250						tx_inputs: c.tx_inputs,
251						utxo_info: c.utxo_info,
252					}),
253					RegisterValidatorDatum::V1 {
254						stake_ownership,
255						sidechain_pub_key,
256						sidechain_signature,
257						registration_utxo,
258						own_pkh: _own_pkh,
259						keys,
260					} => Ok(RegisteredCandidate {
261						stake_pool_pub_key: stake_ownership.pub_key,
262						mainchain_signature: stake_ownership.signature,
263						// For now we use the same key for both cross chain and sidechain actions
264						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
265						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
266						sidechain_signature,
267						sidechain_pub_key,
268						keys,
269						registration_utxo,
270						tx_inputs: c.tx_inputs,
271						utxo_info: c.utxo_info,
272					}),
273				}
274			})
275			.collect()
276	}
277
278	fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
279		let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
280			.iter()
281			.map(|output| {
282				let datum = output.datum.clone().ok_or(format!(
283					"Missing registration datum for {:?}",
284					output.clone().utxo_id
285				))?;
286				let register_validator_datum =
287					RegisterValidatorDatum::try_from(datum).map_err(|_| {
288						format!("Invalid registration datum for {:?}", output.clone().utxo_id)
289					})?;
290				Ok(ParsedCandidate {
291					utxo_info: UtxoInfo {
292						utxo_id: output.utxo_id,
293						epoch_number: output.tx_epoch_no.into(),
294						block_number: output.tx_block_no.into(),
295						slot_number: output.tx_slot_no.into(),
296						tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
297					},
298					datum: register_validator_datum,
299					tx_inputs: output.tx_inputs.clone(),
300				})
301			})
302			.collect();
303		results
304			.into_iter()
305			.filter_map(|r| match r {
306				Ok(candidate) => Some(candidate.clone()),
307				Err(msg) => {
308					error!("{msg}");
309					None
310				},
311			})
312			.collect()
313	}
314
315	fn get_epoch_of_data_storage(
316		&self,
317		epoch_of_data_usage: McEpochNumber,
318	) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
319		offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
320			BadRequest(format!(
321				"Minimum supported epoch of data usage is {offset}, but {} was provided",
322				epoch_of_data_usage.0
323			))
324			.into()
325		})
326	}
327}