cat_gateway/db/index/session.rs

//! Session creation and storage

use std::{
    fmt::Debug,
    path::PathBuf,
    sync::{Arc, OnceLock},
    time::Duration,
};

use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
use scylla::{
    frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager,
    ExecutionProfile, Session, SessionBuilder,
};
use thiserror::Error;
use tokio::fs;
use tracing::{error, info};

use super::{
    queries::{
        purge::{self, PreparedDeleteQuery},
        FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
        PreparedUpsertQuery,
    },
    schema::create_schema,
};
use crate::settings::{cassandra_db, Settings};

/// Configuration Choices for compression.
#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
#[strum(ascii_case_insensitive)]
pub(crate) enum CompressionChoice {
    /// LZ4 link data compression.
    Lz4,
    /// Snappy link data compression.
    Snappy,
    /// No compression.
    None,
}

/// Configuration Choices for TLS.
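///
/// Both this enum and [`CompressionChoice`] parse case-insensitively from their
/// configured string values via the `strum` derives on these types. A minimal
/// illustrative sketch (not compiled as a doc-test):
///
/// ```ignore
/// use std::str::FromStr;
///
/// assert!(TlsChoice::from_str("verified").is_ok());
/// assert!(TlsChoice::from_str("VERIFIED").is_ok());
/// ```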
#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
#[strum(ascii_case_insensitive)]
pub(crate) enum TlsChoice {
    /// Disable TLS.
    Disabled,
    /// Verifies that the peer's certificate is trusted.
    Verified,
    /// Disables verification of the peer's certificate.
    Unverified,
}

/// Represents errors that can occur while interacting with a Cassandra session.
#[derive(Debug, Error)]
pub(crate) enum CassandraSessionError {
    /// Error when creating a session fails.
    #[error("Creating session failed: {source}")]
    CreatingSessionFailed {
        /// The underlying error that caused the session creation to fail.
        source: anyhow::Error,
    },
    /// Error when schema migration fails.
    #[error("Schema migration failed: {source}")]
    SchemaMigrationFailed {
        /// The underlying error that caused the schema migration to fail.
        source: anyhow::Error,
    },
    /// Error when preparing queries fails.
    #[error("Preparing queries failed: {source}")]
    PreparingQueriesFailed {
        /// The underlying error that caused query preparation to fail.
        source: anyhow::Error,
    },
    /// Error when preparing purge queries fails.
    #[error("Preparing purge queries failed: {source}")]
    PreparingPurgeQueriesFailed {
        /// The underlying error that caused purge query preparation to fail.
        source: anyhow::Error,
    },
    /// Error indicating that the session has already been set.
    #[error("Session already set")]
    SessionAlreadySet,
}

/// All interaction with Cassandra goes through this struct.
#[derive(Clone)]
pub(crate) struct CassandraSession {
    /// Is the session to the persistent or volatile DB?
    #[allow(dead_code)]
    persistent: bool,
    /// Configuration for this session.
    cfg: Arc<cassandra_db::EnvVars>,
    /// The actual session.
    session: Arc<Session>,
    /// All prepared queries we can use on this session.
    queries: Arc<PreparedQueries>,
    /// All prepared purge queries we can use on this session.
    purge_queries: Arc<purge::PreparedQueries>,
}

/// Session error during initialization.
static INIT_SESSION_ERROR: OnceLock<Arc<CassandraSessionError>> = OnceLock::new();

/// Persistent DB Session.
static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();

/// Volatile DB Session.
static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();

impl CassandraSession {
    /// Initialise the Cassandra Cluster Connections.
    pub(crate) fn init() {
        let (persistent, volatile) = Settings::cassandra_db_cfg();

        let _join_handle =
            tokio::task::spawn(async move { Box::pin(retry_init(persistent, true)).await });
        let _join_handle =
            tokio::task::spawn(async move { Box::pin(retry_init(volatile, false)).await });
    }

    /// Check to see if the Cassandra Indexing DB is ready for use.
    pub(crate) fn is_ready() -> bool {
        PERSISTENT_SESSION.get().is_some() && VOLATILE_SESSION.get().is_some()
    }

    /// Wait for the Cassandra Indexing DB to be ready before continuing.
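    ///
    /// A minimal usage sketch (not compiled as a doc-test; the polling interval and
    /// error handling are illustrative):
    ///
    /// ```ignore
    /// // Poll every 5 seconds, and return early if initialisation recorded an error.
    /// CassandraSession::wait_until_ready(Duration::from_secs(5), false).await?;
    /// ```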
    pub(crate) async fn wait_until_ready(
        interval: Duration, ignore_err: bool,
    ) -> Result<(), Arc<CassandraSessionError>> {
        loop {
            if !ignore_err {
                if let Some(err) = INIT_SESSION_ERROR.get() {
                    return Err(err.clone());
                }
            }

            if Self::is_ready() {
                return Ok(());
            }

            tokio::time::sleep(interval).await;
        }
    }

    /// Get the session needed to perform a query.
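    ///
    /// A minimal usage sketch (illustrative only):
    ///
    /// ```ignore
    /// // `true` selects the persistent DB session, `false` the volatile one.
    /// let Some(session) = CassandraSession::get(true) else {
    ///     anyhow::bail!("Persistent DB Session not found");
    /// };
    /// ```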
    pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
        if persistent {
            PERSISTENT_SESSION.get().cloned()
        } else {
            VOLATILE_SESSION.get().cloned()
        }
    }

    /// Executes a select query with the given parameters.
    ///
    /// Returns an iterator that iterates over all the result pages that the query
    /// returns.
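    ///
    /// A minimal usage sketch (not compiled; `SomeSelectQuery`, `params` and `MyRow`
    /// are illustrative, and converting the pager into a typed row stream depends on
    /// the driver version in use):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let pager = session
    ///     .execute_iter(PreparedSelectQuery::SomeSelectQuery, params)
    ///     .await?;
    /// // Consume the result pages row by row.
    /// let mut rows = pager.rows_stream::<MyRow>()?;
    /// while let Some(row) = rows.next().await {
    ///     let row = row?;
    ///     // ... use `row`
    /// }
    /// ```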
    pub(crate) async fn execute_iter<P>(
        &self, select_query: PreparedSelectQuery, params: P,
    ) -> anyhow::Result<QueryPager>
    where P: SerializeRow {
        let session = self.session.clone();
        let queries = self.queries.clone();

        queries.execute_iter(session, select_query, params).await
    }

    /// Execute a Batch query with the given parameters.
    ///
    /// Values should be a `Vec` of values which implement `SerializeRow`. They MUST all
    /// be of the same type, and MUST match the query being executed.
    ///
    /// This will divide the batch into optimally sized chunks and execute them until all
    /// values have been executed or the first error is encountered.
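    ///
    /// A minimal usage sketch (not compiled; `SomeInsertQuery` and `MyRowParams` are
    /// illustrative placeholders for a real prepared query and its parameter type):
    ///
    /// ```ignore
    /// // All values share one type and match the prepared query being executed.
    /// let values: Vec<MyRowParams> = data.into_iter().map(MyRowParams::from).collect();
    /// session.execute_batch(PreparedQuery::SomeInsertQuery, values).await?;
    /// ```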
    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
        &self, query: PreparedQuery, values: Vec<T>,
    ) -> FallibleQueryResults {
        let session = self.session.clone();
        let cfg = self.cfg.clone();
        let queries = self.queries.clone();

        queries.execute_batch(session, cfg, query, values).await
    }

    /// Execute a query which returns no results, except an error if it fails.
    /// Cannot be batched; takes a single set of parameters.
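    ///
    /// A minimal usage sketch (not compiled; `SomeUpsertQuery` and `value` are
    /// illustrative):
    ///
    /// ```ignore
    /// session.execute_upsert(PreparedUpsertQuery::SomeUpsertQuery, value).await?;
    /// ```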
    pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
        &self, query: PreparedUpsertQuery, value: T,
    ) -> anyhow::Result<()> {
        let session = self.session.clone();
        let queries = self.queries.clone();

        queries.execute_upsert(session, query, value).await
    }

    /// Execute a purge query with the given parameters.
    ///
    /// Values should be a `Vec` of values which implement `SerializeRow`. They MUST all
    /// be of the same type, and MUST match the query being executed.
    ///
    /// This will divide the batch into optimally sized chunks and execute them until all
    /// values have been executed or the first error is encountered.
    ///
    /// NOTE: This is currently only used to purge volatile data.
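    ///
    /// A minimal sketch of the purge flow (not compiled; `SomeTable` and
    /// `PrimaryKeyRow` are illustrative placeholders, and the typed row stream
    /// conversion depends on the driver version in use):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// // 1. Gather the primary keys to purge.
    /// let pager = session
    ///     .purge_execute_iter(purge::PreparedSelectQuery::SomeTable)
    ///     .await?;
    /// let mut keys = pager.rows_stream::<PrimaryKeyRow>()?;
    /// let mut to_delete = Vec::new();
    /// while let Some(key) = keys.next().await {
    ///     to_delete.push(key?);
    /// }
    ///
    /// // 2. Delete them in batches.
    /// session
    ///     .purge_execute_batch(PreparedDeleteQuery::SomeTable, to_delete)
    ///     .await?;
    /// ```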
    pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
        &self, query: PreparedDeleteQuery, values: Vec<T>,
    ) -> FallibleQueryResults {
        // Only execute purge queries on the volatile session
        let persistent = false;
        let Some(volatile_db) = Self::get(persistent) else {
            // This should never happen
            anyhow::bail!("Volatile DB Session not found");
        };
        let cfg = self.cfg.clone();
        let queries = self.purge_queries.clone();
        let session = volatile_db.session.clone();

        queries.execute_batch(session, cfg, query, values).await
    }

    /// Execute a select query to gather primary keys for purging.
    pub(crate) async fn purge_execute_iter(
        &self, query: purge::PreparedSelectQuery,
    ) -> anyhow::Result<QueryPager> {
        // Only execute purge queries on the volatile session
        let persistent = false;
        let Some(volatile_db) = Self::get(persistent) else {
            // This should never happen
            anyhow::bail!("Volatile DB Session not found");
        };
        let queries = self.purge_queries.clone();

        queries
            .execute_iter(volatile_db.session.clone(), query)
            .await
    }

    /// Get underlying Raw Cassandra Session.
    pub(crate) fn get_raw_session(&self) -> Arc<Session> {
        self.session.clone()
    }
}

/// Create a new execution profile based on the given configuration.
///
/// The intention here is that we should be able to tune this based on configuration,
/// but for now we don't, so the `cfg` is not used yet.
fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
    ExecutionProfile::builder()
        .consistency(scylla::statement::Consistency::LocalQuorum)
        .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
        .retry_policy(Arc::new(scylla::retry_policy::DefaultRetryPolicy::new()))
        .load_balancing_policy(
            scylla::load_balancing::DefaultPolicyBuilder::new()
                .permit_dc_failover(true)
                .build(),
        )
        .speculative_execution_policy(Some(Arc::new(
            scylla::speculative_execution::SimpleSpeculativeExecutionPolicy {
                max_retry_count: 3,
                retry_interval: Duration::from_millis(100),
            },
        )))
        .build()
}

/// Construct a session based on the given configuration.
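///
/// The configured `url` is treated as a comma-separated list of known nodes, so a
/// value such as `"127.0.0.1:9042,10.0.0.2:9042"` (illustrative addresses) connects
/// the session to multiple contact points.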
async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
    let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();

    let mut sb = SessionBuilder::new()
        .known_nodes(cluster_urls)
        .auto_await_schema_agreement(false);

    let profile_handle = make_execution_profile(cfg).into_handle();
    sb = sb.default_execution_profile_handle(profile_handle);

    sb = match cfg.compression {
        CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
        CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
        CompressionChoice::None => sb.compression(None),
    };

    if cfg.tls != TlsChoice::Disabled {
        let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;

        if let Some(cert_name) = &cfg.tls_cert {
            let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
            context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
        }

        if cfg.tls == TlsChoice::Verified {
            context_builder.set_verify(SslVerifyMode::PEER);
        } else {
            context_builder.set_verify(SslVerifyMode::NONE);
        }

        let ssl_context = context_builder.build();

        sb = sb.ssl_context(Some(ssl_context));
    }

    // Set the username and password, if required.
    if let Some(username) = &cfg.username {
        if let Some(password) = &cfg.password {
            sb = sb.user(username.as_str(), password.as_str());
        }
    }

    let session = Box::pin(sb.build()).await?;

    Ok(Arc::new(session))
}

/// Continuously try to init the DB; if it fails, back off.
///
/// Display reasonable logs to help diagnose DB connection issues.
async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) {
    let mut retry_delay = Duration::from_secs(0);
    let db_type = if persistent { "Persistent" } else { "Volatile" };

    info!(db_type = db_type, "Index DB Session Creation: Started.");

    cfg.log(persistent);

    loop {
        tokio::time::sleep(retry_delay).await;
        retry_delay = Duration::from_secs(30); // 30 seconds if we ever try again.

        info!(
            db_type = db_type,
            "Attempting to connect to Cassandra DB..."
        );

        // Create a Session to the Cassandra DB.
        let session = match make_session(&cfg).await {
            Ok(session) => session,
            Err(error) => {
                error!(
                    db_type = db_type,
                    error = format!("{error:?}"),
                    "Failed to Create Cassandra DB Session"
                );
                drop(INIT_SESSION_ERROR.set(Arc::new(
                    CassandraSessionError::CreatingSessionFailed { source: error },
                )));
                continue;
            },
        };

        // Set up the Schema for it.
        if let Err(error) = create_schema(&mut session.clone(), &cfg).await {
            error!(
                db_type = db_type,
                error = format!("{error:?}"),
                "Failed to Create Cassandra DB Schema"
            );
            drop(
                INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SchemaMigrationFailed {
                    source: error,
                })),
            );
            continue;
        }

        let queries = match PreparedQueries::new(session.clone(), &cfg).await {
            Ok(queries) => Arc::new(queries),
            Err(error) => {
                error!(
                    db_type = db_type,
                    error = %error,
                    "Failed to Create Cassandra Prepared Queries"
                );
                drop(INIT_SESSION_ERROR.set(Arc::new(
                    CassandraSessionError::PreparingQueriesFailed { source: error },
                )));
                continue;
            },
        };

        let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
        {
            Ok(queries) => Arc::new(queries),
            Err(error) => {
                error!(
                    db_type = db_type,
                    error = %error,
                    "Failed to Create Cassandra Prepared Purge Queries"
                );
                drop(INIT_SESSION_ERROR.set(Arc::new(
                    CassandraSessionError::PreparingPurgeQueriesFailed { source: error },
                )));
                continue;
            },
        };

        let cassandra_session = CassandraSession {
            persistent,
            cfg: Arc::new(cfg),
            session,
            queries,
            purge_queries,
        };

        // Save the session so we can execute queries on the DB
        if persistent {
            if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
                error!("Persistent Session already set.  This should not happen.");
                drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
            };
        } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
            error!("Volatile Session already set.  This should not happen.");
            drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
        };

        // If we get here, then everything seems to have worked, so finish init.
        break;
    }

    info!(db_type = db_type, "Index DB Session Creation: OK.");
}