partner_chains_db_sync_data_sources/lib.rs
1//! Crate providing implementations of Partner Chain Data Sources that read from Db-Sync Postgres.
2//!
3//! # Usage
4//!
5//! ## Adding to the node
6//!
7//! All data sources defined in this crate require a Postgres connection pool [PgPool] to run
8//! queries, which should be shared between all data sources. For convenience, this crate provides
9//! a helper function [get_connection_from_env] that will create a connection pool based on
10//! configuration read from node environment.
11//!
12//! Each data source also accepts an optional Prometheus metrics client [McFollowerMetrics] for
13//! reporting metrics to the Substrate's Prometheus metrics service. This client can be obtained
14//! using the [register_metrics_warn_errors] function.
15//!
16//! In addition to these two common arguments, some data sources depend on [BlockDataSourceImpl]
17//! which provides basic queries about blocks, and additional configuration for their data cache
18//! size.
19//!
20//! An example node code that creates the data sources can look like the following:
21//!
22//! ```rust
23//! # use std::error::Error;
24//! # use std::sync::Arc;
25//! use partner_chains_db_sync_data_sources::*;
26//!
27//! pub const CANDIDATES_FOR_EPOCH_CACHE_SIZE: usize = 64;
28//! pub const STAKE_CACHE_SIZE: usize = 100;
29//! pub const GOVERNED_MAP_CACHE_SIZE: u16 = 100;
30//!
31//! async fn create_data_sources(
32//! metrics_registry_opt: Option<&substrate_prometheus_endpoint::Registry>
33//! ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
34//! let metrics = register_metrics_warn_errors(metrics_registry_opt);
35//! let pool = get_connection_from_env().await?;
36//!
37//! // Block data source is shared by others for cache reuse
38//! let block = Arc::new(BlockDataSourceImpl::new_from_env(pool.clone()).await?);
39//!
40//! let sidechain_rpc = SidechainRpcDataSourceImpl::new(block.clone(), metrics.clone());
41//!
42//! let mc_hash = Arc::new(McHashDataSourceImpl::new(block.clone(), metrics.clone()));
43//!
44//! let authority_selection =
45//! CandidatesDataSourceImpl::new(pool.clone(), metrics.clone())
46//! .await?
47//! .cached(CANDIDATES_FOR_EPOCH_CACHE_SIZE)?;
48//!
49//! let native_token =
50//! NativeTokenManagementDataSourceImpl::new_from_env(pool.clone(), metrics.clone()).await?;
51//!
52//! let block_participation =
53//! StakeDistributionDataSourceImpl::new(pool.clone(), metrics.clone(), STAKE_CACHE_SIZE);
54//!
55//! let governed_map =
56//! GovernedMapDataSourceCachedImpl::new(pool, metrics.clone(), GOVERNED_MAP_CACHE_SIZE, block).await?;
57//! Ok(())
58//! }
59//! ```
60//!
61//! ## Cardano DB Sync configuration
62//!
63//! Partner Chains data sources require specific Db-Sync configuration to be set for them to
64//! operate correctly:
65//! - `insert_options.tx_out.value`: must be either `"enable"` (default) or `"consumed"`.
66//! The data sources in this crate that need to query transaction intputs automatically detect
67//! which option is used and adjust their queries accordingly. This requires the database to be
68//! already initialized by db-sync. When run for an uninitialized database, the data sources
69//! will default to the `"enable"` option.
70//! - `insert_options.tx_out.use_address_table`: must be `false` (default).
71//! - `insert_options.ledger`: must be `"enable"` (default).
72//! - `insert_options.multi_asset`: must be `true` (default).
73//! - `insert_options.governance`: must `"enable"` (default).
74//! - `insert_options.remove_jsonb_from_schema`: must be `"disable"` (default).
75//! - `insert_options.plutus`: must be `"enable"` (default).
76//!
77//! The default Cardano DB Sync configuration meets these requirements, so Partner Chain node
78//! operators that do not wish to use any custom configuration can use the defaults, otherwise
79//! they must preserve the values described above. See [Db-Sync configuration docs] for more
80//! information.
81//!
82//! ## Custom Indexes
83//!
84//! In addition to indexes automatically created by Db-Sync itself, data sources in this crate
85//! require additional ones to be created for some of the queries to execute efficiently. These
86//! indexes are:
87//! - `idx_ma_tx_out_ident ON ma_tx_out(ident)`
88//! - `idx_tx_out_address ON tx_out USING hash (address)`
89//!
90//! The data sources in this crate automatically create these indexes when needed at node startup.
91//!
92//! [PgPool]: sqlx::PgPool
93//! [BlockDataSourceImpl]: crate::block::BlockDataSourceImpl
94//! [McFollowerMetrics]: crate::metrics::McFollowerMetrics
95//! [get_connection_from_env]: crate::data_sources::get_connection_from_env
96//! [register_metrics_warn_errors]: crate::metrics::register_metrics_warn_errors
97//! [Db-Sync configuration docs]: https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/configuration.md
98#![deny(missing_docs)]
99#![allow(rustdoc::private_intra_doc_links)]
100
101pub use crate::{
102 data_sources::{ConnectionConfig, PgPool, get_connection_from_env},
103 metrics::{McFollowerMetrics, register_metrics_warn_errors},
104};
105
106#[cfg(feature = "block-source")]
107pub use crate::block::{BlockDataSourceImpl, DbSyncBlockDataSourceConfig};
108#[cfg(feature = "bridge")]
109pub use crate::bridge::TokenBridgeDataSourceImpl;
110#[cfg(feature = "candidate-source")]
111pub use crate::candidates::CandidatesDataSourceImpl;
112#[cfg(feature = "governed-map")]
113pub use crate::governed_map::{GovernedMapDataSourceCachedImpl, GovernedMapDataSourceImpl};
114#[cfg(feature = "mc-hash")]
115pub use crate::mc_hash::McHashDataSourceImpl;
116#[cfg(feature = "native-token")]
117pub use crate::native_token::NativeTokenManagementDataSourceImpl;
118#[cfg(feature = "sidechain-rpc")]
119pub use crate::sidechain_rpc::SidechainRpcDataSourceImpl;
120#[cfg(feature = "block-participation")]
121pub use crate::stake_distribution::StakeDistributionDataSourceImpl;
122
123mod data_sources;
124mod db_datum;
125mod db_model;
126mod metrics;
127
128#[cfg(feature = "block-source")]
129mod block;
130#[cfg(feature = "bridge")]
131mod bridge;
132#[cfg(feature = "candidate-source")]
133mod candidates;
134#[cfg(feature = "governed-map")]
135mod governed_map;
136#[cfg(feature = "mc-hash")]
137mod mc_hash;
138#[cfg(feature = "native-token")]
139mod native_token;
140#[cfg(feature = "sidechain-rpc")]
141mod sidechain_rpc;
142#[cfg(feature = "block-participation")]
143mod stake_distribution;
144
145#[derive(Debug)]
146/// Wrapper error type for [sqlx::Error]
147pub struct SqlxError(sqlx::Error);
148
149impl From<sqlx::Error> for SqlxError {
150 fn from(value: sqlx::Error) -> Self {
151 SqlxError(value)
152 }
153}
154
155impl From<SqlxError> for DataSourceError {
156 fn from(e: SqlxError) -> Self {
157 DataSourceError::InternalDataSourceError(e.0.to_string())
158 }
159}
160
161impl From<SqlxError> for Box<dyn std::error::Error + Send + Sync> {
162 fn from(e: SqlxError) -> Self {
163 e.0.into()
164 }
165}
166
167/// Error type returned by Db-Sync based data sources
168#[derive(Debug, PartialEq, thiserror::Error)]
169pub enum DataSourceError {
170 /// Indicates that the Db-Sync database rejected a request as invalid
171 #[error("Bad request: `{0}`.")]
172 BadRequest(String),
173 /// Indicates that an internal error occured when querying the Db-Sync database
174 #[error("Internal error of data source: `{0}`.")]
175 InternalDataSourceError(String),
176 /// Indicates that expected data was not found when querying the Db-Sync database
177 #[error(
178 "'{0}' not found. Possible causes: data source configuration error, db-sync not synced fully, or data not set on the main chain."
179 )]
180 ExpectedDataNotFound(String),
181 /// Indicates that data returned by the Db-Sync database is invalid
182 #[error(
183 "Invalid data. {0} Possible cause is an error in Plutus scripts or data source is outdated."
184 )]
185 InvalidData(String),
186}
187
188/// Result type used by Db-Sync data sources
189pub(crate) type Result<T> = std::result::Result<T, DataSourceError>;
190
191#[cfg(test)]
192mod tests {
193 use ctor::{ctor, dtor};
194 use std::sync::{OnceLock, mpsc};
195 use testcontainers_modules::postgres::Postgres;
196 use testcontainers_modules::testcontainers::{
197 Container, ImageExt,
198 bollard::query_parameters::{RemoveContainerOptions, StopContainerOptions},
199 core::client::docker_client_instance,
200 runners::SyncRunner,
201 };
202
203 static POSTGRES: OnceLock<Container<Postgres>> = OnceLock::new();
204
205 fn init_postgres() -> Container<Postgres> {
206 Postgres::default().with_tag("17.2").start().unwrap()
207 }
208
209 #[ctor]
210 fn on_startup() {
211 let postgres = POSTGRES.get_or_init(init_postgres);
212 let database_url = &format!(
213 "postgres://postgres:postgres@127.0.0.1:{}/postgres",
214 postgres.get_host_port_ipv4(5432).unwrap()
215 );
216 // Needed for sqlx::test macro annotation
217 unsafe {
218 std::env::set_var("DATABASE_URL", database_url);
219 }
220 }
221
222 #[dtor]
223 fn on_shutdown() {
224 let (tx, rx) = mpsc::channel();
225 std::thread::spawn(move || {
226 let runtime =
227 tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
228 runtime.block_on(async {
229 let docker = docker_client_instance().await.unwrap();
230 let id = POSTGRES.get().unwrap().id();
231 docker.stop_container(id, None::<StopContainerOptions>).await.unwrap();
232 docker.remove_container(id, None::<RemoveContainerOptions>).await.unwrap();
233 tx.send(());
234 });
235 });
236 let _: () = rx.recv().unwrap();
237 }
238}