partner_chains_db_sync_data_sources/bridge/
mod.rs

1//! Db-Sync data source used by the Partner Chain token bridge observability
2//!
3//! # Assumptions
4//!
5//! The data source implementation assumes that the utxos found at the illiquid circulating
6//! supply address conform to rules that are enforced by the Partner Chains smart contracts.
7//!
8//! Most importantly, transactions that spend any UTXOs from the ICS can only create at most
9//! one new UTXO at the ICS address. Conversely, transactions that create more than one UTXO
10//! at the illiquid supply address can only spend UTXOs from outside of it. This guarantees
11//! that the observability layer can always correctly identify the number of tokens transfered
12//! by calculating the delta of `tokens in the new UTXO` - `tokens in the old ICS UTXOs`.
13//!
14//! # Usage
15//!
16//! ```rust
17//! use partner_chains_db_sync_data_sources::*;
18//! use sqlx::PgPool;
19//! use std::{ error::Error, sync::Arc };
20//!
21//! // Number of stable blocks ahead the bridge data source should try to cache.
22//! // This is only possible when the node is catching up and speeds up syncing.
23//! const BRIDGE_TRANSFER_CACHE_LOOKAHEAD: u32 = 128;
24//!
25//! pub async fn create_data_sources(
26//!     pool: PgPool,
27//!     metrics_opt: Option<McFollowerMetrics>,
28//! ) -> Result<(/* other data sources */ CachedTokenBridgeDataSourceImpl), Box<dyn Error + Send + Sync>> {
29//!     // block data source is reused between various other data sources
30//!     let block = Arc::new(BlockDataSourceImpl::new_from_env(pool.clone()).await?);
31//!
32//!     // create other data sources
33//!
34//!     let bridge = CachedTokenBridgeDataSourceImpl::new(
35//!         pool,
36//!         metrics_opt,
37//!         block,
38//!         BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
39//!	    );
40//!
41//!     Ok((/* other data sources */ bridge))
42//! }
43//! ```
44
45use crate::McFollowerMetrics;
46use crate::db_model::*;
47use crate::metrics::observed_async_trait;
48use partner_chains_plutus_data::bridge::{TokenTransferDatum, TokenTransferDatumV1};
49use sidechain_domain::McBlockHash;
50use sp_partner_chains_bridge::*;
51use sqlx::PgPool;
52use std::fmt::Debug;
53
54#[cfg(test)]
55mod tests;
56
57pub(crate) mod cache;
58
59/// Db-Sync data source serving data for Partner Chains token bridge
60pub struct TokenBridgeDataSourceImpl {
61	/// Postgres connection pool
62	pool: PgPool,
63	/// Prometheus metrics client
64	metrics_opt: Option<McFollowerMetrics>,
65	/// Configuration used by Db-Sync
66	db_sync_config: DbSyncConfigurationProvider,
67}
68
69impl TokenBridgeDataSourceImpl {
70	/// Crates a new token bridge data source
71	pub fn new(pool: PgPool, metrics_opt: Option<McFollowerMetrics>) -> Self {
72		Self { db_sync_config: DbSyncConfigurationProvider::new(pool.clone()), pool, metrics_opt }
73	}
74}
75
76observed_async_trait!(
77	impl<RecipientAddress> TokenBridgeDataSource<RecipientAddress> for TokenBridgeDataSourceImpl
78	where
79		RecipientAddress: Debug,
80		RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
81	{
82		async fn get_transfers(
83			&self,
84			main_chain_scripts: MainChainScripts,
85			data_checkpoint: BridgeDataCheckpoint,
86			max_transfers: u32,
87			current_mc_block_hash: McBlockHash,
88		) -> Result<
89			(Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint),
90			Box<dyn std::error::Error + Send + Sync>,
91		> {
92			let asset = Asset {
93				policy_id: main_chain_scripts.token_policy_id.into(),
94				asset_name: main_chain_scripts.token_asset_name.into(),
95			};
96
97			let current_mc_block = get_block_by_hash(&self.pool, current_mc_block_hash.clone())
98				.await?
99				.ok_or(format!("Could not find block for hash {current_mc_block_hash:?}"))?;
100
101			let data_checkpoint = match data_checkpoint {
102				BridgeDataCheckpoint::Utxo(utxo) => {
103					let TxBlockInfo { block_number, tx_ix } =
104						get_block_info_for_utxo(&self.pool, utxo.tx_hash.into()).await?.ok_or(
105							format!(
106								"Could not find block info for data checkpoint: {data_checkpoint:?}"
107							),
108						)?;
109					ResolvedBridgeDataCheckpoint::Utxo {
110						block_number,
111						tx_ix,
112						tx_out_ix: utxo.index.into(),
113					}
114				},
115				BridgeDataCheckpoint::Block(number) => {
116					ResolvedBridgeDataCheckpoint::Block { number: number.into() }
117				},
118			};
119
120			let utxos = get_bridge_utxos_tx(
121				self.db_sync_config.get_tx_in_config().await?,
122				&self.pool,
123				&main_chain_scripts.illiquid_circulation_supply_validator_address.into(),
124				asset,
125				data_checkpoint,
126				current_mc_block.block_no,
127				Some(max_transfers),
128			)
129			.await?;
130
131			let new_checkpoint = match utxos.last() {
132				None => BridgeDataCheckpoint::Block(current_mc_block.block_no.into()),
133				Some(_) if (utxos.len() as u32) < max_transfers => {
134					BridgeDataCheckpoint::Block(current_mc_block.block_no.into())
135				},
136				Some(utxo) => BridgeDataCheckpoint::Utxo(utxo.utxo_id()),
137			};
138
139			let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();
140
141			Ok((transfers, new_checkpoint))
142		}
143	}
144);
145
146fn utxo_to_transfer<RecipientAddress>(
147	utxo: BridgeUtxo,
148) -> Option<BridgeTransferV1<RecipientAddress>>
149where
150	RecipientAddress: for<'a> TryFrom<&'a [u8]>,
151{
152	let token_delta = utxo.tokens_out.checked_sub(utxo.tokens_in)?;
153
154	if token_delta.is_zero() {
155		return None;
156	}
157
158	let token_amount = token_delta.0 as u64;
159
160	let Some(datum) = utxo.datum.clone() else {
161		return Some(BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() });
162	};
163
164	let transfer = match TokenTransferDatum::try_from(datum.0) {
165		Ok(TokenTransferDatum::V1(TokenTransferDatumV1::UserTransfer { receiver })) => {
166			match RecipientAddress::try_from(receiver.0.as_ref()) {
167				Ok(recipient) => BridgeTransferV1::UserTransfer { token_amount, recipient },
168				Err(_) => {
169					BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() }
170				},
171			}
172		},
173		Ok(TokenTransferDatum::V1(TokenTransferDatumV1::ReserveTransfer)) => {
174			BridgeTransferV1::ReserveTransfer { token_amount }
175		},
176		Err(_) => BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() },
177	};
178
179	Some(transfer)
180}