cat_gateway/db/event/
mod.rs1use std::{
3 str::FromStr,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, OnceLock,
7 },
8};
9
10use bb8::Pool;
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::settings::Settings;
18
19pub(crate) mod common;
20pub(crate) mod config;
21pub(crate) mod error;
22pub(crate) mod schema_check;
23pub(crate) mod signed_docs;
24
/// Schema version of the event database that this code expects.
// NOTE(review): presumably compared against the live database by `schema_check` — confirm.
pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 2;

/// Shared, reference-counted `bb8` pool of Postgres connections (no TLS).
type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;

/// Process-wide event DB connection pool; it can only be set once (`OnceLock`).
static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();

/// Flag that turns deep query inspection on or off; starts as `false`.
static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
37
/// Stateless handle for event DB operations.
///
/// The struct has no fields; its functions are associated functions that work
/// on the module-level connection pool.
pub(crate) struct EventDB {}
40
/// Errors produced by the event DB access layer itself.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub(crate) enum Error {
    /// The global connection pool was used before it had been set.
    #[error("DB Pool uninitialized")]
    DbPoolUninitialized,
}
48
49impl EventDB {
50 pub(crate) fn is_deep_query_enabled() -> bool {
52 DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
53 }
54
55 pub(crate) fn modify_deep_query(enable: bool) {
61 DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
62 }
63
64 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
78 pub(crate) async fn query(
79 stmt: &str, params: &[&(dyn ToSql + Sync)],
80 ) -> anyhow::Result<Vec<Row>> {
81 if Self::is_deep_query_enabled() {
82 Self::explain_analyze_rollback(stmt, params).await?;
83 }
84 let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
85 let conn = pool.get().await?;
86 let rows = conn.query(stmt, params).await?;
87 Ok(rows)
88 }
89
90 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
104 pub(crate) async fn query_stream(
105 stmt: &str, params: &[&(dyn ToSql + Sync)],
106 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
107 if Self::is_deep_query_enabled() {
108 Self::explain_analyze_rollback(stmt, params).await?;
109 }
110 let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
111 let conn = pool.get().await?;
112 let rows = conn.query_raw(stmt, params.iter().copied()).await?;
113 Ok(rows.map_err(Into::into).boxed())
114 }
115
116 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
127 pub(crate) async fn query_one(
128 stmt: &str, params: &[&(dyn ToSql + Sync)],
129 ) -> anyhow::Result<Row> {
130 if Self::is_deep_query_enabled() {
131 Self::explain_analyze_rollback(stmt, params).await?;
132 }
133 let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
134 let conn = pool.get().await?;
135 let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
136 Ok(row)
137 }
138
139 pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
153 if Self::is_deep_query_enabled() {
154 Self::explain_analyze_commit(stmt, params).await?;
155 } else {
156 let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
157 let conn = pool.get().await?;
158 conn.query(stmt, params).await?;
159 }
160 Ok(())
161 }
162
163 async fn explain_analyze_rollback(
165 stmt: &str, params: &[&(dyn ToSql + Sync)],
166 ) -> anyhow::Result<()> {
167 Self::explain_analyze(stmt, params, true).await
168 }
169
170 async fn explain_analyze_commit(
172 stmt: &str, params: &[&(dyn ToSql + Sync)],
173 ) -> anyhow::Result<()> {
174 Self::explain_analyze(stmt, params, false).await
175 }
176
177 async fn explain_analyze(
187 stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
188 ) -> anyhow::Result<()> {
189 let span = debug_span!(
190 "query_plan",
191 query_statement = stmt,
192 params = format!("{:?}", params),
193 uuid = uuid::Uuid::new_v4().to_string()
194 );
195
196 async move {
197 let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
198 let mut conn = pool.get().await?;
199 let transaction = conn.transaction().await?;
200 let explain_stmt = transaction
201 .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
202 .await?;
203 let rows = transaction.query(&explain_stmt, params).await?;
204 for r in rows {
205 let query_plan_str: String = r.get("QUERY PLAN");
206 debug!("{}", query_plan_str);
207 }
208 if rollback {
209 transaction.rollback().await?;
210 } else {
211 transaction.commit().await?;
212 }
213 Ok(())
214 }
215 .instrument(span)
216 .await
217 }
218}
219
220pub fn establish_connection() {
243 let (url, user, pass) = Settings::event_db_settings();
244
245 let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
248 error!(url = url, "Postgres URL Pre Validation has failed.");
249 tokio_postgres::config::Config::default()
250 });
251 if let Some(user) = user {
252 config.user(user);
253 }
254 if let Some(pass) = pass {
255 config.password(pass);
256 }
257
258 let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
259
260 let pool = Pool::builder().build_unchecked(pg_mgr);
261
262 if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
263 error!("Failed to set event db pool. Called Twice?");
264 }
265}