partner_chains_db_sync_data_sources/candidates/
mod.rs1use crate::DataSourceError::*;
3use crate::db_model::{
4 self, Address, Asset, BlockNumber, DbSyncConfigurationProvider, EpochNumber, MainchainTxOutput,
5 StakePoolEntry,
6};
7use crate::metrics::{McFollowerMetrics, observed_async_trait};
8use authority_selection_inherents::*;
9use itertools::Itertools;
10use log::error;
11use partner_chains_plutus_data::{
12 d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
13 registered_candidates::RegisterValidatorDatum,
14};
15use sidechain_domain::*;
16use sqlx::PgPool;
17use std::collections::HashMap;
18use std::error::Error;
19
20mod cached;
21
22#[cfg(test)]
23mod tests;
24
25#[derive(Clone, Debug)]
26struct ParsedCandidate {
27 utxo_info: UtxoInfo,
28 datum: RegisterValidatorDatum,
29 tx_inputs: Vec<UtxoId>,
30}
31
32#[derive(Debug)]
33struct RegisteredCandidate {
34 stake_pool_pub_key: StakePoolPublicKey,
35 registration_utxo: UtxoId,
36 tx_inputs: Vec<UtxoId>,
37 sidechain_signature: SidechainSignature,
38 mainchain_signature: MainchainSignature,
39 cross_chain_signature: CrossChainSignature,
40 sidechain_pub_key: SidechainPublicKey,
41 cross_chain_pub_key: CrossChainPublicKey,
42 keys: CandidateKeys,
43 utxo_info: UtxoInfo,
44}
45
46pub struct CandidatesDataSourceImpl {
48 pool: PgPool,
50 metrics_opt: Option<McFollowerMetrics>,
52 db_sync_config: DbSyncConfigurationProvider,
54}
55
56observed_async_trait!(
57impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
58 async fn get_ariadne_parameters(
59 &self,
60 epoch: McEpochNumber,
61 d_parameter_policy: PolicyId,
62 permissioned_candidate_policy: PolicyId
63 ) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
64 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
65 let d_parameter_asset = Asset::new(d_parameter_policy);
66 let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
67
68 let (candidates_output_opt, d_output_opt) = tokio::try_join!(
69 db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
70 db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
71 )?;
72
73 let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
74
75 let d_datum = d_output
76 .datum
77 .map(|d| d.0)
78 .ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
79
80 let d_parameter = DParamDatum::try_from(d_datum)?.into();
81
82 let permissioned_candidates = match candidates_output_opt {
83 None => None,
84 Some(candidates_output) => {
85 let candidates_datum = candidates_output.datum.ok_or(
86 ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
87 )?;
88 Some(PermissionedCandidateDatums::try_from(candidates_datum.0)?.into())
89 },
90 };
91
92 Ok(AriadneParameters { d_parameter, permissioned_candidates })
93 }
94
95 async fn get_candidates(
96 &self,
97 epoch: McEpochNumber,
98 committee_candidate_address: MainchainAddress
99 )-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
100 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
101 let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
102 let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
103 Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
104 CandidateRegistrations {
105 stake_pool_public_key: mainchain_pub_key.clone(),
106 registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
107 stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
108 }
109 }).collect())
110 }
111
112 async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
113 let epoch = self.get_epoch_of_data_storage(epoch)?;
114 let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
115 Ok(nonce.map(|n| EpochNonce(n.0)))
116 }
117
118 async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
119 self.get_epoch_of_data_storage(for_epoch)
120 }
121});
122
123impl CandidatesDataSourceImpl {
124 pub async fn new(
126 pool: PgPool,
127 metrics_opt: Option<McFollowerMetrics>,
128 ) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
129 db_model::create_idx_ma_tx_out_ident(&pool).await?;
130 db_model::create_idx_tx_out_address(&pool).await?;
131 Ok(Self {
132 pool: pool.clone(),
133 metrics_opt,
134 db_sync_config: DbSyncConfigurationProvider::new(pool),
135 })
136 }
137
138 pub fn cached(
140 self,
141 candidates_for_epoch_cache_size: usize,
142 ) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
143 cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
144 }
145
146 async fn get_last_block_for_epoch(
148 &self,
149 epoch: EpochNumber,
150 ) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
151 let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
152 Ok(block_option.map(|b| b.block_no))
153 }
154
155 async fn get_registered_candidates(
156 &self,
157 epoch: EpochNumber,
158 committee_candidate_address: MainchainAddress,
159 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
160 let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
161 let address: Address = Address(committee_candidate_address.to_string());
162 let active_utxos = match registrations_block_for_epoch {
163 Some(block) => {
164 db_model::get_utxos_for_address(
165 &self.pool,
166 &address,
167 block,
168 self.db_sync_config.get_tx_in_config().await?,
169 )
170 .await?
171 },
172 None => vec![],
173 };
174 self.convert_utxos_to_candidates(&active_utxos)
175 }
176
177 fn group_candidates_by_mc_pub_key(
178 candidates: Vec<RegisteredCandidate>,
179 ) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
180 candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
181 }
182
183 fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
184 RegistrationData {
185 registration_utxo: c.registration_utxo,
186 sidechain_signature: c.sidechain_signature,
187 mainchain_signature: c.mainchain_signature,
188 cross_chain_signature: c.cross_chain_signature,
189 sidechain_pub_key: c.sidechain_pub_key,
190 cross_chain_pub_key: c.cross_chain_pub_key,
191 keys: c.keys,
192 utxo_info: c.utxo_info,
193 tx_inputs: c.tx_inputs,
194 }
195 }
196
197 fn make_stake_map(
198 stake_pool_entries: Vec<StakePoolEntry>,
199 ) -> HashMap<MainchainKeyHash, StakeDelegation> {
200 stake_pool_entries
201 .into_iter()
202 .map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
203 .collect()
204 }
205
206 fn get_stake_delegation(
207 stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
208 stake_pool_pub_key: &StakePoolPublicKey,
209 ) -> Option<StakeDelegation> {
210 if stake_map.is_empty() {
211 None
212 } else {
213 Some(
214 stake_map
215 .get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
216 .cloned()
217 .unwrap_or(StakeDelegation(0)),
218 )
219 }
220 }
221
222 fn convert_utxos_to_candidates(
224 &self,
225 outputs: &[MainchainTxOutput],
226 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
227 Self::parse_candidates(outputs)
228 .into_iter()
229 .map(|c| {
230 match c.datum {
231 RegisterValidatorDatum::V0 {
232 stake_ownership,
233 sidechain_pub_key,
234 sidechain_signature,
235 registration_utxo,
236 own_pkh: _own_pkh,
237 aura_pub_key,
238 grandpa_pub_key,
239 } => Ok(RegisteredCandidate {
240 stake_pool_pub_key: stake_ownership.pub_key,
241 mainchain_signature: stake_ownership.signature,
242 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
244 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
245 sidechain_signature,
246 sidechain_pub_key,
247 keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
248 registration_utxo,
249 tx_inputs: c.tx_inputs,
250 utxo_info: c.utxo_info,
251 }),
252 RegisterValidatorDatum::V1 {
253 stake_ownership,
254 sidechain_pub_key,
255 sidechain_signature,
256 registration_utxo,
257 own_pkh: _own_pkh,
258 keys,
259 } => Ok(RegisteredCandidate {
260 stake_pool_pub_key: stake_ownership.pub_key,
261 mainchain_signature: stake_ownership.signature,
262 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
264 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
265 sidechain_signature,
266 sidechain_pub_key,
267 keys,
268 registration_utxo,
269 tx_inputs: c.tx_inputs,
270 utxo_info: c.utxo_info,
271 }),
272 }
273 })
274 .collect()
275 }
276
277 fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
278 let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
279 .iter()
280 .map(|output| {
281 let datum = output.datum.clone().ok_or(format!(
282 "Missing registration datum for {:?}",
283 output.clone().utxo_id
284 ))?;
285 let register_validator_datum =
286 RegisterValidatorDatum::try_from(datum).map_err(|_| {
287 format!("Invalid registration datum for {:?}", output.clone().utxo_id)
288 })?;
289 Ok(ParsedCandidate {
290 utxo_info: UtxoInfo {
291 utxo_id: output.utxo_id,
292 epoch_number: output.tx_epoch_no.into(),
293 block_number: output.tx_block_no.into(),
294 slot_number: output.tx_slot_no.into(),
295 tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
296 },
297 datum: register_validator_datum,
298 tx_inputs: output.tx_inputs.clone(),
299 })
300 })
301 .collect();
302 results
303 .into_iter()
304 .filter_map(|r| match r {
305 Ok(candidate) => Some(candidate.clone()),
306 Err(msg) => {
307 error!("{msg}");
308 None
309 },
310 })
311 .collect()
312 }
313
314 fn get_epoch_of_data_storage(
315 &self,
316 epoch_of_data_usage: McEpochNumber,
317 ) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
318 offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
319 BadRequest(format!(
320 "Minimum supported epoch of data usage is {offset}, but {} was provided",
321 epoch_of_data_usage.0
322 ))
323 .into()
324 })
325 }
326}