cat_gateway/db/event/
mod.rs1use std::{
3 str::FromStr,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, OnceLock,
7 },
8};
9
10use bb8::{Pool, PooledConnection};
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};
18
19pub(crate) mod common;
20pub(crate) mod config;
21pub(crate) mod error;
22pub(crate) mod schema_check;
23pub(crate) mod signed_docs;
24
25pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 2;
28
29type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
31
32static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
34
35static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
37
38pub(crate) struct EventDB {}
40
41#[derive(thiserror::Error, Debug, PartialEq, Eq)]
43pub(crate) enum EventDBConnectionError {
44 #[error("DB Pool uninitialized")]
46 DbPoolUninitialized,
47 #[error("DB Pool connection is unavailable")]
49 PoolConnectionUnavailable,
50}
51
52impl EventDB {
53 async fn get_pool_connection<'a>(
55 ) -> Result<PooledConnection<'a, PostgresConnectionManager<NoTls>>, EventDBConnectionError>
56 {
57 let pool = EVENT_DB_POOL
58 .get()
59 .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
60 pool.get().await.map_err(|_| {
61 set_event_db_liveness(false);
62 EventDBConnectionError::PoolConnectionUnavailable
63 })
64 }
65
66 pub(crate) fn is_deep_query_enabled() -> bool {
68 DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
69 }
70
71 pub(crate) fn modify_deep_query(enable: bool) {
77 DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
78 }
79
80 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
94 pub(crate) async fn query(
95 stmt: &str, params: &[&(dyn ToSql + Sync)],
96 ) -> anyhow::Result<Vec<Row>> {
97 if Self::is_deep_query_enabled() {
98 Self::explain_analyze_rollback(stmt, params).await?;
99 }
100 let conn = Self::get_pool_connection().await?;
101 let rows = conn.query(stmt, params).await?;
102 Ok(rows)
103 }
104
105 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
119 pub(crate) async fn query_stream(
120 stmt: &str, params: &[&(dyn ToSql + Sync)],
121 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
122 if Self::is_deep_query_enabled() {
123 Self::explain_analyze_rollback(stmt, params).await?;
124 }
125 let conn = Self::get_pool_connection().await?;
126 let rows = conn.query_raw(stmt, params.iter().copied()).await?;
127 Ok(rows.map_err(Into::into).boxed())
128 }
129
130 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
141 pub(crate) async fn query_one(
142 stmt: &str, params: &[&(dyn ToSql + Sync)],
143 ) -> anyhow::Result<Row> {
144 if Self::is_deep_query_enabled() {
145 Self::explain_analyze_rollback(stmt, params).await?;
146 }
147 let conn = Self::get_pool_connection().await?;
148 let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
149 Ok(row)
150 }
151
152 pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
166 if Self::is_deep_query_enabled() {
167 Self::explain_analyze_commit(stmt, params).await?;
168 } else {
169 let conn = Self::get_pool_connection().await?;
170 conn.query(stmt, params).await?;
171 }
172 Ok(())
173 }
174
175 pub(crate) async fn connection_is_ok() -> bool {
177 Self::get_pool_connection().await.is_ok()
178 }
179
180 async fn explain_analyze_rollback(
182 stmt: &str, params: &[&(dyn ToSql + Sync)],
183 ) -> anyhow::Result<()> {
184 Self::explain_analyze(stmt, params, true).await
185 }
186
187 async fn explain_analyze_commit(
189 stmt: &str, params: &[&(dyn ToSql + Sync)],
190 ) -> anyhow::Result<()> {
191 Self::explain_analyze(stmt, params, false).await
192 }
193
194 async fn explain_analyze(
204 stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
205 ) -> anyhow::Result<()> {
206 let span = debug_span!(
207 "query_plan",
208 query_statement = stmt,
209 params = format!("{:?}", params),
210 uuid = uuid::Uuid::new_v4().to_string()
211 );
212
213 async move {
214 let mut conn = Self::get_pool_connection().await?;
215 let transaction = conn.transaction().await?;
216 let explain_stmt = transaction
217 .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
218 .await?;
219 let rows = transaction.query(&explain_stmt, params).await?;
220 for r in rows {
221 let query_plan_str: String = r.get("QUERY PLAN");
222 debug!("{}", query_plan_str);
223 }
224 if rollback {
225 transaction.rollback().await?;
226 } else {
227 transaction.commit().await?;
228 }
229 Ok(())
230 }
231 .instrument(span)
232 .await
233 }
234}
235
236pub async fn establish_connection_pool() {
261 let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
262 Settings::event_db_settings();
263 debug!("Establishing connection with Event DB pool");
264
265 let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
268 error!(url = url, "Postgres URL Pre Validation has failed.");
269 tokio_postgres::config::Config::default()
270 });
271 if let Some(user) = user {
272 config.user(user);
273 }
274 if let Some(pass) = pass {
275 config.password(pass);
276 }
277
278 let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
279
280 match Pool::builder()
281 .max_size(max_connections)
282 .max_lifetime(Some(core::time::Duration::from_secs(max_lifetime.into())))
283 .min_idle(min_idle)
284 .connection_timeout(core::time::Duration::from_secs(connection_timeout.into()))
285 .build(pg_mgr)
286 .await
287 {
288 Ok(pool) => {
289 debug!("Event DB pool configured.");
290 if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
291 error!("Failed to set EVENT_DB_POOL. Already set?");
292 }
293 },
294 Err(err) => {
295 error!(error = %err, "Failed to establish connection with EventDB pool");
296 },
297 }
298}