partner_chains_db_sync_data_sources/candidates/
mod.rs1use crate::DataSourceError::*;
3use crate::db_model::{
4 self, Address, Asset, BlockNumber, EpochNumber, MainchainTxOutput, StakePoolEntry,
5};
6use crate::metrics::McFollowerMetrics;
7use crate::observed_async_trait;
8use authority_selection_inherents::*;
9use itertools::Itertools;
10use log::error;
11use partner_chains_plutus_data::{
12 d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
13 registered_candidates::RegisterValidatorDatum,
14};
15use sidechain_domain::*;
16use sqlx::PgPool;
17use std::collections::HashMap;
18use std::error::Error;
19
20mod cached;
21
22#[cfg(test)]
23mod tests;
24
25#[derive(Clone, Debug)]
26struct ParsedCandidate {
27 utxo_info: UtxoInfo,
28 datum: RegisterValidatorDatum,
29 tx_inputs: Vec<UtxoId>,
30}
31
32#[derive(Debug)]
33struct RegisteredCandidate {
34 stake_pool_pub_key: StakePoolPublicKey,
35 registration_utxo: UtxoId,
36 tx_inputs: Vec<UtxoId>,
37 sidechain_signature: SidechainSignature,
38 mainchain_signature: MainchainSignature,
39 cross_chain_signature: CrossChainSignature,
40 sidechain_pub_key: SidechainPublicKey,
41 cross_chain_pub_key: CrossChainPublicKey,
42 keys: CandidateKeys,
43 utxo_info: UtxoInfo,
44}
45
46pub struct CandidatesDataSourceImpl {
48 pool: PgPool,
50 metrics_opt: Option<McFollowerMetrics>,
52}
53
54observed_async_trait!(
55impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
56 async fn get_ariadne_parameters(
57 &self,
58 epoch: McEpochNumber,
59 d_parameter_policy: PolicyId,
60 permissioned_candidate_policy: PolicyId
61 ) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
62 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
63 let d_parameter_asset = Asset::new(d_parameter_policy);
64 let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
65
66 let (candidates_output_opt, d_output_opt) = tokio::try_join!(
67 db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
68 db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
69 )?;
70
71 let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
72
73 let d_datum = d_output
74 .datum
75 .map(|d| d.0)
76 .ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
77
78 let d_parameter = DParamDatum::try_from(d_datum)?.into();
79
80 let permissioned_candidates = match candidates_output_opt {
81 None => None,
82 Some(candidates_output) => {
83 let candidates_datum = candidates_output.datum.ok_or(
84 ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
85 )?;
86 Some(PermissionedCandidateDatums::try_from(candidates_datum.0)?.into())
87 },
88 };
89
90 Ok(AriadneParameters { d_parameter, permissioned_candidates })
91 }
92
93 async fn get_candidates(
94 &self,
95 epoch: McEpochNumber,
96 committee_candidate_address: MainchainAddress
97 )-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
98 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
99 let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
100 let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
101 Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
102 CandidateRegistrations {
103 stake_pool_public_key: mainchain_pub_key.clone(),
104 registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
105 stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
106 }
107 }).collect())
108 }
109
110 async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
111 let epoch = self.get_epoch_of_data_storage(epoch)?;
112 let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
113 Ok(nonce.map(|n| EpochNonce(n.0)))
114 }
115
116 async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
117 self.get_epoch_of_data_storage(for_epoch)
118 }
119});
120
121impl CandidatesDataSourceImpl {
122 pub async fn new(
124 pool: PgPool,
125 metrics_opt: Option<McFollowerMetrics>,
126 ) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
127 db_model::create_idx_ma_tx_out_ident(&pool).await?;
128 db_model::create_idx_tx_out_address(&pool).await?;
129 Ok(Self { pool, metrics_opt })
130 }
131
132 pub fn cached(
134 self,
135 candidates_for_epoch_cache_size: usize,
136 ) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
137 cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
138 }
139
140 async fn get_last_block_for_epoch(
142 &self,
143 epoch: EpochNumber,
144 ) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
145 let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
146 Ok(block_option.map(|b| b.block_no))
147 }
148
149 async fn get_registered_candidates(
150 &self,
151 epoch: EpochNumber,
152 committee_candidate_address: MainchainAddress,
153 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
154 let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
155 let address: Address = Address(committee_candidate_address.to_string());
156 let active_utxos = match registrations_block_for_epoch {
157 Some(block) => db_model::get_utxos_for_address(&self.pool, &address, block).await?,
158 None => vec![],
159 };
160 self.convert_utxos_to_candidates(&active_utxos)
161 }
162
163 fn group_candidates_by_mc_pub_key(
164 candidates: Vec<RegisteredCandidate>,
165 ) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
166 candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
167 }
168
169 fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
170 RegistrationData {
171 registration_utxo: c.registration_utxo,
172 sidechain_signature: c.sidechain_signature,
173 mainchain_signature: c.mainchain_signature,
174 cross_chain_signature: c.cross_chain_signature,
175 sidechain_pub_key: c.sidechain_pub_key,
176 cross_chain_pub_key: c.cross_chain_pub_key,
177 keys: c.keys,
178 utxo_info: c.utxo_info,
179 tx_inputs: c.tx_inputs,
180 }
181 }
182
183 fn make_stake_map(
184 stake_pool_entries: Vec<StakePoolEntry>,
185 ) -> HashMap<MainchainKeyHash, StakeDelegation> {
186 stake_pool_entries
187 .into_iter()
188 .map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
189 .collect()
190 }
191
192 fn get_stake_delegation(
193 stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
194 stake_pool_pub_key: &StakePoolPublicKey,
195 ) -> Option<StakeDelegation> {
196 if stake_map.is_empty() {
197 None
198 } else {
199 Some(
200 stake_map
201 .get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
202 .cloned()
203 .unwrap_or(StakeDelegation(0)),
204 )
205 }
206 }
207
208 fn convert_utxos_to_candidates(
210 &self,
211 outputs: &[MainchainTxOutput],
212 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
213 Self::parse_candidates(outputs)
214 .into_iter()
215 .map(|c| {
216 match c.datum {
217 RegisterValidatorDatum::V0 {
218 stake_ownership,
219 sidechain_pub_key,
220 sidechain_signature,
221 registration_utxo,
222 own_pkh: _own_pkh,
223 aura_pub_key,
224 grandpa_pub_key,
225 } => Ok(RegisteredCandidate {
226 stake_pool_pub_key: stake_ownership.pub_key,
227 mainchain_signature: stake_ownership.signature,
228 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
230 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
231 sidechain_signature,
232 sidechain_pub_key,
233 keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
234 registration_utxo,
235 tx_inputs: c.tx_inputs,
236 utxo_info: c.utxo_info,
237 }),
238 RegisterValidatorDatum::V1 {
239 stake_ownership,
240 sidechain_pub_key,
241 sidechain_signature,
242 registration_utxo,
243 own_pkh: _own_pkh,
244 keys,
245 } => Ok(RegisteredCandidate {
246 stake_pool_pub_key: stake_ownership.pub_key,
247 mainchain_signature: stake_ownership.signature,
248 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
250 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
251 sidechain_signature,
252 sidechain_pub_key,
253 keys,
254 registration_utxo,
255 tx_inputs: c.tx_inputs,
256 utxo_info: c.utxo_info,
257 }),
258 }
259 })
260 .collect()
261 }
262
263 fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
264 let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
265 .iter()
266 .map(|output| {
267 let datum = output.datum.clone().ok_or(format!(
268 "Missing registration datum for {:?}",
269 output.clone().utxo_id
270 ))?;
271 let register_validator_datum =
272 RegisterValidatorDatum::try_from(datum).map_err(|_| {
273 format!("Invalid registration datum for {:?}", output.clone().utxo_id)
274 })?;
275 Ok(ParsedCandidate {
276 utxo_info: UtxoInfo {
277 utxo_id: output.utxo_id,
278 epoch_number: output.tx_epoch_no.into(),
279 block_number: output.tx_block_no.into(),
280 slot_number: output.tx_slot_no.into(),
281 tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
282 },
283 datum: register_validator_datum,
284 tx_inputs: output.tx_inputs.clone(),
285 })
286 })
287 .collect();
288 results
289 .into_iter()
290 .filter_map(|r| match r {
291 Ok(candidate) => Some(candidate.clone()),
292 Err(msg) => {
293 error!("{msg}");
294 None
295 },
296 })
297 .collect()
298 }
299
300 fn get_epoch_of_data_storage(
301 &self,
302 epoch_of_data_usage: McEpochNumber,
303 ) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
304 offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
305 BadRequest(format!(
306 "Minimum supported epoch of data usage is {offset}, but {} was provided",
307 epoch_of_data_usage.0
308 ))
309 .into()
310 })
311 }
312}