partner_chains_db_sync_data_sources/native_token/
mod.rs1use crate::db_model::{Block, BlockNumber};
3use crate::metrics::McFollowerMetrics;
4use crate::observed_async_trait;
5use crate::{DataSourceError, Result};
6use itertools::Itertools;
7use sidechain_domain::*;
8use sp_native_token_management::{MainChainScripts, NativeTokenManagementDataSource};
9use sqlx::PgPool;
10use std::sync::{Arc, Mutex};
11
12#[cfg(test)]
13mod tests;
14
15pub struct NativeTokenManagementDataSourceImpl {
19 pub pool: PgPool,
21 pub metrics_opt: Option<McFollowerMetrics>,
23 security_parameter: u32,
25 cache_size: u16,
27 cache: Arc<Mutex<Cache>>,
29}
30
31observed_async_trait!(
32impl NativeTokenManagementDataSource for NativeTokenManagementDataSourceImpl {
33 async fn get_total_native_token_transfer(
36 &self,
37 after_block: Option<McBlockHash>,
38 to_block: McBlockHash,
39 scripts: MainChainScripts,
40 ) -> std::result::Result<NativeTokenAmount, Box<dyn std::error::Error + Send + Sync>> {
41 if let Some(after_block) = after_block {
42 if after_block == to_block {
43 Ok(NativeTokenAmount(0))
44 } else if let Some(amount) = self.get_from_cache(&after_block, &to_block, &scripts) {
45 log::debug!(
46 "Illiquid supply transfers sum from cache after block '{:?}' to block '{:?}' is {}",
47 after_block, to_block, amount.0
48 );
49 Ok(amount)
50 } else {
51 log::debug!("Illiquid supply transfers after block '{:?}' to block '{:?}' not found in cache.", after_block, to_block);
52 let block_to_amount = self
53 .get_data_to_cache(&after_block, &to_block, &scripts)
54 .await?;
55 log::debug!("Caching illiquid supply transfers from {} blocks", block_to_amount.len());
56
57 let amount = block_to_amount
58 .iter()
59 .skip(1) .take_while_inclusive(|(block_hash, _)| *block_hash != to_block)
61 .map(|(_, amount)| amount)
62 .sum();
63 log::debug!("Amount of illiquid supply transfers is {}", amount);
64
65 if let Ok(mut cache) = self.cache.lock() {
66 cache.update(block_to_amount, scripts)
67 }
68 Ok(NativeTokenAmount(amount))
69 }
70 } else {
71 let amount = self
72 .query_transfers_from_genesis(&to_block, &scripts)
73 .await?;
74 log::debug!("Amount of illiquid supply transfers from genesis to {} is {}", to_block, amount.0);
75 Ok(amount)
76 }
77 }
78});
79
80impl NativeTokenManagementDataSourceImpl {
81 pub async fn new(
83 pool: PgPool,
84 metrics_opt: Option<McFollowerMetrics>,
85 security_parameter: u32,
86 cache_size: u16,
87 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
88 crate::db_model::create_idx_tx_out_address(&pool).await?;
89 let cache = Default::default();
90 Ok(Self { pool, metrics_opt, security_parameter, cache_size, cache })
91 }
92
93 pub async fn new_from_env(
95 pool: PgPool,
96 metrics_opt: Option<McFollowerMetrics>,
97 ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
98 let security_parameter: u32 = std::env::var("CARDANO_SECURITY_PARAMETER")
99 .ok()
100 .and_then(|s| s.parse().ok())
101 .ok_or("Couldn't read env variable CARDANO_SECURITY_PARAMETER as u32")?;
102 Self::new(pool, metrics_opt, security_parameter, 1000).await
103 }
104
105 fn get_from_cache(
106 &self,
107 after_block: &McBlockHash,
108 to_block: &McBlockHash,
109 scripts: &MainChainScripts,
110 ) -> Option<NativeTokenAmount> {
111 let cache = self.cache.lock().ok()?;
112 if cache.scripts.as_ref() == Some(scripts) {
113 cache.get_sum_in_range(after_block, to_block).map(NativeTokenAmount)
114 } else {
115 None
116 }
117 }
118
119 async fn get_data_to_cache(
122 &self,
123 from_block: &McBlockHash,
124 to_block: &McBlockHash,
125 scripts: &MainChainScripts,
126 ) -> Result<Vec<(McBlockHash, u128)>> {
127 let (from_block_no, to_block_no, latest_block) = futures::try_join!(
128 get_from_block_no(from_block, &self.pool),
129 get_to_block_no(to_block, &self.pool),
130 get_latest_block(&self.pool),
131 )?;
132 let latest_stable_block = latest_block.block_no.0.saturating_sub(self.security_parameter);
133
134 let cache_to_block_no = BlockNumber(std::cmp::min(
137 latest_stable_block,
138 std::cmp::max(to_block_no.0, from_block_no.0.saturating_add(self.cache_size.into())),
139 ));
140 let transfers = self.query_db(from_block_no, cache_to_block_no, scripts).await?;
142 Ok(transfers.iter().map(|t| (McBlockHash(t.block_hash), t.amount.0)).collect())
143 }
144
145 async fn query_db(
146 &self,
147 from_block: BlockNumber,
148 to_block: BlockNumber,
149 scripts: &MainChainScripts,
150 ) -> Result<Vec<crate::db_model::BlockTokenAmount>> {
151 let address = scripts.illiquid_supply_validator_address.clone().into();
152 let asset = to_db_asset(scripts);
153 Ok(crate::db_model::get_native_token_transfers(
154 &self.pool, from_block, to_block, asset, address,
155 )
156 .await?)
157 }
158
159 async fn query_transfers_from_genesis(
160 &self,
161 to_block: &McBlockHash,
162 scripts: &MainChainScripts,
163 ) -> Result<NativeTokenAmount> {
164 let to_block = get_to_block_no(to_block, &self.pool).await?;
165 Ok(crate::db_model::get_total_native_tokens_transfered(
166 &self.pool,
167 to_block,
168 to_db_asset(scripts),
169 scripts.illiquid_supply_validator_address.clone().into(),
170 )
171 .await?
172 .into())
173 }
174}
175
176fn to_db_asset(scripts: &MainChainScripts) -> crate::db_model::Asset {
177 crate::db_model::Asset {
178 policy_id: scripts.native_token_policy_id.clone().into(),
179 asset_name: scripts.native_token_asset_name.clone().into(),
180 }
181}
182
183async fn get_from_block_no(from_block: &McBlockHash, pool: &PgPool) -> Result<BlockNumber> {
184 Ok(crate::db_model::get_block_by_hash(pool, from_block.clone())
185 .await?
186 .ok_or(DataSourceError::ExpectedDataNotFound(format!(
187 "Lower bound block {from_block} not found when querying for native token transfers"
188 )))?
189 .block_no)
190}
191
192async fn get_to_block_no(to_block: &McBlockHash, pool: &PgPool) -> Result<BlockNumber> {
193 Ok(crate::db_model::get_block_by_hash(pool, to_block.clone())
194 .await?
195 .ok_or(DataSourceError::ExpectedDataNotFound(format!(
196 "Upper bound block {to_block} not found when querying for native token transfers"
197 )))?
198 .block_no)
199}
200
201async fn get_latest_block(pool: &PgPool) -> Result<Block> {
202 crate::db_model::get_latest_block_info(pool).await?.ok_or(
203 DataSourceError::ExpectedDataNotFound(
204 "The latest block not found when querying for native token transfers".to_string(),
205 ),
206 )
207}
208
209#[derive(Default)]
210pub(crate) struct Cache {
211 block_hash_to_amount: Vec<(McBlockHash, u128)>,
213 pub(crate) scripts: Option<MainChainScripts>,
214}
215
216impl Cache {
217 fn get_sum_in_range(&self, after: &McBlockHash, to: &McBlockHash) -> Option<u128> {
221 let after_idx = self.block_hash_to_amount.iter().position(|(block, _)| block == after)?;
222 let to_idx = self.block_hash_to_amount.iter().position(|(block, _)| block == to)?;
223 let after_to = self.block_hash_to_amount.get((after_idx + 1)..=to_idx)?;
224 Some(after_to.iter().map(|(_, amount)| amount).sum())
225 }
226
227 pub fn update(
228 &mut self,
229 block_hash_to_amount: Vec<(McBlockHash, u128)>,
230 scripts: MainChainScripts,
231 ) {
232 self.block_hash_to_amount = block_hash_to_amount;
233 self.scripts = Some(scripts);
234 }
235}