// partner_chains_db_sync_data_sources/stake_distribution/mod.rs
use crate::db_model::{EpochNumber, StakePoolDelegationOutputRow};
use crate::metrics::McFollowerMetrics;
use crate::observed_async_trait;
use lru::LruCache;
use sidechain_domain::*;
use sp_block_participation::inherent_data::BlockParticipationDataSource;
use sqlx::PgPool;
use std::sync::{Arc, Mutex};

#[cfg(test)]
mod tests;

/// Db-Sync backed data source providing stake pool delegation distributions
/// for block participation processing.
pub struct StakeDistributionDataSourceImpl {
	/// Connection pool to the Db-Sync Postgres database
	pub pool: PgPool,
	/// Optional metrics handle, consumed by the `observed_async_trait!`
	/// instrumentation wrapper (usage not visible in this file)
	metrics_opt: Option<McFollowerMetrics>,
	/// LRU cache of per-pool delegation distributions, keyed by (epoch, pool hash)
	cache: Cache,
}
25impl StakeDistributionDataSourceImpl {
26 pub fn new(pool: PgPool, metrics_opt: Option<McFollowerMetrics>, cache_size: usize) -> Self {
28 StakeDistributionDataSourceImpl { pool, metrics_opt, cache: Cache::new(cache_size) }
29 }
30}
observed_async_trait!(
impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
	// Returns the delegation distribution of the requested pools at the given epoch.
	// Cached (epoch, pool) entries are served from the LRU cache; only the cache
	// misses are queried from Db-Sync, and the fresh results are cached afterwards.
	// NOTE: `//` comments only inside this macro invocation — doc comments would
	// add `#[doc]` tokens to the macro input.
	async fn get_stake_pool_delegation_distribution_for_pools(
		&self,
		epoch: McEpochNumber,
		pool_hashes: &[MainchainKeyHash],
	) -> Result<StakeDistribution, Box<dyn std::error::Error + Send + Sync>> {
		// Pool hashes missed in the cache; fetched from the database below.
		let mut pool_hashes_to_query = Vec::<[u8; 28]>::new();
		let mut stake_distribution = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();

		for pool_hash in pool_hashes {
			match self.cache.get_distribution_for_pool(epoch, pool_hash) {
				Some(pool_delegation) => {
					stake_distribution.insert(*pool_hash, pool_delegation);
				},
				None => pool_hashes_to_query.push(pool_hash.0),
			}
		}
		// NOTE(review): the query runs even when `pool_hashes_to_query` is empty —
		// presumably cheap, but could be short-circuited; verify against db_model.
		let rows = crate::db_model::get_stake_pool_delegations_for_pools(
			&self.pool,
			EpochNumber::from(epoch),
			pool_hashes_to_query,
		)
		.await?;
		let mut queried_pool_delegations = rows_to_distribution(rows);
		// Cache the freshly queried entries, then merge them into the cached results.
		self.cache.put_distribution_for_pools(epoch, queried_pool_delegations.clone());
		stake_distribution.append(&mut queried_pool_delegations.0);
		Ok(StakeDistribution(stake_distribution))
	}
}
);
64fn rows_to_distribution(rows: Vec<StakePoolDelegationOutputRow>) -> StakeDistribution {
65 let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
66 for row in rows {
67 match get_delegator_key(&row) {
68 Ok(delegator_key) => {
69 let pool = res.entry(MainchainKeyHash(row.pool_hash_raw)).or_default();
70 pool.delegators
71 .entry(delegator_key)
72 .or_insert(DelegatorStakeAmount(row.epoch_stake_amount.0));
73 pool.total_stake.0 += row.epoch_stake_amount.0;
74 },
75 Err(e) => {
76 log::warn!("Failed to parse StakePoolDelegationOutputRow: {}", e)
77 },
78 }
79 }
80 StakeDistribution(res)
81}
83fn get_delegator_key(row: &StakePoolDelegationOutputRow) -> Result<DelegatorKey, String> {
84 match &row.stake_address_hash_raw[..] {
85 [0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
86 rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
87 )),
88 [0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
89 hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
90 script_hash: row
91 .stake_address_script_hash
92 .ok_or("stake_address_script_hash must be present for script keys")?,
93 }),
94 _ => {
95 Err(format!("invalid stake address hash: {}", hex::encode(row.stake_address_hash_raw)))
96 },
97 }
98}
// Entries are keyed per (epoch, pool) so partial results can be reused when a
// later call asks about an overlapping pool set for the same epoch.
type DistributionPerPoolCacheKey = (McEpochNumber, MainchainKeyHash);
/// Thread-safe LRU cache of per-pool delegation distributions.
struct Cache {
	distribution_per_pool_cache: Arc<Mutex<LruCache<DistributionPerPoolCacheKey, PoolDelegation>>>,
}
105impl Cache {
106 fn new(cache_size: usize) -> Self {
107 Self {
108 distribution_per_pool_cache: Arc::new(Mutex::new(LruCache::new(
109 cache_size.try_into().unwrap(),
110 ))),
111 }
112 }
113
114 fn get_distribution_for_pool(
115 &self,
116 epoch: McEpochNumber,
117 pool_hash: &MainchainKeyHash,
118 ) -> Option<PoolDelegation> {
119 if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
120 cache.get(&(epoch, *pool_hash)).map(|e| e.clone())
121 } else {
122 None
123 }
124 }
125
126 fn put_distribution_for_pools(
127 &self,
128 epoch: McEpochNumber,
129 stake_distribution: StakeDistribution,
130 ) {
131 if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
132 for (pool_hash, pool_delegation) in stake_distribution.0 {
133 cache.put((epoch, pool_hash), pool_delegation);
134 }
135 }
136 }
137}