partner_chains_db_sync_data_sources/candidates/
cached.rs

1//! Provides caches for call which results are immutable,
2//! like epoch candidates queries made by inherent data producers.
3//! Also provides a way of batching and caching incoming transactions,
4//! to allow efficient queries when syncing the chain.
5
6use crate::candidates::CandidatesDataSourceImpl;
7use async_trait::async_trait;
8use authority_selection_inherents::*;
9use figment::{Figment, providers::Env};
10use log::info;
11use lru::LruCache;
12use serde::Deserialize;
13use sidechain_domain::*;
14use std::{
15	error::Error,
16	sync::{Arc, Mutex},
17};
18
19pub type ArcMut<T> = Arc<Mutex<T>>;
20
21type AriadneParametersCacheKey = (McEpochNumber, PolicyId, PolicyId);
22type CandidatesCacheKey = (McEpochNumber, String);
23/// Cached candidate data source
24pub struct CandidateDataSourceCached {
25	inner: CandidatesDataSourceImpl,
26	get_ariadne_parameters_for_epoch_cache:
27		ArcMut<LruCache<AriadneParametersCacheKey, AriadneParameters>>,
28	get_candidates_for_epoch_cache:
29		ArcMut<LruCache<CandidatesCacheKey, Vec<CandidateRegistrations>>>,
30	security_parameter: u32,
31	highest_seen_stable_epoch: ArcMut<Option<McEpochNumber>>,
32}
33
34#[async_trait]
35impl AuthoritySelectionDataSource for CandidateDataSourceCached {
36	async fn get_ariadne_parameters(
37		&self,
38		epoch: McEpochNumber,
39		d_parameter_policy: PolicyId,
40		permissioned_candidates_policy: PolicyId,
41	) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
42		if self.can_use_caching_for_request(epoch).await? {
43			self.get_ariadne_parameters_with_caching(
44				epoch,
45				d_parameter_policy,
46				permissioned_candidates_policy,
47			)
48			.await
49		} else {
50			self.inner
51				.get_ariadne_parameters(epoch, d_parameter_policy, permissioned_candidates_policy)
52				.await
53		}
54	}
55
56	async fn get_candidates(
57		&self,
58		epoch: McEpochNumber,
59		committee_candidate_address: MainchainAddress,
60	) -> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
61		if self.can_use_caching_for_request(epoch).await? {
62			self.get_candidates_with_caching(epoch, committee_candidate_address).await
63		} else {
64			self.inner.get_candidates(epoch, committee_candidate_address).await
65		}
66	}
67
68	async fn get_epoch_nonce(
69		&self,
70		epoch: McEpochNumber,
71	) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
72		self.inner.get_epoch_nonce(epoch).await
73	}
74
75	async fn data_epoch(
76		&self,
77		for_epoch: McEpochNumber,
78	) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
79		self.inner.data_epoch(for_epoch).await
80	}
81}
82
83#[derive(Debug, Clone, Deserialize)]
84pub struct CandidateDataSourceCacheConfig {
85	pub cardano_security_parameter: u32,
86}
87
88impl CandidateDataSourceCacheConfig {
89	pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
90		let config: Self = Figment::new()
91			.merge(Env::raw())
92			.extract()
93			.map_err(|e| format!("Failed to read candidates data source config: {e}"))?;
94		info!("Using candidate data source configuration: {config:?}");
95		Ok(config)
96	}
97}
98
99impl CandidateDataSourceCached {
100	/// Creates new instance of the data source
101	pub fn new(
102		inner: CandidatesDataSourceImpl,
103		candidates_for_epoch_cache_size: usize,
104		security_parameter: u32,
105	) -> Self {
106		Self {
107			inner,
108			get_ariadne_parameters_for_epoch_cache: Arc::new(Mutex::new(LruCache::new(
109				candidates_for_epoch_cache_size.try_into().unwrap(),
110			))),
111			get_candidates_for_epoch_cache: Arc::new(Mutex::new(LruCache::new(
112				candidates_for_epoch_cache_size.try_into().unwrap(),
113			))),
114			security_parameter,
115			highest_seen_stable_epoch: Arc::new(Mutex::new(None)),
116		}
117	}
118
119	/// Creates a new instance of the data source, reading configuration from the environment.
120	pub fn new_from_env(
121		inner: CandidatesDataSourceImpl,
122		candidates_for_epoch_cache_size: usize,
123	) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
124		let config = CandidateDataSourceCacheConfig::from_env()?;
125		Ok(Self::new(inner, candidates_for_epoch_cache_size, config.cardano_security_parameter))
126	}
127
128	async fn get_candidates_with_caching(
129		&self,
130		epoch: McEpochNumber,
131		committee_candidate_address: MainchainAddress,
132	) -> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
133		log::debug!("get_candidates_with_caching({:?})", epoch.0);
134		let key = (epoch, committee_candidate_address.to_string());
135		if let Ok(mut cache) = self.get_candidates_for_epoch_cache.lock() {
136			if let Some(resp) = cache.get(&key) {
137				log::debug!("Serving cached candidates for epoch: {:?}", epoch.0);
138				return Ok(resp.clone());
139			}
140		}
141
142		let response = self.inner.get_candidates(epoch, committee_candidate_address).await?;
143		if let Ok(mut cache) = self.get_candidates_for_epoch_cache.lock() {
144			log::debug!("Caching candidates for epoch: {:?}", epoch.0);
145			cache.put(key, response.clone());
146		}
147		Ok(response)
148	}
149
150	// Use only for stable epochs
151	async fn get_ariadne_parameters_with_caching(
152		&self,
153		epoch: McEpochNumber,
154		d_parameter_validator: PolicyId,
155		permissioned_candidates_validator: PolicyId,
156	) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
157		log::debug!("get_ariadne_parameters_with_caching({:?})", epoch.0);
158		let key = (epoch, d_parameter_validator.clone(), permissioned_candidates_validator.clone());
159		if let Ok(mut cache) = self.get_ariadne_parameters_for_epoch_cache.lock() {
160			if let Some(resp) = cache.get(&key) {
161				log::debug!("Serving cached ariadne parameters for epoch: {:?}", epoch.0);
162				return Ok(resp.clone());
163			}
164		}
165
166		let response = self
167			.inner
168			.get_ariadne_parameters(epoch, d_parameter_validator, permissioned_candidates_validator)
169			.await?;
170		if let Ok(mut cache) = self.get_ariadne_parameters_for_epoch_cache.lock() {
171			log::debug!("Caching ariadne parameters for epoch: {:?}", epoch.0);
172			cache.put(key, response.clone());
173		}
174		Ok(response)
175	}
176
177	async fn can_use_caching_for_request(
178		&self,
179		request_epoch: McEpochNumber,
180	) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
181		let data_epoch = self.inner.data_epoch(request_epoch).await?;
182		if let Ok(stable_epoch) = self.highest_seen_stable_epoch.lock() {
183			if stable_epoch.map_or(false, |stable_epoch| stable_epoch >= data_epoch) {
184				return Ok(true);
185			}
186		}
187		match crate::db_model::get_latest_stable_epoch(&self.inner.pool, self.security_parameter)
188			.await?
189		{
190			Some(stable_epoch) => {
191				let stable_epoch = McEpochNumber(stable_epoch.0);
192				if let Ok(mut highest_seen_stable_epoch) = self.highest_seen_stable_epoch.lock() {
193					*highest_seen_stable_epoch = Some(stable_epoch);
194				}
195				Ok(data_epoch <= stable_epoch)
196			},
197			None => Ok(false),
198		}
199	}
200}