partner_chains_db_sync_data_sources/candidates/
mod.rs1use crate::DataSourceError::*;
3use crate::db_model::{
4 self, Address, Asset, BlockNumber, EpochNumber, MainchainTxOutput, StakePoolEntry,
5};
6use crate::metrics::McFollowerMetrics;
7use crate::observed_async_trait;
8use authority_selection_inherents::authority_selection_inputs::*;
9use datum::raw_permissioned_candidate_data_vec_from;
10use itertools::Itertools;
11use log::error;
12use partner_chains_plutus_data::{
13 d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
14 registered_candidates::RegisterValidatorDatum,
15};
16use sidechain_domain::*;
17use sqlx::PgPool;
18use std::collections::HashMap;
19use std::error::Error;
20
21mod cached;
22mod datum;
23
24#[cfg(test)]
25mod tests;
26
27#[derive(Clone, Debug)]
28struct ParsedCandidate {
29 utxo_info: UtxoInfo,
30 datum: RegisterValidatorDatum,
31 tx_inputs: Vec<UtxoId>,
32}
33
34#[derive(Debug)]
35struct RegisteredCandidate {
36 stake_pool_pub_key: StakePoolPublicKey,
37 registration_utxo: UtxoId,
38 tx_inputs: Vec<UtxoId>,
39 sidechain_signature: SidechainSignature,
40 mainchain_signature: MainchainSignature,
41 cross_chain_signature: CrossChainSignature,
42 sidechain_pub_key: SidechainPublicKey,
43 cross_chain_pub_key: CrossChainPublicKey,
44 aura_pub_key: AuraPublicKey,
45 grandpa_pub_key: GrandpaPublicKey,
46 utxo_info: UtxoInfo,
47}
48
49pub struct CandidatesDataSourceImpl {
51 pool: PgPool,
53 metrics_opt: Option<McFollowerMetrics>,
55}
56
57observed_async_trait!(
58impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
59 async fn get_ariadne_parameters(
60 &self,
61 epoch: McEpochNumber,
62 d_parameter_policy: PolicyId,
63 permissioned_candidate_policy: PolicyId
64 ) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
65 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
66 let d_parameter_asset = Asset::new(d_parameter_policy);
67 let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
68
69 let (candidates_output_opt, d_output_opt) = tokio::try_join!(
70 db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
71 db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
72 )?;
73
74 let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
75
76 let d_datum = d_output
77 .datum
78 .map(|d| d.0)
79 .ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
80
81 let d_parameter = DParamDatum::try_from(d_datum)?.into();
82
83 let permissioned_candidates = match candidates_output_opt {
84 None => None,
85 Some(candidates_output) => {
86 let candidates_datum = candidates_output.datum.ok_or(
87 ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
88 )?;
89
90 Some(raw_permissioned_candidate_data_vec_from(
91 PermissionedCandidateDatums::try_from(candidates_datum.0)?
92 ))
93 },
94 };
95
96 Ok(AriadneParameters { d_parameter, permissioned_candidates })
97 }
98
99 async fn get_candidates(
100 &self,
101 epoch: McEpochNumber,
102 committee_candidate_address: MainchainAddress
103 )-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
104 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
105 let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
106 let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
107 Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
108 CandidateRegistrations {
109 stake_pool_public_key: mainchain_pub_key.clone(),
110 registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
111 stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
112 }
113 }).collect())
114 }
115
116 async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
117 let epoch = self.get_epoch_of_data_storage(epoch)?;
118 let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
119 Ok(nonce.map(|n| EpochNonce(n.0)))
120 }
121
122 async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
123 self.get_epoch_of_data_storage(for_epoch)
124 }
125});
126
127impl CandidatesDataSourceImpl {
128 pub async fn new(
130 pool: PgPool,
131 metrics_opt: Option<McFollowerMetrics>,
132 ) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
133 db_model::create_idx_ma_tx_out_ident(&pool).await?;
134 db_model::create_idx_tx_out_address(&pool).await?;
135 Ok(Self { pool, metrics_opt })
136 }
137
138 pub fn cached(
140 self,
141 candidates_for_epoch_cache_size: usize,
142 ) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
143 cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
144 }
145
146 async fn get_last_block_for_epoch(
148 &self,
149 epoch: EpochNumber,
150 ) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
151 let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
152 Ok(block_option.map(|b| b.block_no))
153 }
154
155 async fn get_registered_candidates(
156 &self,
157 epoch: EpochNumber,
158 committee_candidate_address: MainchainAddress,
159 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
160 let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
161 let address: Address = Address(committee_candidate_address.to_string());
162 let active_utxos = match registrations_block_for_epoch {
163 Some(block) => db_model::get_utxos_for_address(&self.pool, &address, block).await?,
164 None => vec![],
165 };
166 self.convert_utxos_to_candidates(&active_utxos)
167 }
168
169 fn group_candidates_by_mc_pub_key(
170 candidates: Vec<RegisteredCandidate>,
171 ) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
172 candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
173 }
174
175 fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
176 RegistrationData {
177 registration_utxo: c.registration_utxo,
178 sidechain_signature: c.sidechain_signature,
179 mainchain_signature: c.mainchain_signature,
180 cross_chain_signature: c.cross_chain_signature,
181 sidechain_pub_key: c.sidechain_pub_key,
182 cross_chain_pub_key: c.cross_chain_pub_key,
183 aura_pub_key: c.aura_pub_key,
184 grandpa_pub_key: c.grandpa_pub_key,
185 utxo_info: c.utxo_info,
186 tx_inputs: c.tx_inputs,
187 }
188 }
189
190 fn make_stake_map(
191 stake_pool_entries: Vec<StakePoolEntry>,
192 ) -> HashMap<MainchainKeyHash, StakeDelegation> {
193 stake_pool_entries
194 .into_iter()
195 .map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
196 .collect()
197 }
198
199 fn get_stake_delegation(
200 stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
201 stake_pool_pub_key: &StakePoolPublicKey,
202 ) -> Option<StakeDelegation> {
203 if stake_map.is_empty() {
204 None
205 } else {
206 Some(
207 stake_map
208 .get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
209 .cloned()
210 .unwrap_or(StakeDelegation(0)),
211 )
212 }
213 }
214
215 fn convert_utxos_to_candidates(
217 &self,
218 outputs: &[MainchainTxOutput],
219 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
220 Self::parse_candidates(outputs)
221 .into_iter()
222 .map(|c| {
223 let RegisterValidatorDatum::V0 {
224 stake_ownership,
225 sidechain_pub_key,
226 sidechain_signature,
227 registration_utxo,
228 own_pkh: _own_pkh,
229 aura_pub_key,
230 grandpa_pub_key,
231 } = c.datum;
232 Ok(RegisteredCandidate {
233 stake_pool_pub_key: stake_ownership.pub_key,
234 mainchain_signature: stake_ownership.signature,
235 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
237 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
238 sidechain_signature,
239 sidechain_pub_key,
240 aura_pub_key,
241 grandpa_pub_key,
242 registration_utxo,
243 tx_inputs: c.tx_inputs,
244 utxo_info: c.utxo_info,
245 })
246 })
247 .collect()
248 }
249
250 fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
251 let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
252 .iter()
253 .map(|output| {
254 let datum = output.datum.clone().ok_or(format!(
255 "Missing registration datum for {:?}",
256 output.clone().utxo_id
257 ))?;
258 let register_validator_datum =
259 RegisterValidatorDatum::try_from(datum).map_err(|_| {
260 format!("Invalid registration datum for {:?}", output.clone().utxo_id)
261 })?;
262 Ok(ParsedCandidate {
263 utxo_info: UtxoInfo {
264 utxo_id: output.utxo_id,
265 epoch_number: output.tx_epoch_no.into(),
266 block_number: output.tx_block_no.into(),
267 slot_number: output.tx_slot_no.into(),
268 tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
269 },
270 datum: register_validator_datum,
271 tx_inputs: output.tx_inputs.clone(),
272 })
273 })
274 .collect();
275 results
276 .into_iter()
277 .filter_map(|r| match r {
278 Ok(candidate) => Some(candidate.clone()),
279 Err(msg) => {
280 error!("{msg}");
281 None
282 },
283 })
284 .collect()
285 }
286
287 fn get_epoch_of_data_storage(
288 &self,
289 epoch_of_data_usage: McEpochNumber,
290 ) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
291 offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
292 BadRequest(format!(
293 "Minimum supported epoch of data usage is {offset}, but {} was provided",
294 epoch_of_data_usage.0
295 ))
296 .into()
297 })
298 }
299}