partner_chains_db_sync_data_sources/governed_map/
mod.rs

1//! Db-Sync data source used by Partner Chain Governed Map feature
2use crate::DataSourceError::ExpectedDataNotFound;
3use crate::Result;
4use crate::block::BlockDataSourceImpl;
5use crate::db_model::{DbSyncConfigurationProvider, GovernedMapAction, TxInConfiguration};
6use crate::{metrics::McFollowerMetrics, observed_async_trait};
7use db_sync_sqlx::{Asset, BlockNumber};
8use itertools::Itertools;
9use log::warn;
10use partner_chains_plutus_data::governed_map::GovernedMapDatum;
11use sidechain_domain::byte_string::ByteString;
12use sidechain_domain::*;
13use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1};
14use sqlx::PgPool;
15use std::cmp::{max, min};
16use std::sync::{Arc, Mutex};
17
18#[cfg(test)]
19mod tests;
20
21/// Data source for the Governed Map feature of Partner Chains toolkit
22///
23/// See documentation of [sp_governed_map] for a description of the feature
24pub struct GovernedMapDataSourceImpl {
25	/// Postgres connection pool
26	pub pool: PgPool,
27	/// Prometheus metrics client
28	pub metrics_opt: Option<McFollowerMetrics>,
29	/// Configuration used by Db-Sync
30	db_sync_config: DbSyncConfigurationProvider,
31}
32
33impl GovernedMapDataSourceImpl {
34	/// Creates a new instance of the data source
35	pub async fn new(
36		pool: PgPool,
37		metrics_opt: Option<McFollowerMetrics>,
38	) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
39		crate::db_model::create_idx_tx_out_address(&pool).await?;
40		Ok(Self {
41			pool: pool.clone(),
42			metrics_opt,
43			db_sync_config: DbSyncConfigurationProvider::new(pool),
44		})
45	}
46}
47
48observed_async_trait!(
49impl GovernedMapDataSource for GovernedMapDataSourceImpl {
50	async fn get_state_at_block(
51		&self,
52		mc_block: McBlockHash,
53		scripts: MainChainScriptsV1,
54	) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
55	{
56		Ok(get_mappings_entries(
57			&self.pool,
58			mc_block,
59			scripts,
60			self.db_sync_config.get_tx_in_config().await?
61		).await?.into())
62	}
63
64	async fn get_mapping_changes(
65		&self,
66		since_mc_block: Option<McBlockHash>,
67		up_to_mc_block: McBlockHash,
68		scripts: MainChainScriptsV1,
69	) -> std::result::Result<
70		Vec<(String, Option<ByteString>)>,
71		Box<dyn std::error::Error + Send + Sync>,
72	> {
73		let current_mappings = self.get_state_at_block(up_to_mc_block, scripts.clone()).await?;
74		let Some(since_mc_block) = since_mc_block else {
75			let changes =
76				current_mappings.into_iter().map(|(key, value)| (key, Some(value))).collect();
77			return Ok(changes);
78		};
79		let previous_mappings = self.get_state_at_block(since_mc_block, scripts.clone()).await?;
80		let mut changes = vec![];
81		for (key, value) in current_mappings.iter() {
82			if previous_mappings.get(key) != Some(value) {
83				changes.push((key.clone(), Some(value.clone())));
84			}
85		}
86		for key in previous_mappings.keys() {
87			if !current_mappings.contains_key(key) {
88				changes.push((key.clone(), None));
89			}
90		}
91
92		Ok(changes)
93	}
94}
95);
96
97async fn get_mappings_entries(
98	pool: &PgPool,
99	hash: McBlockHash,
100	scripts: MainChainScriptsV1,
101	tx_in_config: TxInConfiguration,
102) -> Result<BTreeMap<String, ByteString>> {
103	let Some(block) = crate::db_model::get_block_by_hash(pool, hash.clone()).await? else {
104		return Err(ExpectedDataNotFound(format!("Block hash: {hash}")));
105	};
106	let entries = crate::db_model::get_datums_at_address_with_token(
107		pool,
108		&scripts.validator_address.into(),
109		block.block_no,
110		Asset::new(scripts.asset_policy_id),
111		tx_in_config,
112	)
113	.await?;
114
115	let mut mappings = BTreeMap::new();
116	for entry in entries {
117		match GovernedMapDatum::try_from(entry.datum.0) {
118			Ok(GovernedMapDatum { key, value }) => {
119				mappings.insert(key, value);
120			},
121			Err(err) => warn!("Failed decoding map entry: {err}"),
122		}
123	}
124
125	Ok(mappings)
126}
127
128/// Cached data source serving the Governed Map feature of Partner Chains toolkit
129///
130/// See documentation of [sp_governed_map] for a description of the feature
131pub struct GovernedMapDataSourceCachedImpl {
132	/// Postgres connection pool
133	pub pool: PgPool,
134	/// Prometheus metrics client
135	pub metrics_opt: Option<McFollowerMetrics>,
136	/// Internal data cache size
137	cache_size: u16,
138	/// Internal cache
139	cache: Arc<Mutex<Cache>>,
140	/// [BlockDataSourceImpl] instance shared with other data sources for cache reuse.
141	blocks: Arc<BlockDataSourceImpl>,
142	/// Configuration used by Db-Sync
143	db_sync_config: DbSyncConfigurationProvider,
144}
145
146impl GovernedMapDataSourceCachedImpl {
147	/// Constructs a new Governed Map data source
148	pub async fn new(
149		pool: PgPool,
150		metrics_opt: Option<McFollowerMetrics>,
151		cache_size: u16,
152		blocks: Arc<BlockDataSourceImpl>,
153	) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
154		crate::db_model::create_idx_tx_out_address(&pool).await?;
155		let cache = Default::default();
156		Ok(Self {
157			pool: pool.clone(),
158			metrics_opt,
159			cache_size,
160			cache,
161			blocks,
162			db_sync_config: DbSyncConfigurationProvider::new(pool),
163		})
164	}
165}
166
167observed_async_trait!(
168impl GovernedMapDataSource for GovernedMapDataSourceCachedImpl {
169	async fn get_state_at_block(
170		&self,
171		mc_block: McBlockHash,
172		scripts: MainChainScriptsV1,
173	) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
174	{
175		Ok(get_mappings_entries(&self.pool, mc_block, scripts, self.db_sync_config.get_tx_in_config().await?).await?.into())
176	}
177
178	async fn get_mapping_changes(
179		&self,
180		since_mc_block: Option<McBlockHash>,
181		up_to_mc_block: McBlockHash,
182		scripts: MainChainScriptsV1,
183	) -> std::result::Result<
184		Vec<(String, Option<ByteString>)>,
185		Box<dyn std::error::Error + Send + Sync>,
186	> {
187		if let Ok(mut cache) = self.cache.lock() {
188			cache.set_main_chain_scripts(scripts.clone());
189		};
190
191		let since_block_number = match since_mc_block {
192			Some(hash) => Some(
193				crate::db_model::get_block_by_hash(&self.pool, hash.clone())
194					.await?
195					.ok_or_else(|| Box::new(ExpectedDataNotFound(format!("Block hash: {hash}"))))?
196					.block_no,
197			),
198			None => None,
199		};
200
201		let Some(up_to_block) = self.blocks.get_block_by_hash(up_to_mc_block.clone()).await? else {
202			return Err(Box::new(ExpectedDataNotFound(format!("Block hash: {up_to_mc_block}"))));
203		};
204
205		let up_to_block_number = BlockNumber(up_to_block.number.0);
206
207		if let Some(cached_changes) =
208			self.get_changes_from_cache(since_block_number, up_to_block_number).await?
209		{
210			return Ok(cached_changes);
211		}
212
213		let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
214		let latest_stable_block =
215			match self.blocks.get_latest_stable_block_for(latest_block_timestamp.into()).await? {
216				Some(block) => BlockNumber(block.number.0),
217				None => up_to_block_number,
218			};
219		let since_block_plus =
220			BlockNumber(since_block_number.unwrap_or(BlockNumber(0)).0 + self.cache_size as u32);
221		let max_search_block = min(latest_stable_block, max(up_to_block_number, since_block_plus));
222
223		let changes = self
224			.get_changes_in_range_to_cache(since_block_number, max_search_block, scripts)
225			.await?;
226
227		if let Ok(mut cache) = self.cache.lock() {
228			cache.update(changes.clone());
229		}
230
231		Ok(filter_changes_in_range(changes, since_block_number, up_to_block_number))
232	}
233}
234);
235
236impl GovernedMapDataSourceCachedImpl {
237	async fn get_changes_from_cache(
238		&self,
239		since_block: Option<BlockNumber>,
240		up_to_block: BlockNumber,
241	) -> Result<Option<Vec<(String, Option<ByteString>)>>> {
242		if let Ok(cache) = self.cache.lock() {
243			Ok(cache.get_changes_in_range(since_block, up_to_block))
244		} else {
245			Ok(None)
246		}
247	}
248
249	async fn get_changes_in_range_to_cache(
250		&self,
251		since_block: Option<BlockNumber>,
252		up_to_block: BlockNumber,
253		scripts: MainChainScriptsV1,
254	) -> Result<Vec<Change>> {
255		let changes = crate::db_model::get_governed_map_changes(
256			&self.pool,
257			&scripts.validator_address.into(),
258			since_block,
259			up_to_block,
260			Asset::new(scripts.asset_policy_id),
261			self.db_sync_config.get_tx_in_config().await?,
262		)
263		.await?;
264
265		let mut result = Vec::new();
266
267		for change in changes {
268			let GovernedMapDatum { key, value } = match GovernedMapDatum::try_from(change.datum.0) {
269				Ok(datum) => datum,
270				Err(err) => {
271					warn!("Failed decoding map entry: {err}");
272					continue;
273				},
274			};
275			match change.action {
276				GovernedMapAction::Spend => result.push(Change::new(change.block_no, key, None)),
277				GovernedMapAction::Create => {
278					result.push(Change::new(change.block_no, key, Some(value)))
279				},
280			};
281		}
282		Ok(result)
283	}
284}
285
286#[derive(derive_new::new, Clone)]
287struct Change {
288	block_no: BlockNumber,
289	key: String,
290	value: Option<ByteString>,
291}
292
293#[derive(Default)]
294pub(crate) struct Cache {
295	highest_block_number: Option<BlockNumber>,
296	lowest_block_number: Option<BlockNumber>,
297	changes: Vec<Change>,
298	address: Option<MainchainAddress>,
299	policy_id: Option<PolicyId>,
300}
301
302fn filter_changes_in_range(
303	changes: Vec<Change>,
304	since_block: Option<BlockNumber>,
305	up_to_block: BlockNumber,
306) -> Vec<(String, Option<ByteString>)> {
307	changes
308		.into_iter()
309		.filter(|change| {
310			change.block_no.0 <= up_to_block.0
311				&& since_block.map(|b| change.block_no.0 > b.0).unwrap_or(true)
312		})
313		.map(|change| (change.key, change.value))
314		.collect()
315}
316
317impl Cache {
318	fn get_changes_in_range(
319		&self,
320		since_block: Option<BlockNumber>,
321		up_to_block: BlockNumber,
322	) -> Option<Vec<(String, Option<ByteString>)>> {
323		let Some(highest_block_number) = self.highest_block_number else {
324			return None;
325		};
326		let Some(lowest_block_number) = self.lowest_block_number else {
327			return None;
328		};
329
330		if highest_block_number.0 < up_to_block.0
331			|| since_block.map(|b| b.0 < lowest_block_number.0).unwrap_or(false)
332		{
333			return None;
334		}
335
336		Some(filter_changes_in_range(self.changes.clone(), since_block, up_to_block))
337	}
338
339	fn update(&mut self, changes: Vec<Change>) {
340		self.changes = changes;
341		let (lowest_block_number, highest_block_number) = self
342			.changes
343			.iter()
344			.minmax_by_key(|change| change.block_no.0)
345			.into_option()
346			.map(|(min, max)| (min.block_no, max.block_no))
347			.unwrap_or((BlockNumber(0), BlockNumber(0)));
348		self.lowest_block_number = Some(lowest_block_number);
349		self.highest_block_number = Some(highest_block_number);
350	}
351
352	fn set_main_chain_scripts(&mut self, scripts: MainChainScriptsV1) {
353		if self.address != Some(scripts.validator_address.clone())
354			|| self.policy_id != Some(scripts.asset_policy_id.clone())
355		{
356			self.changes.clear();
357			self.highest_block_number = None;
358			self.lowest_block_number = None;
359			self.address = Some(scripts.validator_address);
360			self.policy_id = Some(scripts.asset_policy_id);
361		}
362	}
363}