cat_gateway/db/event/
mod.rs

1//! Catalyst Election Database crate
2use std::{
3    str::FromStr,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, OnceLock,
7    },
8};
9
10use bb8::Pool;
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::settings::Settings;
18
19pub(crate) mod common;
20pub(crate) mod config;
21pub(crate) mod error;
22pub(crate) mod schema_check;
23pub(crate) mod signed_docs;
24
25/// Database version this crate matches.
26/// Must equal the last Migrations Version Number from `event-db/migrations`.
27pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 2;
28
29/// Postgres Connection Manager DB Pool
30type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
31
32/// Postgres Connection Manager DB Pool Instance
33static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
34
35/// Is Deep Query Analysis enabled or not?
36static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
37
38/// The Catalyst Event SQL Database
39pub(crate) struct EventDB {}
40
41/// `EventDB` Errors
42#[derive(thiserror::Error, Debug, PartialEq, Eq)]
43pub(crate) enum Error {
44    /// Failed to get a DB Pool
45    #[error("DB Pool uninitialized")]
46    DbPoolUninitialized,
47}
48
49impl EventDB {
50    /// Determine if deep query inspection is enabled.
51    pub(crate) fn is_deep_query_enabled() -> bool {
52        DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
53    }
54
55    /// Modify the deep query inspection setting.
56    ///
57    /// # Arguments
58    ///
59    /// * `enable` - Set the `DeepQueryInspection` setting to this value.
60    pub(crate) fn modify_deep_query(enable: bool) {
61        DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
62    }
63
64    /// Query the database.
65    ///
66    /// If deep query inspection is enabled, this will log the query plan inside a
67    /// rolled-back transaction, before running the query.
68    ///
69    /// # Arguments
70    ///
71    /// * `stmt` - `&str` SQL statement.
72    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
73    ///
74    /// # Returns
75    ///
76    /// `anyhow::Result<Vec<Row>>`
77    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
78    pub(crate) async fn query(
79        stmt: &str, params: &[&(dyn ToSql + Sync)],
80    ) -> anyhow::Result<Vec<Row>> {
81        if Self::is_deep_query_enabled() {
82            Self::explain_analyze_rollback(stmt, params).await?;
83        }
84        let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
85        let conn = pool.get().await?;
86        let rows = conn.query(stmt, params).await?;
87        Ok(rows)
88    }
89
90    /// Query the database and return a async stream of rows.
91    ///
92    /// If deep query inspection is enabled, this will log the query plan inside a
93    /// rolled-back transaction, before running the query.
94    ///
95    /// # Arguments
96    ///
97    /// * `stmt` - `&str` SQL statement.
98    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
99    ///
100    /// # Returns
101    ///
102    /// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
103    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
104    pub(crate) async fn query_stream(
105        stmt: &str, params: &[&(dyn ToSql + Sync)],
106    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
107        if Self::is_deep_query_enabled() {
108            Self::explain_analyze_rollback(stmt, params).await?;
109        }
110        let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
111        let conn = pool.get().await?;
112        let rows = conn.query_raw(stmt, params.iter().copied()).await?;
113        Ok(rows.map_err(Into::into).boxed())
114    }
115
116    /// Query the database for a single row.
117    ///
118    /// # Arguments
119    ///
120    /// * `stmt` - `&str` SQL statement.
121    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
122    ///
123    /// # Returns
124    ///
125    /// `Result<Row, anyhow::Error>`
126    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
127    pub(crate) async fn query_one(
128        stmt: &str, params: &[&(dyn ToSql + Sync)],
129    ) -> anyhow::Result<Row> {
130        if Self::is_deep_query_enabled() {
131            Self::explain_analyze_rollback(stmt, params).await?;
132        }
133        let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
134        let conn = pool.get().await?;
135        let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
136        Ok(row)
137    }
138
139    /// Modify the database.
140    ///
141    /// Use this for `UPDATE`, `DELETE`, and other DB statements that
142    /// don't return data.
143    ///
144    /// # Arguments
145    ///
146    /// * `stmt` - `&str` SQL statement.
147    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
148    ///
149    /// # Returns
150    ///
151    /// `anyhow::Result<()>`
152    pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
153        if Self::is_deep_query_enabled() {
154            Self::explain_analyze_commit(stmt, params).await?;
155        } else {
156            let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
157            let conn = pool.get().await?;
158            conn.query(stmt, params).await?;
159        }
160        Ok(())
161    }
162
163    /// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
164    async fn explain_analyze_rollback(
165        stmt: &str, params: &[&(dyn ToSql + Sync)],
166    ) -> anyhow::Result<()> {
167        Self::explain_analyze(stmt, params, true).await
168    }
169
170    /// Prepend `EXPLAIN ANALYZE` to the query, and commit the transaction.
171    async fn explain_analyze_commit(
172        stmt: &str, params: &[&(dyn ToSql + Sync)],
173    ) -> anyhow::Result<()> {
174        Self::explain_analyze(stmt, params, false).await
175    }
176
177    /// Prepend `EXPLAIN ANALYZE` to the query.
178    ///
179    /// Log the query plan inside a transaction that may be committed or rolled back.
180    ///
181    /// # Arguments
182    ///
183    /// * `stmt` - `&str` SQL statement.
184    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
185    /// * `rollback` - `bool` whether to roll back the transaction or not.
186    async fn explain_analyze(
187        stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
188    ) -> anyhow::Result<()> {
189        let span = debug_span!(
190            "query_plan",
191            query_statement = stmt,
192            params = format!("{:?}", params),
193            uuid = uuid::Uuid::new_v4().to_string()
194        );
195
196        async move {
197            let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
198            let mut conn = pool.get().await?;
199            let transaction = conn.transaction().await?;
200            let explain_stmt = transaction
201                .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
202                .await?;
203            let rows = transaction.query(&explain_stmt, params).await?;
204            for r in rows {
205                let query_plan_str: String = r.get("QUERY PLAN");
206                debug!("{}", query_plan_str);
207            }
208            if rollback {
209                transaction.rollback().await?;
210            } else {
211                transaction.commit().await?;
212            }
213            Ok(())
214        }
215        .instrument(span)
216        .await
217    }
218}
219
220/// Establish a connection to the database, and check the schema is up-to-date.
221///
222/// # Parameters
223///
224/// * `url` set to the postgres connection string needed to connect to the database.  IF
225///   it is None, then the env var "`DATABASE_URL`" will be used for this connection
226///   string. eg: "`postgres://catalyst-dev:CHANGE_ME@localhost/CatalystDev`"
227/// * `do_schema_check` boolean flag to decide whether to verify the schema version or
228///   not. If it is `true`, a query is made to verify the DB schema version.
229///
230/// # Errors
231///
232/// This function will return an error if:
233/// * `url` is None and the environment variable "`DATABASE_URL`" isn't set.
234/// * There is any error communicating the the database to check its schema.
235/// * The database schema in the DB does not 100% match the schema supported by this
236///   library.
237///
238/// # Notes
239///
240/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
241/// `.env` file.
242pub fn establish_connection() {
243    let (url, user, pass) = Settings::event_db_settings();
244
245    // This was pre-validated and can't fail, but provide default in the impossible case it
246    // does.
247    let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
248        error!(url = url, "Postgres URL Pre Validation has failed.");
249        tokio_postgres::config::Config::default()
250    });
251    if let Some(user) = user {
252        config.user(user);
253    }
254    if let Some(pass) = pass {
255        config.password(pass);
256    }
257
258    let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
259
260    let pool = Pool::builder().build_unchecked(pg_mgr);
261
262    if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
263        error!("Failed to set event db pool. Called Twice?");
264    }
265}