partner_chains_db_sync_data_sources/
lib.rs

1//! Crate providing implementations of Partner Chain Data Sources that read from Db-Sync Postgres.
2//!
3//! # Usage
4//!
5//! ## Adding to the node
6//!
7//! All data sources defined in this crate require a Postgres connection pool [PgPool] to run
8//! queries, which should be shared between all data sources. For convenience, this crate provides
9//! a helper function [get_connection_from_env] that will create a connection pool based on
10//! configuration read from node environment.
11//!
12//! Each data source also accepts an optional Prometheus metrics client [McFollowerMetrics] for
//! reporting metrics to Substrate's Prometheus metrics service. This client can be obtained
14//! using the [register_metrics_warn_errors] function.
15//!
//! In addition to these two common arguments, some data sources depend on [BlockDataSourceImpl],
//! which provides basic queries about blocks, and some accept additional configuration for the
//! size of their data cache.
19//!
//! Example node code that creates the data sources could look like the following:
21//!
22//! ```rust
23//! # use std::error::Error;
24//! # use std::sync::Arc;
25//! use partner_chains_data_source_metrics::*;
26//! use partner_chains_db_sync_data_sources::*;
27//!
28//! pub const CANDIDATES_FOR_EPOCH_CACHE_SIZE: usize = 64;
29//! pub const STAKE_CACHE_SIZE: usize = 100;
30//! pub const GOVERNED_MAP_CACHE_SIZE: u16 = 100;
31//!
32//! async fn create_data_sources(
33//!     metrics_registry_opt: Option<&MetricsRegistry>
34//! ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
35//!     let metrics = register_metrics_warn_errors(metrics_registry_opt);
36//!     let pool = get_connection_from_env().await?;
37//!
38//!     // Block data source is shared by others for cache reuse
39//!     let block = Arc::new(BlockDataSourceImpl::new_from_env(pool.clone()).await?);
40//!
41//!     let sidechain_rpc = SidechainRpcDataSourceImpl::new(block.clone(), metrics.clone());
42//!
43//!     let mc_hash = Arc::new(McHashDataSourceImpl::new(block.clone(), metrics.clone()));
44//!
//!     let authority_selection =
//!         CandidatesDataSourceImpl::new(pool.clone(), metrics.clone())
//!             .await?
//!             .cached(CANDIDATES_FOR_EPOCH_CACHE_SIZE)?;
//!
//!     let block_participation =
//!         StakeDistributionDataSourceImpl::new(pool.clone(), metrics.clone(), STAKE_CACHE_SIZE);
52//!
53//!     let governed_map =
54//!         GovernedMapDataSourceCachedImpl::new(pool, metrics.clone(), GOVERNED_MAP_CACHE_SIZE, block).await?;
55//!     Ok(())
56//! }
57//! ```
58//!
59//! ## Cardano DB Sync configuration
60//!
61//! Partner Chains data sources require specific Db-Sync configuration to be set for them to
62//! operate correctly:
63//! - `insert_options.tx_out.value`: must be either `"enable"` (default) or `"consumed"`.
64//!   The data sources in this crate that need to query transaction inputs automatically detect
65//!   which option is used and adjust their queries accordingly. This requires the database to be
66//!   already initialized by db-sync. When run for an uninitialized database, the data sources
67//!   will default to the `"enable"` option.
68//! - `insert_options.tx_out.use_address_table`: must be `false` (default).
69//! - `insert_options.ledger`: must be `"enable"` (default).
70//! - `insert_options.multi_asset`: must be `true` (default).
//! - `insert_options.governance`: must be `"enable"` (default).
72//! - `insert_options.remove_jsonb_from_schema`: must be `"disable"` (default).
73//! - `insert_options.plutus`: must be `"enable"` (default).
74//!
75//! The default Cardano DB Sync configuration meets these requirements, so Partner Chain node
76//! operators that do not wish to use any custom configuration can use the defaults, otherwise
77//! they must preserve the values described above. See [Db-Sync configuration docs] for more
78//! information.
79//!
80//! ## Custom Indexes
81//!
82//! In addition to indexes automatically created by Db-Sync itself, data sources in this crate
83//! require additional ones to be created for some of the queries to execute efficiently. These
84//! indexes are:
85//! - `idx_ma_tx_out_ident ON ma_tx_out(ident)`
86//! - `idx_tx_out_address ON tx_out USING hash (address)`
87//!
88//! The data sources in this crate automatically create these indexes when needed at node startup.
89//!
90//! [PgPool]: sqlx::PgPool
91//! [BlockDataSourceImpl]: crate::block::BlockDataSourceImpl
//! [McFollowerMetrics]: partner_chains_data_source_metrics::McFollowerMetrics
//! [get_connection_from_env]: crate::data_sources::get_connection_from_env
//! [register_metrics_warn_errors]: partner_chains_data_source_metrics::register_metrics_warn_errors
95//! [Db-Sync configuration docs]: https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/configuration.md
96#![deny(missing_docs)]
97#![allow(rustdoc::private_intra_doc_links)]
98
99pub use crate::data_sources::{ConnectionConfig, PgPool, get_connection_from_env};
100
101#[cfg(feature = "block-source")]
102pub use crate::block::{BlockDataSourceImpl, DbSyncBlockDataSourceConfig};
103#[cfg(feature = "bridge")]
104pub use crate::bridge::{TokenBridgeDataSourceImpl, cache::CachedTokenBridgeDataSourceImpl};
105#[cfg(feature = "candidate-source")]
106pub use crate::candidates::CandidatesDataSourceImpl;
107#[cfg(feature = "governed-map")]
108pub use crate::governed_map::{GovernedMapDataSourceCachedImpl, GovernedMapDataSourceImpl};
109#[cfg(feature = "mc-hash")]
110pub use crate::mc_hash::McHashDataSourceImpl;
111#[cfg(feature = "sidechain-rpc")]
112pub use crate::sidechain_rpc::SidechainRpcDataSourceImpl;
113#[cfg(feature = "block-participation")]
114pub use crate::stake_distribution::StakeDistributionDataSourceImpl;
115
116mod data_sources;
117mod db_datum;
118mod db_model;
119
120#[cfg(feature = "block-source")]
121mod block;
122#[cfg(feature = "bridge")]
123mod bridge;
124#[cfg(feature = "candidate-source")]
125mod candidates;
126#[cfg(feature = "governed-map")]
127mod governed_map;
128#[cfg(feature = "mc-hash")]
129mod mc_hash;
130#[cfg(feature = "sidechain-rpc")]
131mod sidechain_rpc;
132#[cfg(feature = "block-participation")]
133mod stake_distribution;
134
135#[derive(Debug)]
136/// Wrapper error type for [sqlx::Error]
137pub struct SqlxError(sqlx::Error);
138
139impl From<sqlx::Error> for SqlxError {
140	fn from(value: sqlx::Error) -> Self {
141		SqlxError(value)
142	}
143}
144
145impl From<SqlxError> for DataSourceError {
146	fn from(e: SqlxError) -> Self {
147		DataSourceError::InternalDataSourceError(e.0.to_string())
148	}
149}
150
151impl From<SqlxError> for Box<dyn std::error::Error + Send + Sync> {
152	fn from(e: SqlxError) -> Self {
153		e.0.into()
154	}
155}
156
157/// Error type returned by Db-Sync based data sources
158#[derive(Debug, PartialEq, thiserror::Error)]
159pub enum DataSourceError {
160	/// Indicates that the Db-Sync database rejected a request as invalid
161	#[error("Bad request: `{0}`.")]
162	BadRequest(String),
163	/// Indicates that an internal error occured when querying the Db-Sync database
164	#[error("Internal error of data source: `{0}`.")]
165	InternalDataSourceError(String),
166	/// Indicates that expected data was not found when querying the Db-Sync database
167	#[error(
168		"'{0}' not found. Possible causes: data source configuration error, db-sync not synced fully, or data not set on the main chain."
169	)]
170	ExpectedDataNotFound(String),
171	/// Indicates that data returned by the Db-Sync database is invalid
172	#[error(
173		"Invalid data. {0} Possible cause is an error in Plutus scripts or data source is outdated."
174	)]
175	InvalidData(String),
176}
177
178/// Result type used by Db-Sync data sources
179pub(crate) type Result<T> = std::result::Result<T, DataSourceError>;
180
#[cfg(test)]
mod tests {
	use ctor::{ctor, dtor};
	use std::sync::{OnceLock, mpsc};
	use testcontainers_modules::postgres::Postgres;
	use testcontainers_modules::testcontainers::{
		Container, ImageExt,
		bollard::query_parameters::{RemoveContainerOptions, StopContainerOptions},
		core::client::docker_client_instance,
		runners::SyncRunner,
	};

	/// Postgres container shared by all tests in this crate.
	///
	/// It is stored in a `static`, so it is never dropped automatically; `on_shutdown`
	/// stops and removes it explicitly.
	static POSTGRES: OnceLock<Container<Postgres>> = OnceLock::new();

	/// Starts a Postgres `17.2` container, panicking if it cannot be started.
	fn init_postgres() -> Container<Postgres> {
		Postgres::default().with_tag("17.2").start().unwrap()
	}

	/// Runs before the test harness: starts the shared container and exports its
	/// connection string as `DATABASE_URL`.
	#[ctor]
	fn on_startup() {
		let postgres = POSTGRES.get_or_init(init_postgres);
		let database_url = format!(
			"postgres://postgres:postgres@127.0.0.1:{}/postgres",
			postgres.get_host_port_ipv4(5432).unwrap()
		);
		// Needed for sqlx::test macro annotation
		// SAFETY: this runs from a `#[ctor]`, i.e. before `main` and before the test harness
		// starts any test threads. Helper threads possibly spawned by the container runner
		// above are assumed not to read or write the environment concurrently.
		// NOTE(review): confirm.
		unsafe {
			std::env::set_var("DATABASE_URL", database_url);
		}
	}

	/// Runs at process exit: stops and removes the shared container.
	///
	/// The Docker calls are made on a freshly spawned thread that drives its own
	/// current-thread Tokio runtime. This function blocks until that thread signals
	/// completion over a channel.
	#[dtor]
	fn on_shutdown() {
		let (tx, rx) = mpsc::channel();
		std::thread::spawn(move || {
			let runtime =
				tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
			runtime.block_on(async {
				let docker = docker_client_instance().await.unwrap();
				let id = POSTGRES.get().unwrap().id();
				docker.stop_container(id, None::<StopContainerOptions>).await.unwrap();
				docker.remove_container(id, None::<RemoveContainerOptions>).await.unwrap();
				// The receiver stays blocked in `recv` below until this message arrives, so
				// the send cannot fail. The `Result` is discarded explicitly rather than
				// left unused.
				let _ = tx.send(());
			});
		});
		// If the cleanup thread panics before signalling, `tx` is dropped and this
		// `unwrap` panics.
		let _: () = rx.recv().unwrap();
	}
}
228}