partner_chains_dolos_data_sources/
stake_distribution.rs

1use crate::{
2	Result,
3	client::{MiniBFClient, api::MiniBFApi},
4};
5use blockfrost_openapi::models::epoch_stake_pool_content_inner::EpochStakePoolContentInner;
6use futures::StreamExt;
7use sidechain_domain::*;
8use sp_block_participation::inherent_data::BlockParticipationDataSource;
9
10pub struct StakeDistributionDataSourceImpl {
11	client: MiniBFClient,
12}
13
14impl StakeDistributionDataSourceImpl {
15	pub fn new(client: MiniBFClient) -> Self {
16		Self { client }
17	}
18}
19
20#[async_trait::async_trait]
21impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
22	async fn get_stake_pool_delegation_distribution_for_pools(
23		&self,
24		epoch_number: McEpochNumber,
25		pool_hashes: &[MainchainKeyHash],
26	) -> Result<StakeDistribution> {
27		let pool_futures = futures::stream::iter(pool_hashes)
28			.map(|pool_id| async {
29				self.client
30					.epochs_stakes_by_pool(epoch_number, *pool_id)
31					.await
32					.map(|ss| ss.iter().map(|s| (*pool_id, s.clone())).collect::<Vec<_>>())
33			})
34			.collect::<Vec<_>>()
35			.await;
36		let pools = futures::future::try_join_all(pool_futures)
37			.await?
38			.into_iter()
39			.flatten()
40			.collect::<Vec<_>>();
41		Ok(rows_to_distribution(pools))
42	}
43}
44
45fn rows_to_distribution(
46	rows: Vec<(sidechain_domain::MainchainKeyHash, EpochStakePoolContentInner)>,
47) -> StakeDistribution {
48	let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
49	for (pool_id, stake) in rows {
50		match get_delegator_key(&stake) {
51			Ok(delegator_key) => {
52				let pool = res.entry(pool_id).or_default();
53				let stake_amount = stake.amount.parse().expect("valid stake amount");
54				pool.delegators
55					.entry(delegator_key)
56					.or_insert(DelegatorStakeAmount(stake_amount));
57				pool.total_stake.0 += stake_amount;
58			},
59			Err(e) => {
60				log::warn!("Failed to parse EpochStakePoolContentInner: {}", e)
61			},
62		}
63	}
64	StakeDistribution(res)
65}
66
67fn get_delegator_key(row: &EpochStakePoolContentInner) -> Result<DelegatorKey> {
68	let (_, stake_address_hash_raw) = bech32::decode(&row.stake_address)?;
69	match &stake_address_hash_raw[..] {
70		[0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
71			rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
72		)),
73		[0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
74			hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
75			script_hash: [0; 28], // TODO add support for this in a follow up PR
76		}),
77		_ => Err(format!("invalid stake address hash: {}", row.stake_address).into()),
78	}
79}