partner_chains_db_sync_data_sources/stake_distribution/
mod.rs1use crate::db_model::{EpochNumber, StakePoolDelegationOutputRow};
4use crate::metrics::{McFollowerMetrics, observed_async_trait};
5use lru::LruCache;
6use sidechain_domain::*;
7use sp_block_participation::inherent_data::BlockParticipationDataSource;
8use sqlx::PgPool;
9use std::sync::{Arc, Mutex};
10
11#[cfg(test)]
12mod tests;
13
14pub struct StakeDistributionDataSourceImpl {
16 pub pool: PgPool,
18 metrics_opt: Option<McFollowerMetrics>,
20 cache: Cache,
22}
23
24impl StakeDistributionDataSourceImpl {
25 pub fn new(pool: PgPool, metrics_opt: Option<McFollowerMetrics>, cache_size: usize) -> Self {
27 StakeDistributionDataSourceImpl { pool, metrics_opt, cache: Cache::new(cache_size) }
28 }
29}
30
31observed_async_trait!(
32impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
33 async fn get_stake_pool_delegation_distribution_for_pools(
34 &self,
35 epoch: McEpochNumber,
36 pool_hashes: &[MainchainKeyHash],
37 ) -> Result<StakeDistribution, Box<dyn std::error::Error + Send + Sync>> {
38 let mut pool_hashes_to_query = Vec::<[u8; 28]>::new();
39 let mut stake_distribution = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
40
41 for pool_hash in pool_hashes {
42 match self.cache.get_distribution_for_pool(epoch, pool_hash) {
43 Some(pool_delegation) => {
44 stake_distribution.insert(*pool_hash, pool_delegation);
45 },
46 None => pool_hashes_to_query.push(pool_hash.0),
47 }
48 }
49 let rows = crate::db_model::get_stake_pool_delegations_for_pools(
50 &self.pool,
51 EpochNumber::from(epoch),
52 pool_hashes_to_query,
53 )
54 .await?;
55 let mut queried_pool_delegations = rows_to_distribution(rows);
56 self.cache.put_distribution_for_pools(epoch, queried_pool_delegations.clone());
57 stake_distribution.append(&mut queried_pool_delegations.0);
58 Ok(StakeDistribution(stake_distribution))
59 }
60}
61);
62
63fn rows_to_distribution(rows: Vec<StakePoolDelegationOutputRow>) -> StakeDistribution {
64 let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
65 for row in rows {
66 match get_delegator_key(&row) {
67 Ok(delegator_key) => {
68 let pool = res.entry(MainchainKeyHash(row.pool_hash_raw)).or_default();
69 pool.delegators
70 .entry(delegator_key)
71 .or_insert(DelegatorStakeAmount(row.epoch_stake_amount.0));
72 pool.total_stake.0 += row.epoch_stake_amount.0;
73 },
74 Err(e) => {
75 log::warn!("Failed to parse StakePoolDelegationOutputRow: {}", e)
76 },
77 }
78 }
79 StakeDistribution(res)
80}
81
82fn get_delegator_key(row: &StakePoolDelegationOutputRow) -> Result<DelegatorKey, String> {
83 match &row.stake_address_hash_raw[..] {
84 [0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
85 rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
86 )),
87 [0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
88 hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
89 script_hash: row
90 .stake_address_script_hash
91 .ok_or("stake_address_script_hash must be present for script keys")?,
92 }),
93 _ => {
94 Err(format!("invalid stake address hash: {}", hex::encode(row.stake_address_hash_raw)))
95 },
96 }
97}
98
99type DistributionPerPoolCacheKey = (McEpochNumber, MainchainKeyHash);
100struct Cache {
101 distribution_per_pool_cache: Arc<Mutex<LruCache<DistributionPerPoolCacheKey, PoolDelegation>>>,
102}
103
104impl Cache {
105 fn new(cache_size: usize) -> Self {
106 Self {
107 distribution_per_pool_cache: Arc::new(Mutex::new(LruCache::new(
108 cache_size.try_into().unwrap(),
109 ))),
110 }
111 }
112
113 fn get_distribution_for_pool(
114 &self,
115 epoch: McEpochNumber,
116 pool_hash: &MainchainKeyHash,
117 ) -> Option<PoolDelegation> {
118 if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
119 cache.get(&(epoch, *pool_hash)).map(|e| e.clone())
120 } else {
121 None
122 }
123 }
124
125 fn put_distribution_for_pools(
126 &self,
127 epoch: McEpochNumber,
128 stake_distribution: StakeDistribution,
129 ) {
130 if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
131 for (pool_hash, pool_delegation) in stake_distribution.0 {
132 cache.put((epoch, pool_hash), pool_delegation);
133 }
134 }
135 }
136}