partner_chains_db_sync_data_sources/stake_distribution/
mod.rs

1//! Db-Sync data source serving information about Cardano stake delegation.
2//! Used by the Partner Chain Block Participation feature.
3use crate::db_model::{EpochNumber, StakePoolDelegationOutputRow};
4use crate::metrics::McFollowerMetrics;
5use crate::observed_async_trait;
6use lru::LruCache;
7use sidechain_domain::*;
8use sp_block_participation::inherent_data::BlockParticipationDataSource;
9use sqlx::PgPool;
10use std::sync::{Arc, Mutex};
11
12#[cfg(test)]
13mod tests;
14
15/// Db-Sync data source serving data about Cardano delegations
16pub struct StakeDistributionDataSourceImpl {
17	/// Postgres connection poool
18	pub pool: PgPool,
19	/// Prometheus metrics client
20	metrics_opt: Option<McFollowerMetrics>,
21	/// Internal data cache
22	cache: Cache,
23}
24
25impl StakeDistributionDataSourceImpl {
26	/// Creates a Stake Distribution new data source instance
27	pub fn new(pool: PgPool, metrics_opt: Option<McFollowerMetrics>, cache_size: usize) -> Self {
28		StakeDistributionDataSourceImpl { pool, metrics_opt, cache: Cache::new(cache_size) }
29	}
30}
31
32observed_async_trait!(
33impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
34	async fn get_stake_pool_delegation_distribution_for_pools(
35		&self,
36		epoch: McEpochNumber,
37		pool_hashes: &[MainchainKeyHash],
38	) -> Result<StakeDistribution, Box<dyn std::error::Error + Send + Sync>> {
39		let mut pool_hashes_to_query = Vec::<[u8; 28]>::new();
40		let mut stake_distribution = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
41
42		for pool_hash in pool_hashes {
43			match self.cache.get_distribution_for_pool(epoch, pool_hash) {
44				Some(pool_delegation) => {
45					stake_distribution.insert(*pool_hash, pool_delegation);
46				},
47				None => pool_hashes_to_query.push(pool_hash.0),
48			}
49		}
50		let rows = crate::db_model::get_stake_pool_delegations_for_pools(
51			&self.pool,
52			EpochNumber::from(epoch),
53			pool_hashes_to_query,
54		)
55		.await?;
56		let mut queried_pool_delegations = rows_to_distribution(rows);
57		self.cache.put_distribution_for_pools(epoch, queried_pool_delegations.clone());
58		stake_distribution.append(&mut queried_pool_delegations.0);
59		Ok(StakeDistribution(stake_distribution))
60	}
61}
62);
63
64fn rows_to_distribution(rows: Vec<StakePoolDelegationOutputRow>) -> StakeDistribution {
65	let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
66	for row in rows {
67		match get_delegator_key(&row) {
68			Ok(delegator_key) => {
69				let pool = res.entry(MainchainKeyHash(row.pool_hash_raw)).or_default();
70				pool.delegators
71					.entry(delegator_key)
72					.or_insert(DelegatorStakeAmount(row.epoch_stake_amount.0));
73				pool.total_stake.0 += row.epoch_stake_amount.0;
74			},
75			Err(e) => {
76				log::warn!("Failed to parse StakePoolDelegationOutputRow: {}", e)
77			},
78		}
79	}
80	StakeDistribution(res)
81}
82
83fn get_delegator_key(row: &StakePoolDelegationOutputRow) -> Result<DelegatorKey, String> {
84	match &row.stake_address_hash_raw[..] {
85		[0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
86			rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
87		)),
88		[0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
89			hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
90			script_hash: row
91				.stake_address_script_hash
92				.ok_or("stake_address_script_hash must be present for script keys")?,
93		}),
94		_ => {
95			Err(format!("invalid stake address hash: {}", hex::encode(row.stake_address_hash_raw)))
96		},
97	}
98}
99
100type DistributionPerPoolCacheKey = (McEpochNumber, MainchainKeyHash);
101struct Cache {
102	distribution_per_pool_cache: Arc<Mutex<LruCache<DistributionPerPoolCacheKey, PoolDelegation>>>,
103}
104
105impl Cache {
106	fn new(cache_size: usize) -> Self {
107		Self {
108			distribution_per_pool_cache: Arc::new(Mutex::new(LruCache::new(
109				cache_size.try_into().unwrap(),
110			))),
111		}
112	}
113
114	fn get_distribution_for_pool(
115		&self,
116		epoch: McEpochNumber,
117		pool_hash: &MainchainKeyHash,
118	) -> Option<PoolDelegation> {
119		if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
120			cache.get(&(epoch, *pool_hash)).map(|e| e.clone())
121		} else {
122			None
123		}
124	}
125
126	fn put_distribution_for_pools(
127		&self,
128		epoch: McEpochNumber,
129		stake_distribution: StakeDistribution,
130	) {
131		if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
132			for (pool_hash, pool_delegation) in stake_distribution.0 {
133				cache.put((epoch, pool_hash), pool_delegation);
134			}
135		}
136	}
137}