partner_chains_dolos_data_sources/
candidate.rs

1use std::collections::HashMap;
2
3use crate::{client::*, *};
4use async_trait::async_trait;
5use authority_selection_inherents::*;
6use blockfrost_openapi::models::{
7	address_utxo_content_inner::AddressUtxoContentInner,
8	asset_transactions_inner::AssetTransactionsInner, block_content::BlockContent,
9	pool_list_extended_inner::PoolListExtendedInner,
10};
11use cardano_serialization_lib::PlutusData;
12use futures::StreamExt;
13use itertools::Itertools;
14use partner_chains_plutus_data::{
15	d_param::DParamDatum, permissioned_candidates::PermissionedCandidateDatums,
16	registered_candidates::RegisterValidatorDatum,
17};
18use sidechain_domain::*;
19
/// Authority selection data source that answers its queries through a [`MiniBFClient`].
///
/// Implements [`AuthoritySelectionDataSource`].
pub struct AuthoritySelectionDataSourceImpl {
	/// Client through which every chain query of this data source is issued.
	client: MiniBFClient,
}

impl AuthoritySelectionDataSourceImpl {
	/// Creates a data source that issues its queries through `client`.
	pub fn new(client: MiniBFClient) -> Self {
		Self { client }
	}
}
29
30async fn get_token_utxo_datum_for_epoch(
31	client: &impl MiniBFApi,
32	policy_id: PolicyId,
33	epoch: McEpochNumber,
34) -> Result<Option<cardano_serialization_lib::PlutusData>> {
35	let asset_utxos = client
36		.assets_transactions(AssetId {
37			policy_id: policy_id.clone(),
38			asset_name: AssetName::empty(),
39		})
40		.await?;
41	let pred = |x: AssetTransactionsInner| async move {
42		let tx_hash = McTxHash::from_hex_unsafe(&x.tx_hash);
43		let tx = client.transaction_by_hash(tx_hash).await?;
44		let block = client.blocks_by_id(McBlockNumber(tx.block_height as u32)).await?;
45		Result::Ok(if block.epoch <= Some(epoch.0 as i32) { Some(tx_hash) } else { None })
46	};
47	let futures = asset_utxos.into_iter().map(|item| async move { pred(item.clone()).await });
48	let tx_hash = futures::future::try_join_all(futures)
49		.await?
50		.into_iter()
51		.flatten()
52		.collect::<Vec<McTxHash>>()
53		.first()
54		.ok_or("No policy utxo found after epoch")?
55		.to_owned();
56
57	let datum = client.transactions_utxos(tx_hash).await?.outputs.iter().find_map(|o| {
58		// TODO compare on the level of PolicyId instead of String
59		if o.amount.iter().any(|a| a.unit == &policy_id.to_hex_string()[2..]) {
60			o.inline_datum.clone()
61		} else {
62			None
63		}
64	});
65	Ok(match datum {
66		Some(datum) => Some(PlutusData::from_hex(&datum)?),
67		None => None,
68	})
69}
70
71#[async_trait]
72impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl {
73	async fn get_ariadne_parameters(
74		&self,
75		epoch_number: McEpochNumber,
76		d_parameter_policy: PolicyId,
77		permissioned_candidate_policy: PolicyId,
78	) -> Result<AriadneParameters> {
79		let epoch = self.get_epoch_of_data_storage(epoch_number)?;
80
81		let (candidates_output_opt, d_output_opt) = tokio::try_join!(
82			get_token_utxo_datum_for_epoch(&self.client, permissioned_candidate_policy, epoch),
83			get_token_utxo_datum_for_epoch(&self.client, d_parameter_policy, epoch)
84		)?;
85
86		let d_datum = d_output_opt
87			.ok_or(DataSourceError::ExpectedDataNotFound("DParameter Datum".to_string()))?;
88
89		let d_parameter = DParamDatum::try_from(d_datum)?.into();
90
91		let permissioned_candidates = match candidates_output_opt {
92			None => None,
93			Some(candidates_datum) => {
94				Some(PermissionedCandidateDatums::try_from(candidates_datum)?.into())
95			},
96		};
97
98		Ok(AriadneParameters { d_parameter, permissioned_candidates })
99	}
100
101	async fn get_candidates(
102		&self,
103		epoch_number: McEpochNumber,
104		committee_candidate_address: MainchainAddress,
105	) -> Result<Vec<CandidateRegistrations>> {
106		let epoch = self.get_epoch_of_data_storage(epoch_number)?;
107		let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
108		let pools = self.client.pools_extended().await?;
109		let pred = |pool: PoolListExtendedInner| async move {
110			let pool_id = mckeyhash_from_bech32(&pool.pool_id)?;
111			let history = self.client.pools_history(pool_id).await?;
112			Result::Ok(match history.into_iter().find(|h| h.epoch == epoch.0 as i32) {
113				Some(e) => Some((pool_id, StakeDelegation(e.active_stake.parse::<u64>()?))),
114				None => None,
115			})
116		};
117
118		let futures = pools.into_iter().map(|item| async move { pred(item.clone()).await });
119		let stake_map: HashMap<MainchainKeyHash, StakeDelegation> =
120			futures::future::try_join_all(futures).await?.into_iter().flatten().collect();
121
122		Ok(candidates
123			.into_iter()
124			.into_group_map_by(|c| c.stake_pool_pub_key.clone())
125			.into_iter()
126			.map(|(mainchain_pub_key, candidate_registrations)| CandidateRegistrations {
127				stake_pool_public_key: mainchain_pub_key.clone(),
128				registrations: candidate_registrations
129					.into_iter()
130					.map(Self::make_registration_data)
131					.collect(),
132				stake_delegation: Self::get_stake_delegation(&stake_map, &mainchain_pub_key),
133			})
134			.collect())
135	}
136
137	async fn get_epoch_nonce(&self, epoch_number: McEpochNumber) -> Result<Option<EpochNonce>> {
138		let epoch = self.get_epoch_of_data_storage(epoch_number)?;
139		let nonce: String = self.client.epochs_parameters(epoch).await?.nonce;
140		Ok(Some(EpochNonce::decode_hex(&nonce)?))
141	}
142
	/// Returns the epoch from which data used in `for_epoch` is read.
	///
	/// Delegates to [`Self::get_epoch_of_data_storage`], including its error for unsupported
	/// epochs.
	async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber> {
		self.get_epoch_of_data_storage(for_epoch)
	}
146}
147
148fn mckeyhash_from_bech32(bech32_str: &str) -> Result<MainchainKeyHash> {
149	let (_hrp, val) = bech32::decode(bech32_str).map_err(|e| e.to_string())?;
150	Ok(MainchainKeyHash(val.try_into().map_err(|_| "failed to convert vec to array")?))
151}
152
/// A candidate registration flattened out of a [`ParsedCandidate`], independent of the
/// version of the registration datum it was read from.
#[derive(Debug)]
struct RegisteredCandidate {
	/// Public key taken from the datum's stake ownership; registrations are grouped by it.
	stake_pool_pub_key: StakePoolPublicKey,
	/// UTXO named by the registration datum.
	registration_utxo: UtxoId,
	/// Inputs of the transaction that created the registration UTXO.
	tx_inputs: Vec<UtxoId>,
	/// Sidechain signature taken from the datum.
	sidechain_signature: SidechainSignature,
	/// Signature taken from the datum's stake ownership.
	mainchain_signature: MainchainSignature,
	/// Currently a copy of the bytes of `sidechain_signature`.
	cross_chain_signature: CrossChainSignature,
	/// Sidechain public key taken from the datum.
	sidechain_pub_key: SidechainPublicKey,
	/// Currently a copy of the bytes of `sidechain_pub_key`.
	cross_chain_pub_key: CrossChainPublicKey,
	/// Candidate keys; for V0 datums these are built from the Aura and Grandpa keys.
	keys: CandidateKeys,
	/// On-chain location of the UTXO that carries the registration datum.
	utxo_info: UtxoInfo,
}
166
/// A registration UTXO whose inline datum has been decoded, together with its on-chain
/// context.
#[derive(Clone, Debug)]
struct ParsedCandidate {
	/// Id of the UTXO plus the epoch, block, slot and in-block transaction index it was
	/// created at.
	utxo_info: UtxoInfo,
	/// Registration datum decoded from the UTXO's inline datum.
	datum: RegisterValidatorDatum,
	/// Inputs of the transaction that created the UTXO.
	tx_inputs: Vec<UtxoId>,
}
173
174impl AuthoritySelectionDataSourceImpl {
175	fn make_registration_data(c: RegisteredCandidate) -> RegistrationData {
176		RegistrationData {
177			registration_utxo: c.registration_utxo,
178			sidechain_signature: c.sidechain_signature,
179			mainchain_signature: c.mainchain_signature,
180			cross_chain_signature: c.cross_chain_signature,
181			sidechain_pub_key: c.sidechain_pub_key,
182			cross_chain_pub_key: c.cross_chain_pub_key,
183			keys: c.keys,
184			utxo_info: c.utxo_info,
185			tx_inputs: c.tx_inputs,
186		}
187	}
188
189	fn get_stake_delegation(
190		stake_map: &HashMap<MainchainKeyHash, StakeDelegation>,
191		stake_pool_pub_key: &StakePoolPublicKey,
192	) -> Option<StakeDelegation> {
193		if stake_map.is_empty() {
194			None
195		} else {
196			Some(
197				stake_map
198					.get(&MainchainKeyHash::from_vkey(&stake_pool_pub_key.0))
199					.cloned()
200					.unwrap_or(StakeDelegation(0)),
201			)
202		}
203	}
204
205	// Converters
206	async fn convert_utxos_to_candidates(
207		&self,
208		outputs: &[AddressUtxoContentInner],
209	) -> Result<Vec<RegisteredCandidate>> {
210		Self::parse_candidates(&self.client, outputs)
211			.await
212			.into_iter()
213			.map(|c| {
214				match c.datum {
215					RegisterValidatorDatum::V0 {
216						stake_ownership,
217						sidechain_pub_key,
218						sidechain_signature,
219						registration_utxo,
220						own_pkh: _own_pkh,
221						aura_pub_key,
222						grandpa_pub_key,
223					} => Ok(RegisteredCandidate {
224						stake_pool_pub_key: stake_ownership.pub_key,
225						mainchain_signature: stake_ownership.signature,
226						// For now we use the same key for both cross chain and sidechain actions
227						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
228						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
229						sidechain_signature,
230						sidechain_pub_key,
231						keys: CandidateKeys(vec![aura_pub_key.into(), grandpa_pub_key.into()]),
232						registration_utxo,
233						tx_inputs: c.tx_inputs,
234						utxo_info: c.utxo_info,
235					}),
236					RegisterValidatorDatum::V1 {
237						stake_ownership,
238						sidechain_pub_key,
239						sidechain_signature,
240						registration_utxo,
241						own_pkh: _own_pkh,
242						keys,
243					} => Ok(RegisteredCandidate {
244						stake_pool_pub_key: stake_ownership.pub_key,
245						mainchain_signature: stake_ownership.signature,
246						// For now we use the same key for both cross chain and sidechain actions
247						cross_chain_pub_key: CrossChainPublicKey(sidechain_pub_key.0.clone()),
248						cross_chain_signature: CrossChainSignature(sidechain_signature.0.clone()),
249						sidechain_signature,
250						sidechain_pub_key,
251						keys,
252						registration_utxo,
253						tx_inputs: c.tx_inputs,
254						utxo_info: c.utxo_info,
255					}),
256				}
257			})
258			.collect()
259	}
260
261	async fn parse_candidate(
262		client: &impl MiniBFApi,
263		output: &AddressUtxoContentInner,
264	) -> Result<ParsedCandidate> {
265		let datum_str = output.inline_datum.clone().ok_or(format!(
266			"Missing registration datum for {:?}:{:?}",
267			output.tx_hash,
268			output.clone().output_index
269		))?;
270		let datum = cardano_serialization_lib::PlutusData::from_hex(&datum_str)
271			.map_err(|e| format!("Failed to parse datum string: {e}"))?;
272		let utxo_id = UtxoId {
273			tx_hash: McTxHash::decode_hex(&output.tx_hash)?,
274			index: UtxoIndex(output.tx_index.try_into()?),
275		};
276		let register_validator_datum = RegisterValidatorDatum::try_from(datum)
277			.map_err(|_| format!("Invalid registration datum for {:?}", utxo_id))?;
278		let block = client.blocks_by_id(output.block.clone()).await?;
279		let block_txs = client.blocks_txs(output.block.clone()).await?;
280		let tx_index_within_block = block_txs
281			.into_iter()
282			.position(|tx_hash| tx_hash == output.tx_hash)
283			.map(|pos| McTxIndexInBlock(pos as u32))
284			.ok_or("output tx hash not found in blocks/txs response")?;
285		let utxos = client.transactions_utxos(utxo_id.tx_hash).await?;
286		let tx_inputs = utxos
287			.inputs
288			.into_iter()
289			.map(|input| {
290				Ok::<sidechain_domain::UtxoId, Box<dyn std::error::Error + Send + Sync>>(UtxoId {
291					tx_hash: McTxHash::decode_hex(&input.tx_hash)?,
292					index: UtxoIndex(input.output_index.try_into()?),
293				})
294			})
295			.collect::<Result<Vec<_>>>()?;
296		Ok(ParsedCandidate {
297			utxo_info: UtxoInfo {
298				utxo_id,
299				epoch_number: McEpochNumber(block.epoch.ok_or("block epoch missing")? as u32),
300				block_number: McBlockNumber(block.height.ok_or("block number missing")? as u32),
301				slot_number: McSlotNumber(block.slot.ok_or("block slot missing")? as u64),
302				tx_index_within_block,
303			},
304			datum: register_validator_datum,
305			tx_inputs,
306		})
307	}
308
309	async fn parse_candidates(
310		client: &impl MiniBFApi,
311		outputs: &[AddressUtxoContentInner],
312	) -> Vec<ParsedCandidate> {
313		let results = futures::stream::iter(outputs)
314			.then(|output| async { Self::parse_candidate(client, output).await })
315			.collect::<Vec<_>>()
316			.await;
317		results
318			.into_iter()
319			.filter_map(|r| match r {
320				Ok(candidate) => Some(candidate.clone()),
321				Err(msg) => {
322					log::error!("Failed to parse candidate: {msg}");
323					None
324				},
325			})
326			.collect()
327	}
328
329	fn get_epoch_of_data_storage(
330		&self,
331		epoch_of_data_usage: McEpochNumber,
332	) -> Result<McEpochNumber> {
333		offset_data_epoch(&epoch_of_data_usage).map_err(|offset| {
334			DataSourceError::BadRequest(format!(
335				"Minimum supported epoch of data usage is {offset}, but {} was provided",
336				epoch_of_data_usage.0
337			))
338			.into()
339		})
340	}
341
342	/// Registrations state up to this block are considered as "active", after it - as "pending".
343	async fn get_last_block_for_epoch(
344		&self,
345		epoch_number: McEpochNumber,
346	) -> Result<Option<BlockContent>> {
347		let block_option = self.client.epochs_blocks(epoch_number).await?.last().cloned();
348		let block = match block_option {
349			Some(block) => Some(self.client.blocks_by_id(block).await?),
350			None => None,
351		};
352		Ok(block)
353	}
354
355	async fn get_registered_candidates(
356		&self,
357		epoch: McEpochNumber,
358		committee_candidate_address: MainchainAddress,
359	) -> Result<Vec<RegisteredCandidate>> {
360		let registrations_block_for_epoch_opt = self.get_last_block_for_epoch(epoch).await?;
361		let utxos = self.client.addresses_utxos(committee_candidate_address).await?;
362		let active_utxos = match registrations_block_for_epoch_opt {
363			Some(registrations_block_for_epoch) => {
364				let pred = |utxo: AddressUtxoContentInner| async move {
365					let block = self.client.blocks_by_id(utxo.block.clone()).await?;
366					Ok::<bool, ResultErr>(
367						block.height.ok_or("committee candidate block height missing")? as u32
368							<= registrations_block_for_epoch
369								.height
370								.ok_or("last_block_for_epoch block height missing")? as u32,
371					)
372				};
373				let futures = utxos.into_iter().map(|item| async move {
374					match pred(item.clone()).await {
375						Ok(true) => Ok(Some(item)),
376						Ok(false) => Ok(None),
377						Err(e) => Err(e),
378					}
379				});
380				futures::future::try_join_all(futures)
381					.await?
382					.into_iter()
383					.flatten()
384					.collect::<Vec<_>>()
385			},
386			None => vec![],
387		};
388		self.convert_utxos_to_candidates(&active_utxos).await
389	}
390}