partner_chains_db_sync_data_sources/candidates/
cached.rs1use crate::candidates::CandidatesDataSourceImpl;
7use async_trait::async_trait;
8use authority_selection_inherents::*;
9use figment::{Figment, providers::Env};
10use log::info;
11use lru::LruCache;
12use serde::Deserialize;
13use sidechain_domain::*;
14use std::{
15 error::Error,
16 sync::{Arc, Mutex},
17};
18
19pub type ArcMut<T> = Arc<Mutex<T>>;
20
21type AriadneParametersCacheKey = (McEpochNumber, PolicyId, PolicyId);
22type CandidatesCacheKey = (McEpochNumber, String);
23pub struct CandidateDataSourceCached {
25 inner: CandidatesDataSourceImpl,
26 get_ariadne_parameters_for_epoch_cache:
27 ArcMut<LruCache<AriadneParametersCacheKey, AriadneParameters>>,
28 get_candidates_for_epoch_cache:
29 ArcMut<LruCache<CandidatesCacheKey, Vec<CandidateRegistrations>>>,
30 security_parameter: u32,
31 highest_seen_stable_epoch: ArcMut<Option<McEpochNumber>>,
32}
33
34#[async_trait]
35impl AuthoritySelectionDataSource for CandidateDataSourceCached {
36 async fn get_ariadne_parameters(
37 &self,
38 epoch: McEpochNumber,
39 d_parameter_policy: PolicyId,
40 permissioned_candidates_policy: PolicyId,
41 ) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
42 if self.can_use_caching_for_request(epoch).await? {
43 self.get_ariadne_parameters_with_caching(
44 epoch,
45 d_parameter_policy,
46 permissioned_candidates_policy,
47 )
48 .await
49 } else {
50 self.inner
51 .get_ariadne_parameters(epoch, d_parameter_policy, permissioned_candidates_policy)
52 .await
53 }
54 }
55
56 async fn get_candidates(
57 &self,
58 epoch: McEpochNumber,
59 committee_candidate_address: MainchainAddress,
60 ) -> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
61 if self.can_use_caching_for_request(epoch).await? {
62 self.get_candidates_with_caching(epoch, committee_candidate_address).await
63 } else {
64 self.inner.get_candidates(epoch, committee_candidate_address).await
65 }
66 }
67
68 async fn get_epoch_nonce(
69 &self,
70 epoch: McEpochNumber,
71 ) -> Result<Option<EpochNonce>, Box<dyn std::error::Error + Send + Sync>> {
72 self.inner.get_epoch_nonce(epoch).await
73 }
74
75 async fn data_epoch(
76 &self,
77 for_epoch: McEpochNumber,
78 ) -> Result<McEpochNumber, Box<dyn std::error::Error + Send + Sync>> {
79 self.inner.data_epoch(for_epoch).await
80 }
81}
82
83#[derive(Debug, Clone, Deserialize)]
84pub struct CandidateDataSourceCacheConfig {
85 pub cardano_security_parameter: u32,
86}
87
88impl CandidateDataSourceCacheConfig {
89 pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
90 let config: Self = Figment::new()
91 .merge(Env::raw())
92 .extract()
93 .map_err(|e| format!("Failed to read candidates data source config: {e}"))?;
94 info!("Using candidate data source configuration: {config:?}");
95 Ok(config)
96 }
97}
98
99impl CandidateDataSourceCached {
100 pub fn new(
102 inner: CandidatesDataSourceImpl,
103 candidates_for_epoch_cache_size: usize,
104 security_parameter: u32,
105 ) -> Self {
106 Self {
107 inner,
108 get_ariadne_parameters_for_epoch_cache: Arc::new(Mutex::new(LruCache::new(
109 candidates_for_epoch_cache_size.try_into().unwrap(),
110 ))),
111 get_candidates_for_epoch_cache: Arc::new(Mutex::new(LruCache::new(
112 candidates_for_epoch_cache_size.try_into().unwrap(),
113 ))),
114 security_parameter,
115 highest_seen_stable_epoch: Arc::new(Mutex::new(None)),
116 }
117 }
118
119 pub fn new_from_env(
121 inner: CandidatesDataSourceImpl,
122 candidates_for_epoch_cache_size: usize,
123 ) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
124 let config = CandidateDataSourceCacheConfig::from_env()?;
125 Ok(Self::new(inner, candidates_for_epoch_cache_size, config.cardano_security_parameter))
126 }
127
128 async fn get_candidates_with_caching(
129 &self,
130 epoch: McEpochNumber,
131 committee_candidate_address: MainchainAddress,
132 ) -> Result<Vec<CandidateRegistrations>, Box<dyn std::error::Error + Send + Sync>> {
133 log::debug!("get_candidates_with_caching({:?})", epoch.0);
134 let key = (epoch, committee_candidate_address.to_string());
135 if let Ok(mut cache) = self.get_candidates_for_epoch_cache.lock() {
136 if let Some(resp) = cache.get(&key) {
137 log::debug!("Serving cached candidates for epoch: {:?}", epoch.0);
138 return Ok(resp.clone());
139 }
140 }
141
142 let response = self.inner.get_candidates(epoch, committee_candidate_address).await?;
143 if let Ok(mut cache) = self.get_candidates_for_epoch_cache.lock() {
144 log::debug!("Caching candidates for epoch: {:?}", epoch.0);
145 cache.put(key, response.clone());
146 }
147 Ok(response)
148 }
149
150 async fn get_ariadne_parameters_with_caching(
152 &self,
153 epoch: McEpochNumber,
154 d_parameter_validator: PolicyId,
155 permissioned_candidates_validator: PolicyId,
156 ) -> Result<AriadneParameters, Box<dyn std::error::Error + Send + Sync>> {
157 log::debug!("get_ariadne_parameters_with_caching({:?})", epoch.0);
158 let key = (epoch, d_parameter_validator.clone(), permissioned_candidates_validator.clone());
159 if let Ok(mut cache) = self.get_ariadne_parameters_for_epoch_cache.lock() {
160 if let Some(resp) = cache.get(&key) {
161 log::debug!("Serving cached ariadne parameters for epoch: {:?}", epoch.0);
162 return Ok(resp.clone());
163 }
164 }
165
166 let response = self
167 .inner
168 .get_ariadne_parameters(epoch, d_parameter_validator, permissioned_candidates_validator)
169 .await?;
170 if let Ok(mut cache) = self.get_ariadne_parameters_for_epoch_cache.lock() {
171 log::debug!("Caching ariadne parameters for epoch: {:?}", epoch.0);
172 cache.put(key, response.clone());
173 }
174 Ok(response)
175 }
176
177 async fn can_use_caching_for_request(
178 &self,
179 request_epoch: McEpochNumber,
180 ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
181 let data_epoch = self.inner.data_epoch(request_epoch).await?;
182 if let Ok(stable_epoch) = self.highest_seen_stable_epoch.lock() {
183 if stable_epoch.map_or(false, |stable_epoch| stable_epoch >= data_epoch) {
184 return Ok(true);
185 }
186 }
187 match crate::db_model::get_latest_stable_epoch(&self.inner.pool, self.security_parameter)
188 .await?
189 {
190 Some(stable_epoch) => {
191 let stable_epoch = McEpochNumber(stable_epoch.0);
192 if let Ok(mut highest_seen_stable_epoch) = self.highest_seen_stable_epoch.lock() {
193 *highest_seen_stable_epoch = Some(stable_epoch);
194 }
195 Ok(data_epoch <= stable_epoch)
196 },
197 None => Ok(false),
198 }
199 }
200}