partner_chains_db_sync_data_sources/stake_distribution/
mod.rs

1//! Db-Sync data source serving information about Cardano stake delegation.
2//! Used by the Partner Chain Block Participation feature.
3use crate::db_model::{EpochNumber, StakePoolDelegationOutputRow};
4use crate::metrics::{McFollowerMetrics, observed_async_trait};
5use lru::LruCache;
6use sidechain_domain::*;
7use sp_block_participation::inherent_data::BlockParticipationDataSource;
8use sqlx::PgPool;
9use std::sync::{Arc, Mutex};
10
11#[cfg(test)]
12mod tests;
13
14/// Db-Sync data source serving data about Cardano delegations
15pub struct StakeDistributionDataSourceImpl {
16	/// Postgres connection poool
17	pub pool: PgPool,
18	/// Prometheus metrics client
19	metrics_opt: Option<McFollowerMetrics>,
20	/// Internal data cache
21	cache: Cache,
22}
23
24impl StakeDistributionDataSourceImpl {
25	/// Creates a Stake Distribution new data source instance
26	pub fn new(pool: PgPool, metrics_opt: Option<McFollowerMetrics>, cache_size: usize) -> Self {
27		StakeDistributionDataSourceImpl { pool, metrics_opt, cache: Cache::new(cache_size) }
28	}
29}
30
31observed_async_trait!(
32impl BlockParticipationDataSource for StakeDistributionDataSourceImpl {
33	async fn get_stake_pool_delegation_distribution_for_pools(
34		&self,
35		epoch: McEpochNumber,
36		pool_hashes: &[MainchainKeyHash],
37	) -> Result<StakeDistribution, Box<dyn std::error::Error + Send + Sync>> {
38		let mut pool_hashes_to_query = Vec::<[u8; 28]>::new();
39		let mut stake_distribution = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
40
41		for pool_hash in pool_hashes {
42			match self.cache.get_distribution_for_pool(epoch, pool_hash) {
43				Some(pool_delegation) => {
44					stake_distribution.insert(*pool_hash, pool_delegation);
45				},
46				None => pool_hashes_to_query.push(pool_hash.0),
47			}
48		}
49		let rows = crate::db_model::get_stake_pool_delegations_for_pools(
50			&self.pool,
51			EpochNumber::from(epoch),
52			pool_hashes_to_query,
53		)
54		.await?;
55		let mut queried_pool_delegations = rows_to_distribution(rows);
56		self.cache.put_distribution_for_pools(epoch, queried_pool_delegations.clone());
57		stake_distribution.append(&mut queried_pool_delegations.0);
58		Ok(StakeDistribution(stake_distribution))
59	}
60}
61);
62
63fn rows_to_distribution(rows: Vec<StakePoolDelegationOutputRow>) -> StakeDistribution {
64	let mut res = BTreeMap::<MainchainKeyHash, PoolDelegation>::new();
65	for row in rows {
66		match get_delegator_key(&row) {
67			Ok(delegator_key) => {
68				let pool = res.entry(MainchainKeyHash(row.pool_hash_raw)).or_default();
69				pool.delegators
70					.entry(delegator_key)
71					.or_insert(DelegatorStakeAmount(row.epoch_stake_amount.0));
72				pool.total_stake.0 += row.epoch_stake_amount.0;
73			},
74			Err(e) => {
75				log::warn!("Failed to parse StakePoolDelegationOutputRow: {}", e)
76			},
77		}
78	}
79	StakeDistribution(res)
80}
81
82fn get_delegator_key(row: &StakePoolDelegationOutputRow) -> Result<DelegatorKey, String> {
83	match &row.stake_address_hash_raw[..] {
84		[0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash(
85			rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
86		)),
87		[0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash {
88			hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"),
89			script_hash: row
90				.stake_address_script_hash
91				.ok_or("stake_address_script_hash must be present for script keys")?,
92		}),
93		_ => {
94			Err(format!("invalid stake address hash: {}", hex::encode(row.stake_address_hash_raw)))
95		},
96	}
97}
98
99type DistributionPerPoolCacheKey = (McEpochNumber, MainchainKeyHash);
100struct Cache {
101	distribution_per_pool_cache: Arc<Mutex<LruCache<DistributionPerPoolCacheKey, PoolDelegation>>>,
102}
103
104impl Cache {
105	fn new(cache_size: usize) -> Self {
106		Self {
107			distribution_per_pool_cache: Arc::new(Mutex::new(LruCache::new(
108				cache_size.try_into().unwrap(),
109			))),
110		}
111	}
112
113	fn get_distribution_for_pool(
114		&self,
115		epoch: McEpochNumber,
116		pool_hash: &MainchainKeyHash,
117	) -> Option<PoolDelegation> {
118		if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
119			cache.get(&(epoch, *pool_hash)).map(|e| e.clone())
120		} else {
121			None
122		}
123	}
124
125	fn put_distribution_for_pools(
126		&self,
127		epoch: McEpochNumber,
128		stake_distribution: StakeDistribution,
129	) {
130		if let Ok(mut cache) = self.distribution_per_pool_cache.lock() {
131			for (pool_hash, pool_delegation) in stake_distribution.0 {
132				cache.put((epoch, pool_hash), pool_delegation);
133			}
134		}
135	}
136}