cat_gateway/db/event/
mod.rs

1//! Catalyst Election Database crate
2use std::{
3    str::FromStr,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, OnceLock,
7    },
8};
9
10use bb8::{Pool, PooledConnection};
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};
18
19pub(crate) mod common;
20pub(crate) mod config;
21pub(crate) mod error;
22pub(crate) mod schema_check;
23pub(crate) mod signed_docs;
24
/// Database schema version this crate is written against.
///
/// Must equal the last Migrations Version Number from `event-db/migrations`; bump it
/// whenever a new migration is added there.
pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 2;
28
/// Postgres Connection Manager DB Pool.
///
/// A reference-counted `bb8` pool whose connections are managed by
/// `PostgresConnectionManager` and opened without TLS (`NoTls`).
type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
31
/// Postgres Connection Manager DB Pool Instance.
///
/// Written at most once by `establish_connection_pool` and read by
/// `EventDB::get_pool_connection`. It stays unset if building the pool fails.
static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
34
/// Is Deep Query Analysis enabled or not?
///
/// Starts as `false`. Read through `EventDB::is_deep_query_enabled` and changed through
/// `EventDB::modify_deep_query`.
static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
37
/// The Catalyst Event SQL Database.
///
/// Holds no fields: every operation is an associated function that works on the
/// process-wide `EVENT_DB_POOL`.
pub(crate) struct EventDB {}
40
/// `EventDB` connection errors.
///
/// Returned by `EventDB::get_pool_connection` when no pooled connection can be handed
/// out.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub(crate) enum EventDBConnectionError {
    /// The global DB pool has not been set, so there is nothing to get a connection
    /// from.
    #[error("DB Pool uninitialized")]
    DbPoolUninitialized,
    /// The DB pool exists but failed to provide a connection.
    #[error("DB Pool connection is unavailable")]
    PoolConnectionUnavailable,
}
51
52impl EventDB {
53    /// Get a connection from the pool.
54    async fn get_pool_connection<'a>(
55    ) -> Result<PooledConnection<'a, PostgresConnectionManager<NoTls>>, EventDBConnectionError>
56    {
57        let pool = EVENT_DB_POOL
58            .get()
59            .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
60        pool.get().await.map_err(|_| {
61            set_event_db_liveness(false);
62            EventDBConnectionError::PoolConnectionUnavailable
63        })
64    }
65
66    /// Determine if deep query inspection is enabled.
67    pub(crate) fn is_deep_query_enabled() -> bool {
68        DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
69    }
70
71    /// Modify the deep query inspection setting.
72    ///
73    /// # Arguments
74    ///
75    /// * `enable` - Set the `DeepQueryInspection` setting to this value.
76    pub(crate) fn modify_deep_query(enable: bool) {
77        DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
78    }
79
80    /// Query the database.
81    ///
82    /// If deep query inspection is enabled, this will log the query plan inside a
83    /// rolled-back transaction, before running the query.
84    ///
85    /// # Arguments
86    ///
87    /// * `stmt` - `&str` SQL statement.
88    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
89    ///
90    /// # Returns
91    ///
92    /// `anyhow::Result<Vec<Row>>`
93    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
94    pub(crate) async fn query(
95        stmt: &str, params: &[&(dyn ToSql + Sync)],
96    ) -> anyhow::Result<Vec<Row>> {
97        if Self::is_deep_query_enabled() {
98            Self::explain_analyze_rollback(stmt, params).await?;
99        }
100        let conn = Self::get_pool_connection().await?;
101        let rows = conn.query(stmt, params).await?;
102        Ok(rows)
103    }
104
105    /// Query the database and return a async stream of rows.
106    ///
107    /// If deep query inspection is enabled, this will log the query plan inside a
108    /// rolled-back transaction, before running the query.
109    ///
110    /// # Arguments
111    ///
112    /// * `stmt` - `&str` SQL statement.
113    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
114    ///
115    /// # Returns
116    ///
117    /// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
118    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
119    pub(crate) async fn query_stream(
120        stmt: &str, params: &[&(dyn ToSql + Sync)],
121    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
122        if Self::is_deep_query_enabled() {
123            Self::explain_analyze_rollback(stmt, params).await?;
124        }
125        let conn = Self::get_pool_connection().await?;
126        let rows = conn.query_raw(stmt, params.iter().copied()).await?;
127        Ok(rows.map_err(Into::into).boxed())
128    }
129
130    /// Query the database for a single row.
131    ///
132    /// # Arguments
133    ///
134    /// * `stmt` - `&str` SQL statement.
135    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
136    ///
137    /// # Returns
138    ///
139    /// `Result<Row, anyhow::Error>`
140    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
141    pub(crate) async fn query_one(
142        stmt: &str, params: &[&(dyn ToSql + Sync)],
143    ) -> anyhow::Result<Row> {
144        if Self::is_deep_query_enabled() {
145            Self::explain_analyze_rollback(stmt, params).await?;
146        }
147        let conn = Self::get_pool_connection().await?;
148        let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
149        Ok(row)
150    }
151
152    /// Modify the database.
153    ///
154    /// Use this for `UPDATE`, `DELETE`, and other DB statements that
155    /// don't return data.
156    ///
157    /// # Arguments
158    ///
159    /// * `stmt` - `&str` SQL statement.
160    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
161    ///
162    /// # Returns
163    ///
164    /// `anyhow::Result<()>`
165    pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
166        if Self::is_deep_query_enabled() {
167            Self::explain_analyze_commit(stmt, params).await?;
168        } else {
169            let conn = Self::get_pool_connection().await?;
170            conn.query(stmt, params).await?;
171        }
172        Ok(())
173    }
174
175    /// Checks that connection to `EventDB` is available.
176    pub(crate) async fn connection_is_ok() -> bool {
177        Self::get_pool_connection().await.is_ok()
178    }
179
180    /// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
181    async fn explain_analyze_rollback(
182        stmt: &str, params: &[&(dyn ToSql + Sync)],
183    ) -> anyhow::Result<()> {
184        Self::explain_analyze(stmt, params, true).await
185    }
186
187    /// Prepend `EXPLAIN ANALYZE` to the query, and commit the transaction.
188    async fn explain_analyze_commit(
189        stmt: &str, params: &[&(dyn ToSql + Sync)],
190    ) -> anyhow::Result<()> {
191        Self::explain_analyze(stmt, params, false).await
192    }
193
194    /// Prepend `EXPLAIN ANALYZE` to the query.
195    ///
196    /// Log the query plan inside a transaction that may be committed or rolled back.
197    ///
198    /// # Arguments
199    ///
200    /// * `stmt` - `&str` SQL statement.
201    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
202    /// * `rollback` - `bool` whether to roll back the transaction or not.
203    async fn explain_analyze(
204        stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
205    ) -> anyhow::Result<()> {
206        let span = debug_span!(
207            "query_plan",
208            query_statement = stmt,
209            params = format!("{:?}", params),
210            uuid = uuid::Uuid::new_v4().to_string()
211        );
212
213        async move {
214            let mut conn = Self::get_pool_connection().await?;
215            let transaction = conn.transaction().await?;
216            let explain_stmt = transaction
217                .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
218                .await?;
219            let rows = transaction.query(&explain_stmt, params).await?;
220            for r in rows {
221                let query_plan_str: String = r.get("QUERY PLAN");
222                debug!("{}", query_plan_str);
223            }
224            if rollback {
225                transaction.rollback().await?;
226            } else {
227                transaction.commit().await?;
228            }
229            Ok(())
230        }
231        .instrument(span)
232        .await
233    }
234}
235
236/// Establish a connection to the database, and check the schema is up-to-date.
237///
238/// # Parameters
239///
240/// * `url` set to the postgres connection string needed to connect to the database.  IF
241///   it is None, then the env var "`DATABASE_URL`" will be used for this connection
242///   string. eg: "`postgres://catalyst-dev:CHANGE_ME@localhost/CatalystDev`"
243/// * `do_schema_check` boolean flag to decide whether to verify the schema version or
244///   not. If it is `true`, a query is made to verify the DB schema version.
245///
246/// # Errors
247///
248/// This function will return an error if:
249/// * `url` is None and the environment variable "`DATABASE_URL`" isn't set.
250/// * There is any error communicating the the database to check its schema.
251/// * The database schema in the DB does not 100% match the schema supported by this
252///   library.
253///
254/// # Notes
255///
256/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
257/// `.env` file.
258///
259/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
260pub async fn establish_connection_pool() {
261    let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
262        Settings::event_db_settings();
263    debug!("Establishing connection with Event DB pool");
264
265    // This was pre-validated and can't fail, but provide default in the impossible case it
266    // does.
267    let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
268        error!(url = url, "Postgres URL Pre Validation has failed.");
269        tokio_postgres::config::Config::default()
270    });
271    if let Some(user) = user {
272        config.user(user);
273    }
274    if let Some(pass) = pass {
275        config.password(pass);
276    }
277
278    let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
279
280    match Pool::builder()
281        .max_size(max_connections)
282        .max_lifetime(Some(core::time::Duration::from_secs(max_lifetime.into())))
283        .min_idle(min_idle)
284        .connection_timeout(core::time::Duration::from_secs(connection_timeout.into()))
285        .build(pg_mgr)
286        .await
287    {
288        Ok(pool) => {
289            debug!("Event DB pool configured.");
290            if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
291                error!("Failed to set EVENT_DB_POOL. Already set?");
292            }
293        },
294        Err(err) => {
295            error!(error = %err, "Failed to establish connection with EventDB pool");
296        },
297    }
298}