partner_chains_db_sync_data_sources/candidates/
mod.rs1use crate::DataSourceError::*;
3use crate::db_model::{
4 self, Address, Asset, BlockNumber, DbSyncConfigurationProvider, EpochNumber, MainchainTxOutput,
5 StakePoolEntry,
6};
7use crate::metrics::McFollowerMetrics;
8use crate::observed_async_trait;
9use authority_selection_inherents::*;
10use itertools::Itertools;
11use log::error;
12use partner_chains_plutus_data::{
13 d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
14 registered_candidates::RegisterValidatorDatum,
15};
16use sidechain_domain::*;
17use sqlx::PgPool;
18use std::collections::HashMap;
19use std::error::Error;
20
21mod cached;
22
23#[cfg(test)]
24mod tests;
25
26#[derive(Clone, Debug)]
27struct ParsedCandidate {
28 utxo_info: UtxoInfo,
29 datum: RegisterValidatorDatum,
30 tx_inputs: Vec<UtxoId>,
31}
32
33#[derive(Debug)]
34struct RegisteredCandidate {
35 stake_pool_pub_key: StakePoolPublicKey,
36 registration_utxo: UtxoId,
37 tx_inputs: Vec<UtxoId>,
38 sidechain_signature: SidechainSignature,
39 mainchain_signature: MainchainSignature,
40 cross_chain_signature: CrossChainSignature,
41 sidechain_pub_key: SidechainPublicKey,
42 cross_chain_pub_key: CrossChainPublicKey,
43 keys: CandidateKeys,
44 utxo_info: UtxoInfo,
45}
46
47pub struct CandidatesDataSourceImpl {
49 pool: PgPool,
51 metrics_opt: Option<McFollowerMetrics>,
53 db_sync_config: DbSyncConfigurationProvider,
55}
56
57observed_async_trait!(
58impl AuthoritySelectionDataSource for CandidatesDataSourceImpl {
59 async fn get_ariadne_parameters(
60 &self,
61 epoch: McEpochNumber,
62 d_parameter_policy: PolicyId,
63 permissioned_candidate_policy: PolicyId
64 ) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
65 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
66 let d_parameter_asset = Asset::new(d_parameter_policy);
67 let permissioned_candidate_asset = Asset::new(permissioned_candidate_policy);
68
69 let (candidates_output_opt, d_output_opt) = tokio::try_join!(
70 db_model::get_token_utxo_for_epoch(&self.pool, &permissioned_candidate_asset, epoch),
71 db_model::get_token_utxo_for_epoch(&self.pool, &d_parameter_asset, epoch)
72 )?;
73
74 let d_output = d_output_opt.ok_or(ExpectedDataNotFound("DParameter".to_string()))?;
75
76 let d_datum = d_output
77 .datum
78 .map(|d| d.0)
79 .ok_or(ExpectedDataNotFound("DParameter Datum".to_string()))?;
80
81 let d_parameter = DParamDatum::try_from(d_datum)?.into();
82
83 let permissioned_candidates = match candidates_output_opt {
84 None => None,
85 Some(candidates_output) => {
86 let candidates_datum = candidates_output.datum.ok_or(
87 ExpectedDataNotFound("Permissioned Candidates List Datum".to_string()),
88 )?;
89 Some(PermissionedCandidateDatums::try_from(candidates_datum.0)?.into())
90 },
91 };
92
93 Ok(AriadneParameters { d_parameter, permissioned_candidates })
94 }
95
96 async fn get_candidates(
97 &self,
98 epoch: McEpochNumber,
99 committee_candidate_address: MainchainAddress
100 )-> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
101 let epoch = EpochNumber::from(self.get_epoch_of_data_storage(epoch)?);
102 let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
103 let stake_map = Self::make_stake_map(db_model::get_stake_distribution(&self.pool, epoch).await?);
104 Ok(Self::group_candidates_by_mc_pub_key(candidates).into_iter().map(|(mainchain_pub_key, candidate_registrations)| {
105 CandidateRegistrations {
106 stake_pool_public_key: mainchain_pub_key.clone(),
107 registrations: candidate_registrations.into_iter().map(Self::make_registration_data).collect(),
108 stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
109 }
110 }).collect())
111 }
112
113 async fn get_epoch_nonce(&self, epoch: McEpochNumber) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
114 let epoch = self.get_epoch_of_data_storage(epoch)?;
115 let nonce = db_model::get_epoch_nonce(&self.pool, EpochNumber(epoch.0)).await?;
116 Ok(nonce.map(|n| EpochNonce(n.0)))
117 }
118
119 async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
120 self.get_epoch_of_data_storage(for_epoch)
121 }
122});
123
124impl CandidatesDataSourceImpl {
125 pub async fn new(
127 pool: PgPool,
128 metrics_opt: Option<McFollowerMetrics>,
129 ) -> Result<CandidatesDataSourceImpl, Box<dyn std::error::Error + Send + Sync>> {
130 db_model::create_idx_ma_tx_out_ident(&pool).await?;
131 db_model::create_idx_tx_out_address(&pool).await?;
132 Ok(Self {
133 pool: pool.clone(),
134 metrics_opt,
135 db_sync_config: DbSyncConfigurationProvider::new(pool),
136 })
137 }
138
139 pub fn cached(
141 self,
142 candidates_for_epoch_cache_size: usize,
143 ) -> std::result::Result<cached::CandidateDataSourceCached, Box<dyn Error + Send + Sync>> {
144 cached::CandidateDataSourceCached::new_from_env(self, candidates_for_epoch_cache_size)
145 }
146
147 async fn get_last_block_for_epoch(
149 &self,
150 epoch: EpochNumber,
151 ) -> Result<Option<BlockNumber>, Box<dyn std::error::Error + Send + Sync>> {
152 let block_option = db_model::get_latest_block_for_epoch(&self.pool, epoch).await?;
153 Ok(block_option.map(|b| b.block_no))
154 }
155
156 async fn get_registered_candidates(
157 &self,
158 epoch: EpochNumber,
159 committee_candidate_address: MainchainAddress,
160 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
161 let registrations_block_for_epoch = self.get_last_block_for_epoch(epoch).await?;
162 let address: Address = Address(committee_candidate_address.to_string());
163 let active_utxos = match registrations_block_for_epoch {
164 Some(block) => {
165 db_model::get_utxos_for_address(
166 &self.pool,
167 &address,
168 block,
169 self.db_sync_config.get_tx_in_config().await?,
170 )
171 .await?
172 },
173 None => vec![],
174 };
175 self.convert_utxos_to_candidates(&active_utxos)
176 }
177
178 fn group_candidates_by_mc_pub_key(
179 candidates: Vec<RegisteredCandidate>,
180 ) -> HashMap<StakePoolPublicKey, Vec<RegisteredCandidate>> {
181 candidates.into_iter().into_group_map_by(|c| c.stake_pool_pub_key.clone())
182 }
183
184 fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
185 RegistrationData {
186 registration_utxo: c.registration_utxo,
187 sidechain_signature: c.sidechain_signature,
188 mainchain_signature: c.mainchain_signature,
189 cross_chain_signature: c.cross_chain_signature,
190 sidechain_pub_key: c.sidechain_pub_key,
191 cross_chain_pub_key: c.cross_chain_pub_key,
192 keys: c.keys,
193 utxo_info: c.utxo_info,
194 tx_inputs: c.tx_inputs,
195 }
196 }
197
198 fn make_stake_map(
199 stake_pool_entries: Vec<StakePoolEntry>,
200 ) -> HashMap<MainchainKeyHash, StakeDelegation> {
201 stake_pool_entries
202 .into_iter()
203 .map(|e| (MainchainKeyHash(e.pool_hash), StakeDelegation(e.stake.0)))
204 .collect()
205 }
206
207 fn get_stake_delegation(
208 stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
209 stake_pool_pub_key: &StakePoolPublicKey,
210 ) -> Option<StakeDelegation> {
211 if stake_map.is_empty() {
212 None
213 } else {
214 Some(
215 stake_map
216 .get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
217 .cloned()
218 .unwrap_or(StakeDelegation(0)),
219 )
220 }
221 }
222
223 fn convert_utxos_to_candidates(
225 &self,
226 outputs: &[MainchainTxOutput],
227 ) -> Result<Vec<RegisteredCandidate>, Box<dyn std::error::Error + Send + Sync>> {
228 Self::parse_candidates(outputs)
229 .into_iter()
230 .map(|c| {
231 match c.datum {
232 RegisterValidatorDatum::V0 {
233 stake_ownership,
234 sidechain_pub_key,
235 sidechain_signature,
236 registration_utxo,
237 own_pkh: _own_pkh,
238 aura_pub_key,
239 grandpa_pub_key,
240 } => Ok(RegisteredCandidate {
241 stake_pool_pub_key: stake_ownership.pub_key,
242 mainchain_signature: stake_ownership.signature,
243 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
245 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
246 sidechain_signature,
247 sidechain_pub_key,
248 keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
249 registration_utxo,
250 tx_inputs: c.tx_inputs,
251 utxo_info: c.utxo_info,
252 }),
253 RegisterValidatorDatum::V1 {
254 stake_ownership,
255 sidechain_pub_key,
256 sidechain_signature,
257 registration_utxo,
258 own_pkh: _own_pkh,
259 keys,
260 } => Ok(RegisteredCandidate {
261 stake_pool_pub_key: stake_ownership.pub_key,
262 mainchain_signature: stake_ownership.signature,
263 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
265 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
266 sidechain_signature,
267 sidechain_pub_key,
268 keys,
269 registration_utxo,
270 tx_inputs: c.tx_inputs,
271 utxo_info: c.utxo_info,
272 }),
273 }
274 })
275 .collect()
276 }
277
278 fn parse_candidates(outputs: &[MainchainTxOutput]) -> Vec<ParsedCandidate> {
279 let results: Vec<std::result::Result<ParsedCandidate, String>> = outputs
280 .iter()
281 .map(|output| {
282 let datum = output.datum.clone().ok_or(format!(
283 "Missing registration datum for {:?}",
284 output.clone().utxo_id
285 ))?;
286 let register_validator_datum =
287 RegisterValidatorDatum::try_from(datum).map_err(|_| {
288 format!("Invalid registration datum for {:?}", output.clone().utxo_id)
289 })?;
290 Ok(ParsedCandidate {
291 utxo_info: UtxoInfo {
292 utxo_id: output.utxo_id,
293 epoch_number: output.tx_epoch_no.into(),
294 block_number: output.tx_block_no.into(),
295 slot_number: output.tx_slot_no.into(),
296 tx_index_within_block: McTxIndexInBlock(output.tx_index_in_block.0),
297 },
298 datum: register_validator_datum,
299 tx_inputs: output.tx_inputs.clone(),
300 })
301 })
302 .collect();
303 results
304 .into_iter()
305 .filter_map(|r| match r {
306 Ok(candidate) => Some(candidate.clone()),
307 Err(msg) => {
308 error!("{msg}");
309 None
310 },
311 })
312 .collect()
313 }
314
315 fn get_epoch_of_data_storage(
316 &self,
317 epoch_of_data_usage: McEpochNumber,
318 ) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
319 offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
320 BadRequest(format!(
321 "Minimum supported epoch of data usage is {offset}, but {} was provided",
322 epoch_of_data_usage.0
323 ))
324 .into()
325 })
326 }
327}