partner_chains_dolos_data_sources/
stake_distribution.rs1use crate::{
2 Result,
3 client::{MiniBFClient, api::MiniBFApi},
4};
5use blockfrost_openapi::models::epoch_stake_pool_content_inner::EpochStakePoolContentInner;
6use futures::StreamExt;
7use sidechain_domain::*;
8use sp_block_participation::inherent_data::BlockParticipationDataSource;
9
10pub struct StakeDistributionDataSourceImpl {
11 client: MiniBFClient,
12}
13
14impl StakeDistributionDataSourceImpl {
15 pub fn new(client: MiniBFClient) -> Self {
16 Self { client }
17 }
18}
19
20#[async_trait::async_trait]
21impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
22 async fn get_stake_pool_delegation_distribution_for_pools(
23 &self,
24 epoch_number: McEpochNumber,
25 pool_hashes: &[MainchainKeyHash],
26 ) -> Result<StakeDistribution> {
27 let pool_futures = futures::stream::iter(pool_hashes)
28 .map(|pool_id| async {
29 self.client
30 .epochs_stakes_by_pool(epoch_number, *pool_id)
31 .await
32 .map(|ss| ss.iter().map(|s| (*pool_id, s.clone())).collect::<Vec<_>>())
33 })
34 .collect::<Vec<_>>()
35 .await;
36 let pools = futures::future::try_join_all(pool_futures)
37 .await?
38 .into_iter()
39 .flatten()
40 .collect::<Vec<_>>();
41 Ok(rows_to_distribution(pools))
42 }
43}
44
45fn rows_to_distribution(
46 rows: Vec<(sidechain_domain::MainchainKeyHash, EpochStakePoolContentInner)>,
47) -> StakeDistribution {
48 let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
49 for (pool_id, stake) in rows {
50 match get_delegator_key(&stake) {
51 Ok(delegator_key) => {
52 let pool = res.entry(pool_id).or_default();
53 let stake_amount = stake.amount.parse().expect("valid stake amount");
54 pool.delegators
55 .entry(delegator_key)
56 .or_insert(DelegatorStakeAmount(stake_amount));
57 pool.total_stake.0 += stake_amount;
58 },
59 Err(e) => {
60 log::warn!("Failed to parse EpochStakePoolContentInner: {}", e)
61 },
62 }
63 }
64 StakeDistribution(res)
65}
66
67fn get_delegator_key(row: &EpochStakePoolContentInner) -> Result<DelegatorKey> {
68 let (_, stake_address_hash_raw) = bech32::decode(&row.stake_address)?;
69 match &stake_address_hash_raw[..] {
70 [0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
71 rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
72 )),
73 [0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
74 hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
75 script_hash: [0; 28], }),
77 _ => Err(format!("invalid stake address hash: {}", row.stake_address).into()),
78 }
79}