partner_chains_dolos_data_sources/
candidate.rs1use std::collections::HashMap;
2
3use crate::{client::*, *};
4use async_trait::async_trait;
5use authority_selection_inherents::*;
6use blockfrost_openapi::models::{
7 address_utxo_content_inner::AddressUtxoContentInner,
8 asset_transactions_inner::AssetTransactionsInner, block_content::BlockContent,
9 pool_list_extended_inner::PoolListExtendedInner,
10};
11use cardano_serialization_lib::PlutusData;
12use futures::StreamExt;
13use itertools::Itertools;
14use partner_chains_plutus_data::{
15 d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
16 registered_candidates::RegisterValidatorDatum,
17};
18use sidechain_domain::*;
19
20pub struct AuthoritySelectionDataSourceImpl {
21 client: MiniBFClient,
22}
23
24impl AuthoritySelectionDataSourceImpl {
25 pub fn new(client: MiniBFClient) -> Self {
26 Self { client }
27 }
28}
29
30async fn get_token_utxo_datum_for_epoch(
31 client: &impl MiniBFApi,
32 policy_id: PolicyId,
33 epoch: McEpochNumber,
34) -> Result<Option<cardano_serialization_lib::PlutusData>> {
35 let asset_utxos = client
36 .assets_transactions(AssetId {
37 policy_id: policy_id.clone(),
38 asset_name: AssetName::empty(),
39 })
40 .await?;
41 let pred = |x: AssetTransactionsInner| async move {
42 let tx_hash = McTxHash::from_hex_unsafe(&x.tx_hash);
43 let tx = client.transaction_by_hash(tx_hash).await?;
44 let block = client.blocks_by_id(McBlockNumber(tx.block_height as u32)).await?;
45 Result::Ok(if block.epoch <= Some(epoch.0 as i32) { Some(tx_hash) } else { None })
46 };
47 let futures = asset_utxos.into_iter().map(|item| async move { pred(item.clone()).await });
48 let tx_hash = futures::future::try_join_all(futures)
49 .await?
50 .into_iter()
51 .flatten()
52 .collect::<Vec<McTxHash>>()
53 .first()
54 .ok_or("No policy utxo found after epoch")?
55 .to_owned();
56
57 let datum = client.transactions_utxos(tx_hash).await?.outputs.iter().find_map(|o| {
58 if o.amount.iter().any(|a| a.unit == &policy_id.to_hex_string()[2..]) {
60 o.inline_datum.clone()
61 } else {
62 None
63 }
64 });
65 Ok(match datum {
66 Some(datum) => Some(PlutusData::from_hex(&datum)?),
67 None => None,
68 })
69}
70
71#[async_trait]
72impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl {
73 async fn get_ariadne_parameters(
74 &self,
75 epoch_number: McEpochNumber,
76 d_parameter_policy: PolicyId,
77 permissioned_candidate_policy: PolicyId,
78 ) -> Result<AriadneParameters> {
79 let epoch = self.get_epoch_of_data_storage(epoch_number)?;
80
81 let (candidates_output_opt, d_output_opt) = tokio::try_join!(
82 get_token_utxo_datum_for_epoch(&self.client, permissioned_candidate_policy, epoch),
83 get_token_utxo_datum_for_epoch(&self.client, d_parameter_policy, epoch)
84 )?;
85
86 let d_datum = d_output_opt
87 .ok_or(DataSourceError::ExpectedDataNotFound("DParameter Datum".to_string()))?;
88
89 let d_parameter = DParamDatum::try_from(d_datum)?.into();
90
91 let permissioned_candidates = match candidates_output_opt {
92 None => None,
93 Some(candidates_datum) => {
94 Some(PermissionedCandidateDatums::try_from(candidates_datum)?.into())
95 },
96 };
97
98 Ok(AriadneParameters { d_parameter, permissioned_candidates })
99 }
100
101 async fn get_candidates(
102 &self,
103 epoch_number: McEpochNumber,
104 committee_candidate_address: MainchainAddress,
105 ) -> Result<Vec<CandidateRegistrations>> {
106 let epoch = self.get_epoch_of_data_storage(epoch_number)?;
107 let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
108 let pools = self.client.pools_extended().await?;
109 let pred = |pool: PoolListExtendedInner| async move {
110 let pool_id = mckeyhash_from_bech32(&pool.pool_id)?;
111 let history = self.client.pools_history(pool_id).await?;
112 Result::Ok(match history.into_iter().find(|h| h.epoch == epoch.0 as i32) {
113 Some(e) => Some((pool_id, StakeDelegation(e.active_stake.parse::<u64>()?))),
114 None => None,
115 })
116 };
117
118 let futures = pools.into_iter().map(|item| async move { pred(item.clone()).await });
119 let stake_map: HashMap<MainchainKeyHash, StakeDelegation> =
120 futures::future::try_join_all(futures).await?.into_iter().flatten().collect();
121
122 Ok(candidates
123 .into_iter()
124 .into_group_map_by(|c| c.stake_pool_pub_key.clone())
125 .into_iter()
126 .map(|(mainchain_pub_key, candidate_registrations)| CandidateRegistrations {
127 stake_pool_public_key: mainchain_pub_key.clone(),
128 registrations: candidate_registrations
129 .into_iter()
130 .map(Self::make_registration_data)
131 .collect(),
132 stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
133 })
134 .collect())
135 }
136
137 async fn get_epoch_nonce(&self, epoch_number: McEpochNumber) -> Result<Option<EpochNonce>> {
138 let epoch = self.get_epoch_of_data_storage(epoch_number)?;
139 let nonce: String = self.client.epochs_parameters(epoch).await?.nonce;
140 Ok(Some(EpochNonce::decode_hex(&nonce)?))
141 }
142
143 async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber> {
144 self.get_epoch_of_data_storage(for_epoch)
145 }
146}
147
148fn mckeyhash_from_bech32(bech32_str: &str) -> Result<MainchainKeyHash> {
149 let (_hrp, val) = bech32::decode(bech32_str).map_err(|e| e.to_string())?;
150 Ok(MainchainKeyHash(val.try_into().map_err(|_| "failed to convert vec to array")?))
151}
152
153#[derive(Debug)]
154struct RegisteredCandidate {
155 stake_pool_pub_key: StakePoolPublicKey,
156 registration_utxo: UtxoId,
157 tx_inputs: Vec<UtxoId>,
158 sidechain_signature: SidechainSignature,
159 mainchain_signature: MainchainSignature,
160 cross_chain_signature: CrossChainSignature,
161 sidechain_pub_key: SidechainPublicKey,
162 cross_chain_pub_key: CrossChainPublicKey,
163 keys: CandidateKeys,
164 utxo_info: UtxoInfo,
165}
166
167#[derive(Clone, Debug)]
168struct ParsedCandidate {
169 utxo_info: UtxoInfo,
170 datum: RegisterValidatorDatum,
171 tx_inputs: Vec<UtxoId>,
172}
173
174impl AuthoritySelectionDataSourceImpl {
175 fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
176 RegistrationData {
177 registration_utxo: c.registration_utxo,
178 sidechain_signature: c.sidechain_signature,
179 mainchain_signature: c.mainchain_signature,
180 cross_chain_signature: c.cross_chain_signature,
181 sidechain_pub_key: c.sidechain_pub_key,
182 cross_chain_pub_key: c.cross_chain_pub_key,
183 keys: c.keys,
184 utxo_info: c.utxo_info,
185 tx_inputs: c.tx_inputs,
186 }
187 }
188
189 fn get_stake_delegation(
190 stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
191 stake_pool_pub_key: &StakePoolPublicKey,
192 ) -> Option<StakeDelegation> {
193 if stake_map.is_empty() {
194 None
195 } else {
196 Some(
197 stake_map
198 .get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
199 .cloned()
200 .unwrap_or(StakeDelegation(0)),
201 )
202 }
203 }
204
205 async fn convert_utxos_to_candidates(
207 &self,
208 outputs: &[AddressUtxoContentInner],
209 ) -> Result<Vec<RegisteredCandidate>> {
210 Self::parse_candidates(&self.client, outputs)
211 .await
212 .into_iter()
213 .map(|c| {
214 match c.datum {
215 RegisterValidatorDatum::V0 {
216 stake_ownership,
217 sidechain_pub_key,
218 sidechain_signature,
219 registration_utxo,
220 own_pkh: _own_pkh,
221 aura_pub_key,
222 grandpa_pub_key,
223 } => Ok(RegisteredCandidate {
224 stake_pool_pub_key: stake_ownership.pub_key,
225 mainchain_signature: stake_ownership.signature,
226 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
228 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
229 sidechain_signature,
230 sidechain_pub_key,
231 keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
232 registration_utxo,
233 tx_inputs: c.tx_inputs,
234 utxo_info: c.utxo_info,
235 }),
236 RegisterValidatorDatum::V1 {
237 stake_ownership,
238 sidechain_pub_key,
239 sidechain_signature,
240 registration_utxo,
241 own_pkh: _own_pkh,
242 keys,
243 } => Ok(RegisteredCandidate {
244 stake_pool_pub_key: stake_ownership.pub_key,
245 mainchain_signature: stake_ownership.signature,
246 cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
248 cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
249 sidechain_signature,
250 sidechain_pub_key,
251 keys,
252 registration_utxo,
253 tx_inputs: c.tx_inputs,
254 utxo_info: c.utxo_info,
255 }),
256 }
257 })
258 .collect()
259 }
260
261 async fn parse_candidate(
262 client: &impl MiniBFApi,
263 output: &AddressUtxoContentInner,
264 ) -> Result<ParsedCandidate> {
265 let datum_str = output.inline_datum.clone().ok_or(format!(
266 "Missing registration datum for {:?}:{:?}",
267 output.tx_hash,
268 output.clone().output_index
269 ))?;
270 let datum = cardano_serialization_lib::PlutusData::from_hex(&datum_str)
271 .map_err(|e| format!("Failed to parse datum string: {e}"))?;
272 let utxo_id = UtxoId {
273 tx_hash: McTxHash::decode_hex(&output.tx_hash)?,
274 index: UtxoIndex(output.tx_index.try_into()?),
275 };
276 let register_validator_datum = RegisterValidatorDatum::try_from(datum)
277 .map_err(|_| format!("Invalid registration datum for {:?}", utxo_id))?;
278 let block = client.blocks_by_id(output.block.clone()).await?;
279 let block_txs = client.blocks_txs(output.block.clone()).await?;
280 let tx_index_within_block = block_txs
281 .into_iter()
282 .position(|tx_hash| tx_hash == output.tx_hash)
283 .map(|pos| McTxIndexInBlock(pos as u32))
284 .ok_or("output tx hash not found in blocks/txs response")?;
285 let utxos = client.transactions_utxos(utxo_id.tx_hash).await?;
286 let tx_inputs = utxos
287 .inputs
288 .into_iter()
289 .map(|input| {
290 Ok::<sidechain_domain::UtxoId, Box<dyn std::error::Error + Send + Sync>>(UtxoId {
291 tx_hash: McTxHash::decode_hex(&input.tx_hash)?,
292 index: UtxoIndex(input.output_index.try_into()?),
293 })
294 })
295 .collect::<Result<Vec<_>>>()?;
296 Ok(ParsedCandidate {
297 utxo_info: UtxoInfo {
298 utxo_id,
299 epoch_number: McEpochNumber(block.epoch.ok_or("block epoch missing")? as u32),
300 block_number: McBlockNumber(block.height.ok_or("block number missing")? as u32),
301 slot_number: McSlotNumber(block.slot.ok_or("block slot missing")? as u64),
302 tx_index_within_block,
303 },
304 datum: register_validator_datum,
305 tx_inputs,
306 })
307 }
308
309 async fn parse_candidates(
310 client: &impl MiniBFApi,
311 outputs: &[AddressUtxoContentInner],
312 ) -> Vec<ParsedCandidate> {
313 let results = futures::stream::iter(outputs)
314 .then(|output| async { Self::parse_candidate(client, output).await })
315 .collect::<Vec<_>>()
316 .await;
317 results
318 .into_iter()
319 .filter_map(|r| match r {
320 Ok(candidate) => Some(candidate.clone()),
321 Err(msg) => {
322 log::error!("Failed to parse candidate: {msg}");
323 None
324 },
325 })
326 .collect()
327 }
328
329 fn get_epoch_of_data_storage(
330 &self,
331 epoch_of_data_usage: McEpochNumber,
332 ) -> Result<McEpochNumber> {
333 offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
334 DataSourceError::BadRequest(format!(
335 "Minimum supported epoch of data usage is {offset}, but {} was provided",
336 epoch_of_data_usage.0
337 ))
338 .into()
339 })
340 }
341
342 async fn get_last_block_for_epoch(
344 &self,
345 epoch_number: McEpochNumber,
346 ) -> Result<Option<BlockContent>> {
347 let block_option = self.client.epochs_blocks(epoch_number).await?.last().cloned();
348 let block = match block_option {
349 Some(block) => Some(self.client.blocks_by_id(block).await?),
350 None => None,
351 };
352 Ok(block)
353 }
354
355 async fn get_registered_candidates(
356 &self,
357 epoch: McEpochNumber,
358 committee_candidate_address: MainchainAddress,
359 ) -> Result<Vec<RegisteredCandidate>> {
360 let registrations_block_for_epoch_opt = self.get_last_block_for_epoch(epoch).await?;
361 let utxos = self.client.addresses_utxos(committee_candidate_address).await?;
362 let active_utxos = match registrations_block_for_epoch_opt {
363 Some(registrations_block_for_epoch) => {
364 let pred = |utxo: AddressUtxoContentInner| async move {
365 let block = self.client.blocks_by_id(utxo.block.clone()).await?;
366 Ok::<bool, ResultErr>(
367 block.height.ok_or("committee candidate block height missing")? as u32
368 <= registrations_block_for_epoch
369 .height
370 .ok_or("last_block_for_epoch block height missing")? as u32,
371 )
372 };
373 let futures = utxos.into_iter().map(|item| async move {
374 match pred(item.clone()).await {
375 Ok(true) => Ok(Some(item)),
376 Ok(false) => Ok(None),
377 Err(e) => Err(e),
378 }
379 });
380 futures::future::try_join_all(futures)
381 .await?
382 .into_iter()
383 .flatten()
384 .collect::<Vec<_>>()
385 },
386 None => vec![],
387 };
388 self.convert_utxos_to_candidates(&active_utxos).await
389 }
390}