1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
//! Catalyst Election Database crate
use std::{
atomic::{AtomicBool, Ordering},
Arc, OnceLock,
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use error::NotFoundError;
use futures::{Stream, StreamExt, TryStreamExt};
use tokio_postgres::{types::ToSql, NoTls, Row};
use tracing::{debug, debug_span, error, Instrument};
use crate::settings::Settings;
pub(crate) mod common;
pub(crate) mod config;
pub(crate) mod error;
pub(crate) mod legacy;
pub(crate) mod schema_check;
pub(crate) mod signed_docs;
/// Database version this crate matches.
/// Must equal the last Migrations Version Number from `event-db/migrations`.
pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 2;
/// Postgres Connection Manager DB Pool.
///
/// A reference-counted (`Arc`) `bb8` pool whose connection manager is created with
/// `NoTls`.
type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
/// Postgres Connection Manager DB Pool Instance.
///
/// Starts out empty. `establish_connection` stores the pool here, and the query
/// helpers on `EventDB` fail with `Error::DbPoolUninitialized` while it is unset.
static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
/// Is Deep Query Analysis enabled or not? Initialized to `false` (disabled).
static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
/// The Catalyst Event SQL Database.
///
/// This is a field-less type: its functions take no `self` and operate on the shared
/// `EVENT_DB_POOL` instead.
pub(crate) struct EventDB {}
/// `EventDB` Errors
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub(crate) enum Error {
/// Failed to get a DB Pool: `EVENT_DB_POOL` has not been set yet.
#[error("DB Pool uninitialized")]
impl EventDB {
/// Determine if deep query inspection is enabled.
///
/// # Returns
///
/// `bool` - `true` if deep query inspection is enabled, `false` otherwise.
pub(crate) fn is_deep_query_enabled() -> bool {
/// Modify the deep query inspection setting.
///
/// `query`, `query_stream`, `query_one` and `modify` check this setting through
/// `is_deep_query_enabled` before running a statement.
///
/// # Arguments
///
/// * `enable` - Set the `DeepQueryInspection` setting to this value.
pub(crate) fn modify_deep_query(enable: bool) {, Ordering::SeqCst);
/// Query the database.
///
/// If deep query inspection is enabled, this will log the query plan inside a
/// rolled-back transaction, before running the query.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `anyhow::Result<Vec<Row>>`
///
/// # Errors
///
/// Returns an error if:
/// * Deep query inspection is enabled and the `EXPLAIN ANALYZE` step fails.
/// * The DB pool has not been initialized (`Error::DbPoolUninitialized`).
/// * A connection cannot be obtained from the pool.
/// * The query itself fails.
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<Vec<Row>> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let rows = conn.query(stmt, params).await?;
/// Query the database and return an async stream of rows.
///
/// If deep query inspection is enabled, this will log the query plan inside a
/// rolled-back transaction, before running the query.
/// The rows are requested with `query_raw`, passing the parameters as an iterator.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
///
/// # Errors
///
/// Returns an error if:
/// * Deep query inspection is enabled and the `EXPLAIN ANALYZE` step fails.
/// * The DB pool has not been initialized (`Error::DbPoolUninitialized`).
/// * A connection cannot be obtained from the pool.
/// * Starting the query fails.
///
/// Each item of the returned stream is itself an `anyhow::Result<Row>`.
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_stream(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let rows = conn.query_raw(stmt, params.iter().copied()).await?;
/// Query the database for a single row.
///
/// If deep query inspection is enabled, this will log the query plan inside a
/// rolled-back transaction, before running the query.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `Result<Row, anyhow::Error>`
///
/// # Errors
///
/// Returns an error if:
/// * Deep query inspection is enabled and the `EXPLAIN ANALYZE` step fails.
/// * The DB pool has not been initialized (`Error::DbPoolUninitialized`).
/// * A connection cannot be obtained from the pool.
/// * The query fails.
/// * The query yields no row, in which case the error is `NotFoundError`.
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_one(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<Row> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
/// Modify the database.
///
/// Use this for `UPDATE`, `DELETE`, and other DB statements that
/// don't return data.
///
/// If deep query inspection is enabled, the statement is handed to
/// `explain_analyze_commit` instead of being sent directly. Otherwise it is sent on a
/// pooled connection and any rows it returns are discarded.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `anyhow::Result<()>`
///
/// # Errors
///
/// Returns an error if:
/// * Deep query inspection is enabled and `explain_analyze_commit` fails.
/// * The DB pool has not been initialized (`Error::DbPoolUninitialized`).
/// * A connection cannot be obtained from the pool.
/// * The statement fails.
pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_commit(stmt, params).await?;
} else {
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
conn.query(stmt, params).await?;
/// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
///
/// Thin wrapper that calls `explain_analyze` with `rollback` set to `true` and
/// returns its result unchanged.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
async fn explain_analyze_rollback(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<()> {
Self::explain_analyze(stmt, params, true).await
/// Prepend `EXPLAIN ANALYZE` to the query, and commit the transaction.
///
/// Thin wrapper that calls `explain_analyze` with `rollback` set to `false` and
/// returns its result unchanged.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
async fn explain_analyze_commit(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<()> {
Self::explain_analyze(stmt, params, false).await
/// Prepend `EXPLAIN ANALYZE` to the query.
///
/// Log the query plan inside a transaction that may be committed or rolled back.
///
/// A debug span is created with the statement text, the `Debug` rendering of the
/// parameters and a freshly generated v4 UUID as fields. The statement is prepared
/// as `EXPLAIN ANALYZE {stmt}` on a new transaction and run with `params`; the
/// `QUERY PLAN` column of every returned row is logged with `debug!`.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
/// * `rollback` - `bool` whether to roll back the transaction or not.
///
/// # Errors
///
/// Returns an error if the DB pool has not been initialized
/// (`Error::DbPoolUninitialized`), if a connection or transaction cannot be obtained,
/// or if running the `EXPLAIN ANALYZE` statement fails.
async fn explain_analyze(
stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
) -> anyhow::Result<()> {
let span = debug_span!(
query_statement = stmt,
params = format!("{:?}", params),
uuid = uuid::Uuid::new_v4().to_string()
async move {
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let mut conn = pool.get().await?;
let transaction = conn.transaction().await?;
let explain_stmt = transaction
.prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
let rows = transaction.query(&explain_stmt, params).await?;
for r in rows {
let query_plan_str: String = r.get("QUERY PLAN");
debug!("{}", query_plan_str);
if rollback {
} else {
/// Establish the connection pool for the database.
///
/// Takes no arguments: the connection URL and the optional user name and password
/// are read from `Settings::event_db_settings()`. The resulting pool is stored in
/// the global `EVENT_DB_POOL`, which the `EventDB` query functions read.
///
/// # Errors
///
/// This function does not return an error. Instead it logs via `error!` when:
/// * The Postgres URL cannot be parsed into a `tokio_postgres` config.
/// * `EVENT_DB_POOL` could not be set, e.g. because this function was called twice.
///
/// # Notes
///
/// The pool is created with `build_unchecked`, so returning from this function
/// presumably does not prove the database is reachable — TODO confirm against the
/// `bb8` documentation.
///
/// NOTE(review): no schema version check is visible in this function; verify whether
/// the schema is validated elsewhere (e.g. in `schema_check`).
pub fn establish_connection() {
let (url, user, pass) = Settings::event_db_settings();
// This was pre-validated and can't fail, but provide default in the impossible case it
// does.
let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
error!(url = url, "Postgres URL Pre Validation has failed.");
if let Some(user) = user {
if let Some(pass) = pass {
let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
let pool = Pool::builder().build_unchecked(pg_mgr);
if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
error!("Failed to set event db pool. Called Twice?");