partner_chains_db_sync_data_sources/native_token/
mod.rs

1//! Db-Sync data source used by Partner Chain Native Token Management feature
2use crate::db_model::{Block, BlockNumber};
3use crate::metrics::McFollowerMetrics;
4use crate::observed_async_trait;
5use crate::{DataSourceError, Result};
6use itertools::Itertools;
7use sidechain_domain::*;
8use sp_native_token_management::{MainChainScripts, NativeTokenManagementDataSource};
9use sqlx::PgPool;
10use std::sync::{Arc, Mutex};
11
12#[cfg(test)]
13mod tests;
14
15/// Db-Sync data source for the Native Token Management feature of the Partner Chains toolkit.
16///
17/// See documentation for [sp_native_token_management] for information about the feature.
18pub struct NativeTokenManagementDataSourceImpl {
19	/// Postgres connection pool
20	pub pool: PgPool,
21	/// Prometheus metrics client
22	pub metrics_opt: Option<McFollowerMetrics>,
23	/// Cardano security parameter, ie. the number of confirmations needed to stabilize a block
24	security_parameter: u32,
25	/// Size of internal data cache
26	cache_size: u16,
27	/// Internal data cache
28	cache: Arc<Mutex<Cache>>,
29}
30
31observed_async_trait!(
32impl NativeTokenManagementDataSource for NativeTokenManagementDataSourceImpl {
33	// after_block is always less or equal to_block
34	// to_block is always a stable block
35	async fn get_total_native_token_transfer(
36		&self,
37		after_block: Option<McBlockHash>,
38		to_block: McBlockHash,
39		scripts: MainChainScripts,
40	) -> std::result::Result<NativeTokenAmount, Box<dyn std::error::Error + Send + Sync>> {
41		if let Some(after_block) = after_block {
42			if after_block == to_block {
43				Ok(NativeTokenAmount(0))
44			} else if let Some(amount) = self.get_from_cache(&after_block, &to_block, &scripts) {
45				log::debug!(
46					"Illiquid supply transfers sum from cache after block '{:?}' to block '{:?}' is {}",
47					after_block, to_block, amount.0
48				);
49				Ok(amount)
50			} else {
51				log::debug!("Illiquid supply transfers after block '{:?}' to block '{:?}' not found in cache.", after_block, to_block);
52				let block_to_amount = self
53					.get_data_to_cache(&after_block, &to_block, &scripts)
54					.await?;
55				log::debug!("Caching illiquid supply transfers from {} blocks", block_to_amount.len());
56
57				let amount = block_to_amount
58					.iter()
59					.skip(1) // the first element is the 'after_block' which is not included in the sum
60					.take_while_inclusive(|(block_hash, _)| *block_hash != to_block)
61					.map(|(_, amount)| amount)
62					.sum();
63				log::debug!("Amount of illiquid supply transfers is {}", amount);
64
65				if let Ok(mut cache) = self.cache.lock() {
66					cache.update(block_to_amount, scripts)
67				}
68				Ok(NativeTokenAmount(amount))
69			}
70		} else {
71			let amount = self
72				.query_transfers_from_genesis(&to_block, &scripts)
73				.await?;
74			log::debug!("Amount of illiquid supply transfers from genesis to {} is {}", to_block, amount.0);
75			Ok(amount)
76		}
77	}
78});
79
80impl NativeTokenManagementDataSourceImpl {
81	/// Creates a new instance of the data source
82	pub async fn new(
83		pool: PgPool,
84		metrics_opt: Option<McFollowerMetrics>,
85		security_parameter: u32,
86		cache_size: u16,
87	) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
88		crate::db_model::create_idx_tx_out_address(&pool).await?;
89		let cache = Default::default();
90		Ok(Self { pool, metrics_opt, security_parameter, cache_size, cache })
91	}
92
93	/// Creates a new instance of the data source, reading configuration from the environment
94	pub async fn new_from_env(
95		pool: PgPool,
96		metrics_opt: Option<McFollowerMetrics>,
97	) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
98		let security_parameter: u32 = std::env::var("CARDANO_SECURITY_PARAMETER")
99			.ok()
100			.and_then(|s| s.parse().ok())
101			.ok_or("Couldn't read env variable CARDANO_SECURITY_PARAMETER as u32")?;
102		Self::new(pool, metrics_opt, security_parameter, 1000).await
103	}
104
105	fn get_from_cache(
106		&self,
107		after_block: &McBlockHash,
108		to_block: &McBlockHash,
109		scripts: &MainChainScripts,
110	) -> Option<NativeTokenAmount> {
111		let cache = self.cache.lock().ok()?;
112		if cache.scripts.as_ref() == Some(scripts) {
113			cache.get_sum_in_range(after_block, to_block).map(NativeTokenAmount)
114		} else {
115			None
116		}
117	}
118
119	// invariant: to_block is always a stable block
120	// Returned data contains the 'from_block', it is required as guard for the cache
121	async fn get_data_to_cache(
122		&self,
123		from_block: &McBlockHash,
124		to_block: &McBlockHash,
125		scripts: &MainChainScripts,
126	) -> Result<Vec<(McBlockHash, u128)>> {
127		let (from_block_no, to_block_no, latest_block) = futures::try_join!(
128			get_from_block_no(from_block, &self.pool),
129			get_to_block_no(to_block, &self.pool),
130			get_latest_block(&self.pool),
131		)?;
132		let latest_stable_block = latest_block.block_no.0.saturating_sub(self.security_parameter);
133
134		// to_block_no is always a stable block, so it is not above latest_stable_block,
135		// but from_block_no + cache_size could be above latest_stable_block, so min has to be applied
136		let cache_to_block_no = BlockNumber(std::cmp::min(
137			latest_stable_block,
138			std::cmp::max(to_block_no.0, from_block_no.0.saturating_add(self.cache_size.into())),
139		));
140		// transfers starts with block having hash equal to after_block or genesis
141		let transfers = self.query_db(from_block_no, cache_to_block_no, scripts).await?;
142		Ok(transfers.iter().map(|t| (McBlockHash(t.block_hash), t.amount.0)).collect())
143	}
144
145	async fn query_db(
146		&self,
147		from_block: BlockNumber,
148		to_block: BlockNumber,
149		scripts: &MainChainScripts,
150	) -> Result<Vec<crate::db_model::BlockTokenAmount>> {
151		let address = scripts.illiquid_supply_validator_address.clone().into();
152		let asset = to_db_asset(scripts);
153		Ok(crate::db_model::get_native_token_transfers(
154			&self.pool, from_block, to_block, asset, address,
155		)
156		.await?)
157	}
158
159	async fn query_transfers_from_genesis(
160		&self,
161		to_block: &McBlockHash,
162		scripts: &MainChainScripts,
163	) -> Result<NativeTokenAmount> {
164		let to_block = get_to_block_no(to_block, &self.pool).await?;
165		Ok(crate::db_model::get_total_native_tokens_transfered(
166			&self.pool,
167			to_block,
168			to_db_asset(scripts),
169			scripts.illiquid_supply_validator_address.clone().into(),
170		)
171		.await?
172		.into())
173	}
174}
175
176fn to_db_asset(scripts: &MainChainScripts) -> crate::db_model::Asset {
177	crate::db_model::Asset {
178		policy_id: scripts.native_token_policy_id.clone().into(),
179		asset_name: scripts.native_token_asset_name.clone().into(),
180	}
181}
182
183async fn get_from_block_no(from_block: &McBlockHash, pool: &PgPool) -> Result<BlockNumber> {
184	Ok(crate::db_model::get_block_by_hash(pool, from_block.clone())
185		.await?
186		.ok_or(DataSourceError::ExpectedDataNotFound(format!(
187			"Lower bound block {from_block} not found when querying for native token transfers"
188		)))?
189		.block_no)
190}
191
192async fn get_to_block_no(to_block: &McBlockHash, pool: &PgPool) -> Result<BlockNumber> {
193	Ok(crate::db_model::get_block_by_hash(pool, to_block.clone())
194		.await?
195		.ok_or(DataSourceError::ExpectedDataNotFound(format!(
196			"Upper bound block {to_block} not found when querying for native token transfers"
197		)))?
198		.block_no)
199}
200
201async fn get_latest_block(pool: &PgPool) -> Result<Block> {
202	crate::db_model::get_latest_block_info(pool).await?.ok_or(
203		DataSourceError::ExpectedDataNotFound(
204			"The latest block not found when querying for native token transfers".to_string(),
205		),
206	)
207}
208
209#[derive(Default)]
210pub(crate) struct Cache {
211	/// Continous blocks with their respective total native token transfer amount
212	block_hash_to_amount: Vec<(McBlockHash, u128)>,
213	pub(crate) scripts: Option<MainChainScripts>,
214}
215
216impl Cache {
217	/// Returns the sum of native token transfer amounts after `after` and to `to` block
218	/// Returns None if `after` or `to` block is not found, because it indicates that the cache
219	/// doesn't contain the required blocks interval.
220	fn get_sum_in_range(&self, after: &McBlockHash, to: &McBlockHash) -> Option<u128> {
221		let after_idx = self.block_hash_to_amount.iter().position(|(block, _)| block == after)?;
222		let to_idx = self.block_hash_to_amount.iter().position(|(block, _)| block == to)?;
223		let after_to = self.block_hash_to_amount.get((after_idx + 1)..=to_idx)?;
224		Some(after_to.iter().map(|(_, amount)| amount).sum())
225	}
226
227	pub fn update(
228		&mut self,
229		block_hash_to_amount: Vec<(McBlockHash, u128)>,
230		scripts: MainChainScripts,
231	) {
232		self.block_hash_to_amount = block_hash_to_amount;
233		self.scripts = Some(scripts);
234	}
235}