partner_chains_db_sync_data_sources/governed_map/
mod.rs

1//! Db-Sync data source used by Partner Chain Governed Map feature
2use crate::DataSourceError::ExpectedDataNotFound;
3use crate::Result;
4use crate::block::BlockDataSourceImpl;
5use crate::db_model::GovernedMapAction;
6use crate::{metrics::McFollowerMetrics, observed_async_trait};
7use db_sync_sqlx::{Asset, BlockNumber};
8use itertools::Itertools;
9use log::warn;
10use partner_chains_plutus_data::governed_map::GovernedMapDatum;
11use sidechain_domain::byte_string::ByteString;
12use sidechain_domain::*;
13use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1};
14use sqlx::PgPool;
15use std::cmp::{max, min};
16use std::sync::{Arc, Mutex};
17
18#[cfg(test)]
19mod tests;
20
21/// Data source for the Governed Map feature of Partner Chains toolkit
22///
23/// See documentation of [sp_governed_map] for a description of the feature
24pub struct GovernedMapDataSourceImpl {
25	/// Postgres connection pool
26	pub pool: PgPool,
27	/// Prometheus metrics client
28	pub metrics_opt: Option<McFollowerMetrics>,
29}
30
31impl GovernedMapDataSourceImpl {
32	/// Creates a new instance of the data source
33	pub async fn new(
34		pool: PgPool,
35		metrics_opt: Option<McFollowerMetrics>,
36	) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
37		crate::db_model::create_idx_tx_out_address(&pool).await?;
38		Ok(Self { pool, metrics_opt })
39	}
40}
41
42observed_async_trait!(
43impl GovernedMapDataSource for GovernedMapDataSourceImpl {
44	async fn get_state_at_block(
45		&self,
46		mc_block: McBlockHash,
47		scripts: MainChainScriptsV1,
48	) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
49	{
50		Ok(get_mappings_entries(&self.pool, mc_block, scripts).await?.into())
51	}
52
53	async fn get_mapping_changes(
54		&self,
55		since_mc_block: Option<McBlockHash>,
56		up_to_mc_block: McBlockHash,
57		scripts: MainChainScriptsV1,
58	) -> std::result::Result<
59		Vec<(String, Option<ByteString>)>,
60		Box<dyn std::error::Error + Send + Sync>,
61	> {
62		let current_mappings = self.get_state_at_block(up_to_mc_block, scripts.clone()).await?;
63		let Some(since_mc_block) = since_mc_block else {
64			let changes =
65				current_mappings.into_iter().map(|(key, value)| (key, Some(value))).collect();
66			return Ok(changes);
67		};
68		let previous_mappings = self.get_state_at_block(since_mc_block, scripts.clone()).await?;
69		let mut changes = vec![];
70		for (key, value) in current_mappings.iter() {
71			if previous_mappings.get(key) != Some(value) {
72				changes.push((key.clone(), Some(value.clone())));
73			}
74		}
75		for key in previous_mappings.keys() {
76			if !current_mappings.contains_key(key) {
77				changes.push((key.clone(), None));
78			}
79		}
80
81		Ok(changes)
82	}
83}
84);
85
86async fn get_mappings_entries(
87	pool: &PgPool,
88	hash: McBlockHash,
89	scripts: MainChainScriptsV1,
90) -> Result<BTreeMap<String, ByteString>> {
91	let Some(block) = crate::db_model::get_block_by_hash(pool, hash.clone()).await? else {
92		return Err(ExpectedDataNotFound(format!("Block hash: {hash}")));
93	};
94	let entries = crate::db_model::get_datums_at_address_with_token(
95		pool,
96		&scripts.validator_address.into(),
97		block.block_no,
98		Asset::new(scripts.asset_policy_id),
99	)
100	.await?;
101
102	let mut mappings = BTreeMap::new();
103	for entry in entries {
104		match GovernedMapDatum::try_from(entry.datum.0) {
105			Ok(GovernedMapDatum { key, value }) => {
106				mappings.insert(key, value);
107			},
108			Err(err) => warn!("Failed decoding map entry: {err}"),
109		}
110	}
111
112	Ok(mappings)
113}
114
115/// Cached data source serving the Governed Map feature of Partner Chains toolkit
116///
117/// See documentation of [sp_governed_map] for a description of the feature
118pub struct GovernedMapDataSourceCachedImpl {
119	/// Postgres connection pool
120	pub pool: PgPool,
121	/// Prometheus metrics client
122	pub metrics_opt: Option<McFollowerMetrics>,
123	/// Internal data cache size
124	cache_size: u16,
125	/// Internal cache
126	cache: Arc<Mutex<Cache>>,
127	/// [BlockDataSourceImpl] instance shared with other data sources for cache reuse.
128	blocks: Arc<BlockDataSourceImpl>,
129}
130
131impl GovernedMapDataSourceCachedImpl {
132	/// Constructs a new Governed Map data source
133	pub async fn new(
134		pool: PgPool,
135		metrics_opt: Option<McFollowerMetrics>,
136		cache_size: u16,
137		blocks: Arc<BlockDataSourceImpl>,
138	) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
139		crate::db_model::create_idx_tx_out_address(&pool).await?;
140		let cache = Default::default();
141		Ok(Self { pool, metrics_opt, cache_size, cache, blocks })
142	}
143}
144
145observed_async_trait!(
146impl GovernedMapDataSource for GovernedMapDataSourceCachedImpl {
147	async fn get_state_at_block(
148		&self,
149		mc_block: McBlockHash,
150		scripts: MainChainScriptsV1,
151	) -> std::result::Result<BTreeMap<String, ByteString>, Box<dyn std::error::Error + Send + Sync>>
152	{
153		Ok(get_mappings_entries(&self.pool, mc_block, scripts).await?.into())
154	}
155
156	async fn get_mapping_changes(
157		&self,
158		since_mc_block: Option<McBlockHash>,
159		up_to_mc_block: McBlockHash,
160		scripts: MainChainScriptsV1,
161	) -> std::result::Result<
162		Vec<(String, Option<ByteString>)>,
163		Box<dyn std::error::Error + Send + Sync>,
164	> {
165		if let Ok(mut cache) = self.cache.lock() {
166			cache.set_main_chain_scripts(scripts.clone());
167		};
168
169		let since_block_number = match since_mc_block {
170			Some(hash) => Some(
171				crate::db_model::get_block_by_hash(&self.pool, hash.clone())
172					.await?
173					.ok_or_else(|| Box::new(ExpectedDataNotFound(format!("Block hash: {hash}"))))?
174					.block_no,
175			),
176			None => None,
177		};
178
179		let Some(up_to_block) = self.blocks.get_block_by_hash(up_to_mc_block.clone()).await? else {
180			return Err(Box::new(ExpectedDataNotFound(format!("Block hash: {up_to_mc_block}"))));
181		};
182
183		let up_to_block_number = BlockNumber(up_to_block.number.0);
184
185		if let Some(cached_changes) =
186			self.get_changes_from_cache(since_block_number, up_to_block_number).await?
187		{
188			return Ok(cached_changes);
189		}
190
191		let latest_block_timestamp = self.blocks.get_latest_block_info().await?.timestamp;
192		let latest_stable_block =
193			match self.blocks.get_latest_stable_block_for(latest_block_timestamp.into()).await? {
194				Some(block) => BlockNumber(block.number.0),
195				None => up_to_block_number,
196			};
197		let since_block_plus =
198			BlockNumber(since_block_number.unwrap_or(BlockNumber(0)).0 + self.cache_size as u32);
199		let max_search_block = min(latest_stable_block, max(up_to_block_number, since_block_plus));
200
201		let changes = self
202			.get_changes_in_range_to_cache(since_block_number, max_search_block, scripts)
203			.await?;
204
205		if let Ok(mut cache) = self.cache.lock() {
206			cache.update(changes.clone());
207		}
208
209		Ok(filter_changes_in_range(changes, since_block_number, up_to_block_number))
210	}
211}
212);
213
214impl GovernedMapDataSourceCachedImpl {
215	async fn get_changes_from_cache(
216		&self,
217		since_block: Option<BlockNumber>,
218		up_to_block: BlockNumber,
219	) -> Result<Option<Vec<(String, Option<ByteString>)>>> {
220		if let Ok(cache) = self.cache.lock() {
221			Ok(cache.get_changes_in_range(since_block, up_to_block))
222		} else {
223			Ok(None)
224		}
225	}
226
227	async fn get_changes_in_range_to_cache(
228		&self,
229		since_block: Option<BlockNumber>,
230		up_to_block: BlockNumber,
231		scripts: MainChainScriptsV1,
232	) -> Result<Vec<Change>> {
233		let changes = crate::db_model::get_changes(
234			&self.pool,
235			&scripts.validator_address.into(),
236			since_block,
237			up_to_block,
238			Asset::new(scripts.asset_policy_id),
239		)
240		.await?;
241
242		let mut result = Vec::new();
243
244		for change in changes {
245			let GovernedMapDatum { key, value } = match GovernedMapDatum::try_from(change.datum.0) {
246				Ok(datum) => datum,
247				Err(err) => {
248					warn!("Failed decoding map entry: {err}");
249					continue;
250				},
251			};
252			match change.action {
253				GovernedMapAction::Spend => result.push(Change::new(change.block_no, key, None)),
254				GovernedMapAction::Create => {
255					result.push(Change::new(change.block_no, key, Some(value)))
256				},
257			};
258		}
259		Ok(result)
260	}
261}
262
263#[derive(derive_new::new, Clone)]
264struct Change {
265	block_no: BlockNumber,
266	key: String,
267	value: Option<ByteString>,
268}
269
270#[derive(Default)]
271pub(crate) struct Cache {
272	highest_block_number: Option<BlockNumber>,
273	lowest_block_number: Option<BlockNumber>,
274	changes: Vec<Change>,
275	address: Option<MainchainAddress>,
276	policy_id: Option<PolicyId>,
277}
278
279fn filter_changes_in_range(
280	changes: Vec<Change>,
281	since_block: Option<BlockNumber>,
282	up_to_block: BlockNumber,
283) -> Vec<(String, Option<ByteString>)> {
284	changes
285		.into_iter()
286		.filter(|change| {
287			change.block_no.0 <= up_to_block.0
288				&& since_block.map(|b| change.block_no.0 > b.0).unwrap_or(true)
289		})
290		.map(|change| (change.key, change.value))
291		.collect()
292}
293
294impl Cache {
295	fn get_changes_in_range(
296		&self,
297		since_block: Option<BlockNumber>,
298		up_to_block: BlockNumber,
299	) -> Option<Vec<(String, Option<ByteString>)>> {
300		let Some(highest_block_number) = self.highest_block_number else {
301			return None;
302		};
303		let Some(lowest_block_number) = self.lowest_block_number else {
304			return None;
305		};
306
307		if highest_block_number.0 < up_to_block.0
308			|| since_block.map(|b| b.0 < lowest_block_number.0).unwrap_or(false)
309		{
310			return None;
311		}
312
313		Some(filter_changes_in_range(self.changes.clone(), since_block, up_to_block))
314	}
315
316	fn update(&mut self, changes: Vec<Change>) {
317		self.changes = changes;
318		let (lowest_block_number, highest_block_number) = self
319			.changes
320			.iter()
321			.minmax_by_key(|change| change.block_no.0)
322			.into_option()
323			.map(|(min, max)| (min.block_no, max.block_no))
324			.unwrap_or((BlockNumber(0), BlockNumber(0)));
325		self.lowest_block_number = Some(lowest_block_number);
326		self.highest_block_number = Some(highest_block_number);
327	}
328
329	fn set_main_chain_scripts(&mut self, scripts: MainChainScriptsV1) {
330		if self.address != Some(scripts.validator_address.clone())
331			|| self.policy_id != Some(scripts.asset_policy_id.clone())
332		{
333			self.changes.clear();
334			self.highest_block_number = None;
335			self.lowest_block_number = None;
336			self.address = Some(scripts.validator_address);
337			self.policy_id = Some(scripts.asset_policy_id);
338		}
339	}
340}