cat_gateway/db/event/
mod.rs1use std::{
3    str::FromStr,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, OnceLock,
7    },
8};
9
10use bb8::{Pool, PooledConnection};
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};
18
19pub(crate) mod common;
20pub(crate) mod config;
21pub(crate) mod error;
22pub(crate) mod schema_check;
23pub(crate) mod signed_docs;
24
25pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 2;
28
29type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
31
32static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
34
35static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
37
38pub(crate) struct EventDB {}
40
41#[derive(thiserror::Error, Debug, PartialEq, Eq)]
43pub(crate) enum EventDBConnectionError {
44    #[error("DB Pool uninitialized")]
46    DbPoolUninitialized,
47    #[error("DB Pool connection is unavailable")]
49    PoolConnectionUnavailable,
50}
51
52impl EventDB {
53    async fn get_pool_connection<'a>(
55    ) -> Result<PooledConnection<'a, PostgresConnectionManager<NoTls>>, EventDBConnectionError>
56    {
57        let pool = EVENT_DB_POOL
58            .get()
59            .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
60        pool.get().await.map_err(|_| {
61            set_event_db_liveness(false);
62            EventDBConnectionError::PoolConnectionUnavailable
63        })
64    }
65
66    pub(crate) fn is_deep_query_enabled() -> bool {
68        DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
69    }
70
71    pub(crate) fn modify_deep_query(enable: bool) {
77        DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
78    }
79
80    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
94    pub(crate) async fn query(
95        stmt: &str, params: &[&(dyn ToSql + Sync)],
96    ) -> anyhow::Result<Vec<Row>> {
97        if Self::is_deep_query_enabled() {
98            Self::explain_analyze_rollback(stmt, params).await?;
99        }
100        let conn = Self::get_pool_connection().await?;
101        let rows = conn.query(stmt, params).await?;
102        Ok(rows)
103    }
104
105    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
119    pub(crate) async fn query_stream(
120        stmt: &str, params: &[&(dyn ToSql + Sync)],
121    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
122        if Self::is_deep_query_enabled() {
123            Self::explain_analyze_rollback(stmt, params).await?;
124        }
125        let conn = Self::get_pool_connection().await?;
126        let rows = conn.query_raw(stmt, params.iter().copied()).await?;
127        Ok(rows.map_err(Into::into).boxed())
128    }
129
130    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
141    pub(crate) async fn query_one(
142        stmt: &str, params: &[&(dyn ToSql + Sync)],
143    ) -> anyhow::Result<Row> {
144        if Self::is_deep_query_enabled() {
145            Self::explain_analyze_rollback(stmt, params).await?;
146        }
147        let conn = Self::get_pool_connection().await?;
148        let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
149        Ok(row)
150    }
151
152    pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
166        if Self::is_deep_query_enabled() {
167            Self::explain_analyze_commit(stmt, params).await?;
168        } else {
169            let conn = Self::get_pool_connection().await?;
170            conn.query(stmt, params).await?;
171        }
172        Ok(())
173    }
174
175    pub(crate) async fn connection_is_ok() -> bool {
177        Self::get_pool_connection().await.is_ok()
178    }
179
180    async fn explain_analyze_rollback(
182        stmt: &str, params: &[&(dyn ToSql + Sync)],
183    ) -> anyhow::Result<()> {
184        Self::explain_analyze(stmt, params, true).await
185    }
186
187    async fn explain_analyze_commit(
189        stmt: &str, params: &[&(dyn ToSql + Sync)],
190    ) -> anyhow::Result<()> {
191        Self::explain_analyze(stmt, params, false).await
192    }
193
194    async fn explain_analyze(
204        stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
205    ) -> anyhow::Result<()> {
206        let span = debug_span!(
207            "query_plan",
208            query_statement = stmt,
209            params = format!("{:?}", params),
210            uuid = uuid::Uuid::new_v4().to_string()
211        );
212
213        async move {
214            let mut conn = Self::get_pool_connection().await?;
215            let transaction = conn.transaction().await?;
216            let explain_stmt = transaction
217                .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
218                .await?;
219            let rows = transaction.query(&explain_stmt, params).await?;
220            for r in rows {
221                let query_plan_str: String = r.get("QUERY PLAN");
222                debug!("{}", query_plan_str);
223            }
224            if rollback {
225                transaction.rollback().await?;
226            } else {
227                transaction.commit().await?;
228            }
229            Ok(())
230        }
231        .instrument(span)
232        .await
233    }
234}
235
236pub async fn establish_connection_pool() {
261    let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
262        Settings::event_db_settings();
263    debug!("Establishing connection with Event DB pool");
264
265    let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
268        error!(url = url, "Postgres URL Pre Validation has failed.");
269        tokio_postgres::config::Config::default()
270    });
271    if let Some(user) = user {
272        config.user(user);
273    }
274    if let Some(pass) = pass {
275        config.password(pass);
276    }
277
278    let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
279
280    match Pool::builder()
281        .max_size(max_connections)
282        .max_lifetime(Some(core::time::Duration::from_secs(max_lifetime.into())))
283        .min_idle(min_idle)
284        .connection_timeout(core::time::Duration::from_secs(connection_timeout.into()))
285        .build(pg_mgr)
286        .await
287    {
288        Ok(pool) => {
289            debug!("Event DB pool configured.");
290            if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
291                error!("Failed to set EVENT_DB_POOL. Already set?");
292            }
293        },
294        Err(err) => {
295            error!(error = %err, "Failed to establish connection with EventDB pool");
296        },
297    }
298}