partner_chains_db_sync_data_sources/bridge/
mod.rs1use crate::McFollowerMetrics;
46use crate::db_model::*;
47use crate::metrics::observed_async_trait;
48use partner_chains_plutus_data::bridge::{TokenTransferDatum, TokenTransferDatumV1};
49use sidechain_domain::McBlockHash;
50use sp_partner_chains_bridge::*;
51use sqlx::PgPool;
52use std::fmt::Debug;
53
54#[cfg(test)]
55mod tests;
56
57pub(crate) mod cache;
58
59pub struct TokenBridgeDataSourceImpl {
61 pool: PgPool,
63 metrics_opt: Option<McFollowerMetrics>,
65 db_sync_config: DbSyncConfigurationProvider,
67}
68
69impl TokenBridgeDataSourceImpl {
70 pub fn new(pool: PgPool, metrics_opt: Option<McFollowerMetrics>) -> Self {
72 Self { db_sync_config: DbSyncConfigurationProvider::new(pool.clone()), pool, metrics_opt }
73 }
74}
75
76observed_async_trait!(
77 impl<RecipientAddress> TokenBridgeDataSource<RecipientAddress> for TokenBridgeDataSourceImpl
78 where
79 RecipientAddress: Debug,
80 RecipientAddress: (for<'a> TryFrom<&'a [u8]>),
81 {
82 async fn get_transfers(
83 &self,
84 main_chain_scripts: MainChainScripts,
85 data_checkpoint: BridgeDataCheckpoint,
86 max_transfers: u32,
87 current_mc_block_hash: McBlockHash,
88 ) -> Result<
89 (Vec<BridgeTransferV1<RecipientAddress>>, BridgeDataCheckpoint),
90 Box<dyn std::error::Error + Send + Sync>,
91 > {
92 let asset = Asset {
93 policy_id: main_chain_scripts.token_policy_id.into(),
94 asset_name: main_chain_scripts.token_asset_name.into(),
95 };
96
97 let current_mc_block = get_block_by_hash(&self.pool, current_mc_block_hash.clone())
98 .await?
99 .ok_or(format!("Could not find block for hash {current_mc_block_hash:?}"))?;
100
101 let data_checkpoint = match data_checkpoint {
102 BridgeDataCheckpoint::Utxo(utxo) => {
103 let TxBlockInfo { block_number, tx_ix } =
104 get_block_info_for_utxo(&self.pool, utxo.tx_hash.into()).await?.ok_or(
105 format!(
106 "Could not find block info for data checkpoint: {data_checkpoint:?}"
107 ),
108 )?;
109 ResolvedBridgeDataCheckpoint::Utxo {
110 block_number,
111 tx_ix,
112 tx_out_ix: utxo.index.into(),
113 }
114 },
115 BridgeDataCheckpoint::Block(number) => {
116 ResolvedBridgeDataCheckpoint::Block { number: number.into() }
117 },
118 };
119
120 let utxos = get_bridge_utxos_tx(
121 self.db_sync_config.get_tx_in_config().await?,
122 &self.pool,
123 &main_chain_scripts.illiquid_circulation_supply_validator_address.into(),
124 asset,
125 data_checkpoint,
126 current_mc_block.block_no,
127 Some(max_transfers),
128 )
129 .await?;
130
131 let new_checkpoint = match utxos.last() {
132 None => BridgeDataCheckpoint::Block(current_mc_block.block_no.into()),
133 Some(_) if (utxos.len() as u32) < max_transfers => {
134 BridgeDataCheckpoint::Block(current_mc_block.block_no.into())
135 },
136 Some(utxo) => BridgeDataCheckpoint::Utxo(utxo.utxo_id()),
137 };
138
139 let transfers = utxos.into_iter().flat_map(utxo_to_transfer).collect();
140
141 Ok((transfers, new_checkpoint))
142 }
143 }
144);
145
146fn utxo_to_transfer<RecipientAddress>(
147 utxo: BridgeUtxo,
148) -> Option<BridgeTransferV1<RecipientAddress>>
149where
150 RecipientAddress: for<'a> TryFrom<&'a [u8]>,
151{
152 let token_delta = utxo.tokens_out.checked_sub(utxo.tokens_in)?;
153
154 if token_delta.is_zero() {
155 return None;
156 }
157
158 let token_amount = token_delta.0 as u64;
159
160 let Some(datum) = utxo.datum.clone() else {
161 return Some(BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() });
162 };
163
164 let transfer = match TokenTransferDatum::try_from(datum.0) {
165 Ok(TokenTransferDatum::V1(TokenTransferDatumV1::UserTransfer { receiver })) => {
166 match RecipientAddress::try_from(receiver.0.as_ref()) {
167 Ok(recipient) => BridgeTransferV1::UserTransfer { token_amount, recipient },
168 Err(_) => {
169 BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() }
170 },
171 }
172 },
173 Ok(TokenTransferDatum::V1(TokenTransferDatumV1::ReserveTransfer)) => {
174 BridgeTransferV1::ReserveTransfer { token_amount }
175 },
176 Err(_) => BridgeTransferV1::InvalidTransfer { token_amount, utxo_id: utxo.utxo_id() },
177 };
178
179 Some(transfer)
180}