cat_gateway/db/index/
session.rs

1//! Session creation and storage
2
3use std::{
4    fmt::Debug,
5    path::PathBuf,
6    sync::{Arc, OnceLock},
7    time::Duration,
8};
9
10use cardano_blockchain_types::Network;
11use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
12use scylla::{
13    frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager,
14    ExecutionProfile, Session, SessionBuilder,
15};
16use thiserror::Error;
17use tokio::fs;
18use tracing::{error, info};
19
20use super::{
21    queries::{
22        purge::{self, PreparedDeleteQuery},
23        FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
24        PreparedUpsertQuery,
25    },
26    schema::create_schema,
27};
28use crate::{
29    service::utilities::health::set_index_db_liveness,
30    settings::{cassandra_db, Settings},
31};
32
/// Configuration Choices for compression
///
/// Parsed case-insensitively from configuration text via `strum::EnumString`,
/// and mapped onto the driver's `Compression` options in `make_session`.
#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
#[strum(ascii_case_insensitive)]
pub(crate) enum CompressionChoice {
    /// LZ4 link data compression.
    Lz4,
    /// Snappy link data compression.
    Snappy,
    /// No compression.
    None,
}
44
/// Configuration Choices for TLS.
///
/// Parsed case-insensitively via `strum::EnumString`.  `PartialEq` is required
/// for the equality checks against `cfg.tls` in `make_session`.
#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
#[strum(ascii_case_insensitive)]
pub(crate) enum TlsChoice {
    /// Disable TLS.
    Disabled,
    /// Verifies that the peer's certificate is trusted.
    Verified,
    /// Disables verification of the peer's certificate.
    Unverified,
}
56
/// Represents errors that can occur while interacting with a Cassandra session.
///
/// Initialisation failures are recorded (first one only) in `INIT_SESSION_ERROR`
/// by `retry_init`, and surfaced to callers by `wait_until_ready`.
#[derive(Debug, Error)]
pub(crate) enum CassandraSessionError {
    /// Error when creating a session fails.
    #[error("Creating session failed: {source}")]
    CreatingSessionFailed {
        /// The underlying error that caused the session creation to fail.
        #[source]
        source: anyhow::Error,
    },
    /// Error when connecting to database.
    #[error("Database connection failed: {source}")]
    ConnectionUnavailable {
        /// The underlying error that caused the connection failure.
        #[source]
        source: anyhow::Error,
    },
    /// Error when schema migration fails.
    #[error("Schema migration failed: {source}")]
    SchemaMigrationFailed {
        /// The underlying error that caused the schema migration to fail.
        #[source]
        source: anyhow::Error,
    },
    /// Error when preparing queries fails.
    #[error("Preparing queries failed: {source}")]
    PreparingQueriesFailed {
        /// The underlying error that caused query preparation to fail.
        #[source]
        source: anyhow::Error,
    },
    /// Error when preparing purge queries fails.
    #[error("Preparing purge queries failed: {source}")]
    PreparingPurgeQueriesFailed {
        /// The underlying error that caused purge query preparation to fail.
        #[source]
        source: anyhow::Error,
    },
    /// Error indicating that the session has already been set.
    #[error("Session already set")]
    SessionAlreadySet,
    /// Should be used by the caller when it fails to acquire the initialized database
    /// session.
    #[error("Failed acquiring database session")]
    FailedAcquiringSession,
}
103
/// All interaction with cassandra goes through this struct.
///
/// Cheap to clone: every field is behind an `Arc`, so clones share the same
/// underlying session and prepared-query sets.
#[derive(Clone)]
pub(crate) struct CassandraSession {
    /// Is the session to the persistent or volatile DB?
    // NOTE(review): currently only read during construction/diagnostics, hence
    // the dead_code allowance — confirm before removing the field.
    #[allow(dead_code)]
    persistent: bool,
    /// Configuration for this session.
    cfg: Arc<cassandra_db::EnvVars>,
    /// The actual session.
    session: Arc<Session>,
    /// All prepared queries we can use on this session.
    queries: Arc<PreparedQueries>,
    /// All prepared purge queries we can use on this session.
    purge_queries: Arc<purge::PreparedQueries>,
}
119
/// Session error while initialization.
///
/// `OnceLock` means only the FIRST error is retained; later `set` calls are
/// no-ops.  `wait_until_ready` reports this error to non-ignoring callers.
static INIT_SESSION_ERROR: OnceLock<Arc<CassandraSessionError>> = OnceLock::new();

/// Persistent DB Session.
///
/// Set exactly once by `retry_init` when the persistent DB initialises.
static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();

/// Volatile DB Session.
///
/// Set exactly once by `retry_init` when the volatile DB initialises.
static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
128
impl CassandraSession {
    /// Initialise the Cassandra Cluster Connections.
    ///
    /// Non-blocking: spawns one detached background task per DB (persistent and
    /// volatile), each of which retries until its session is established.
    pub(crate) fn init() {
        let (persistent, volatile) = Settings::cassandra_db_cfg();
        let network = Settings::cardano_network();

        // Join handles are deliberately discarded: the tasks are detached and
        // keep retrying in the background (see `retry_init`).
        let _join_handle =
            tokio::task::spawn(
                async move { Box::pin(retry_init(persistent, network, true)).await },
            );
        let _join_handle =
            tokio::task::spawn(async move { Box::pin(retry_init(volatile, network, false)).await });
    }

    /// Check to see if the Cassandra Indexing DB is ready for use
    ///
    /// "Ready" means BOTH the persistent and volatile sessions have been created.
    pub(crate) fn is_ready() -> bool {
        PERSISTENT_SESSION.get().is_some() && VOLATILE_SESSION.get().is_some()
    }

    /// Wait for the Cassandra Indexing DB to be ready before continuing
    ///
    /// Polls `is_ready` every `interval`.  When `ignore_err` is false, returns
    /// early with the first recorded initialisation error.  Updates the index
    /// DB liveness flag as a side effect of either outcome.
    // NOTE(review): because INIT_SESSION_ERROR only records the FIRST error, a
    // transient failure that later retries successfully will still make this
    // return Err for non-ignoring callers — confirm that is intended.
    pub(crate) async fn wait_until_ready(
        interval: Duration, ignore_err: bool,
    ) -> Result<(), Arc<CassandraSessionError>> {
        loop {
            if !ignore_err {
                if let Some(err) = INIT_SESSION_ERROR.get() {
                    set_index_db_liveness(false);
                    return Err(err.clone());
                }
            }

            if Self::is_ready() {
                set_index_db_liveness(true);
                return Ok(());
            }

            tokio::time::sleep(interval).await;
        }
    }

    /// Get the session needed to perform a query.
    ///
    /// Returns `None` if the requested session has not finished initialising.
    pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
        if persistent {
            PERSISTENT_SESSION.get().cloned()
        } else {
            VOLATILE_SESSION.get().cloned()
        }
    }

    /// Executes a select query with the given parameters.
    ///
    /// Returns an iterator that iterates over all the result pages that the query
    /// returns.
    pub(crate) async fn execute_iter<P>(
        &self, select_query: PreparedSelectQuery, params: P,
    ) -> anyhow::Result<QueryPager>
    where P: SerializeRow {
        // Arc clones only — cheap reference-count bumps, not data copies.
        let session = self.session.clone();
        let queries = self.queries.clone();

        queries.execute_iter(session, select_query, params).await
    }

    /// Execute a Batch query with the given parameters.
    ///
    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
    /// the same, and must match the query being executed.
    ///
    /// This will divide the batch into optimal sized chunks and execute them until all
    /// values have been executed or the first error is encountered.
    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
        &self, query: PreparedQuery, values: Vec<T>,
    ) -> FallibleQueryResults {
        let session = self.session.clone();
        // Config is passed along so batching can be tuned per-DB settings.
        let cfg = self.cfg.clone();
        let queries = self.queries.clone();

        queries.execute_batch(session, cfg, query, values).await
    }

    /// Execute a query which returns no results, except an error if it fails.
    /// Can not be batched, takes a single set of parameters.
    pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
        &self, query: PreparedUpsertQuery, value: T,
    ) -> anyhow::Result<()> {
        let session = self.session.clone();
        let queries = self.queries.clone();

        queries.execute_upsert(session, query, value).await
    }

    /// Execute a purge query with the given parameters.
    ///
    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
    /// the same, and must match the query being executed.
    ///
    /// This will divide the batch into optimal sized chunks and execute them until all
    /// values have been executed or the first error is encountered.
    ///
    /// NOTE: This is currently only used to purge volatile data.
    // NOTE(review): this always targets the VOLATILE session regardless of
    // which session `self` is, but uses `self.cfg` and `self.purge_queries` —
    // if invoked on the persistent session, config/queries and session could
    // mismatch.  Confirm callers only invoke this on the volatile session.
    pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
        &self, query: PreparedDeleteQuery, values: Vec<T>,
    ) -> FallibleQueryResults {
        // Only execute purge queries on the volatile session
        let persistent = false;
        let Some(volatile_db) = Self::get(persistent) else {
            // This should never happen
            anyhow::bail!("Volatile DB Session not found");
        };
        let cfg = self.cfg.clone();
        let queries = self.purge_queries.clone();
        let session = volatile_db.session.clone();

        queries.execute_batch(session, cfg, query, values).await
    }

    /// Execute a select query to gather primary keys for purging.
    ///
    /// Like `purge_execute_batch`, always runs against the volatile session.
    pub(crate) async fn purge_execute_iter(
        &self, query: purge::PreparedSelectQuery,
    ) -> anyhow::Result<QueryPager> {
        // Only execute purge queries on the volatile session
        let persistent = false;
        let Some(volatile_db) = Self::get(persistent) else {
            // This should never happen
            anyhow::bail!("Volatile DB Session not found");
        };
        let queries = self.purge_queries.clone();

        queries
            .execute_iter(volatile_db.session.clone(), query)
            .await
    }

    /// Get underlying Raw Cassandra Session.
    ///
    /// Escape hatch for callers that need driver-level access; prefer the
    /// prepared-query wrappers above where possible.
    pub(crate) fn get_raw_session(&self) -> Arc<Session> {
        self.session.clone()
    }
}
267
268/// Create a new execution profile based on the given configuration.
269///
270/// The intention here is that we should be able to tune this based on configuration,
271/// but for now we don't so the `cfg` is not used yet.
272fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
273    ExecutionProfile::builder()
274        .consistency(scylla::statement::Consistency::LocalQuorum)
275        .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
276        .retry_policy(Arc::new(scylla::retry_policy::DefaultRetryPolicy::new()))
277        .load_balancing_policy(
278            scylla::load_balancing::DefaultPolicyBuilder::new()
279                .permit_dc_failover(true)
280                .build(),
281        )
282        .speculative_execution_policy(Some(Arc::new(
283            scylla::speculative_execution::SimpleSpeculativeExecutionPolicy {
284                max_retry_count: 3,
285                retry_interval: Duration::from_millis(100),
286            },
287        )))
288        .build()
289}
290
291/// Construct a session based on the given configuration.
292async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
293    let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
294
295    let mut sb = SessionBuilder::new()
296        .known_nodes(cluster_urls)
297        .auto_await_schema_agreement(false);
298
299    let profile_handle = make_execution_profile(cfg).into_handle();
300    sb = sb.default_execution_profile_handle(profile_handle);
301
302    sb = match cfg.compression {
303        CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
304        CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
305        CompressionChoice::None => sb.compression(None),
306    };
307
308    if cfg.tls != TlsChoice::Disabled {
309        let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
310
311        if let Some(cert_name) = &cfg.tls_cert {
312            let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
313            context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
314        }
315
316        if cfg.tls == TlsChoice::Verified {
317            context_builder.set_verify(SslVerifyMode::PEER);
318        } else {
319            context_builder.set_verify(SslVerifyMode::NONE);
320        }
321
322        let ssl_context = context_builder.build();
323
324        sb = sb.ssl_context(Some(ssl_context));
325    }
326
327    // Set the username and password, if required.
328    if let Some(username) = &cfg.username {
329        if let Some(password) = &cfg.password {
330            sb = sb.user(username.as_str(), password.as_str());
331        }
332    }
333
334    let session = Box::pin(sb.build()).await?;
335
336    Ok(Arc::new(session))
337}
338
/// Continuously try and init the DB, if it fails, backoff.
///
/// Display reasonable logs to help diagnose DB connection issues.
///
/// On success, stores the finished session in `PERSISTENT_SESSION` or
/// `VOLATILE_SESSION` and returns.  On any failure, records the error in
/// `INIT_SESSION_ERROR` (first error only — it is a `OnceLock`) and retries.
async fn retry_init(cfg: cassandra_db::EnvVars, network: Network, persistent: bool) {
    // Zero delay so the first attempt happens immediately.
    let mut retry_delay = Duration::from_secs(0);
    let db_type = if persistent { "Persistent" } else { "Volatile" };

    info!(
        db_type = db_type,
        network = %network,
        "Index DB Session Creation: Started."
    );

    cfg.log(persistent, network);

    loop {
        tokio::time::sleep(retry_delay).await;
        retry_delay = Duration::from_secs(30); // 30 seconds if we ever try again.

        info!(
            db_type = db_type,
            network = %network,
            "Attempting to connect to Cassandra DB..."
        );

        // Create a Session to the Cassandra DB.
        let session = match make_session(&cfg).await {
            Ok(session) => session,
            Err(error) => {
                error!(
                    db_type = db_type,
                    network = %network,
                    error = format!("{error:?}"),
                    "Failed to Create Cassandra DB Session"
                );
                // `set` result dropped: only the first init error is retained.
                drop(INIT_SESSION_ERROR.set(Arc::new(
                    CassandraSessionError::CreatingSessionFailed { source: error },
                )));
                continue;
            },
        };

        // Set up the Schema for it.
        // Clone the Arc so a mutable reference can be handed to the schema
        // code without giving up our own handle.
        if let Err(error) = create_schema(&mut session.clone(), &cfg, persistent, network).await {
            error!(
                db_type = db_type,
                network = %network,
                error = format!("{error:?}"),
                "Failed to Create Cassandra DB Schema"
            );
            drop(
                INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SchemaMigrationFailed {
                    source: error,
                })),
            );
            continue;
        }

        // Pre-prepare the regular query set for this session.
        let queries = match PreparedQueries::new(session.clone(), &cfg).await {
            Ok(queries) => Arc::new(queries),
            Err(error) => {
                error!(
                    db_type = db_type,
                    network = %network,
                    error = %error,
                    "Failed to Create Cassandra Prepared Queries"
                );
                drop(INIT_SESSION_ERROR.set(Arc::new(
                    CassandraSessionError::PreparingQueriesFailed { source: error },
                )));
                continue;
            },
        };

        // Pre-prepare the purge query set for this session.
        let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
        {
            Ok(queries) => Arc::new(queries),
            Err(error) => {
                error!(
                    db_type = db_type,
                    network = %network,
                    error = %error,
                    "Failed to Create Cassandra Prepared Purge Queries"
                );
                drop(INIT_SESSION_ERROR.set(Arc::new(
                    CassandraSessionError::PreparingPurgeQueriesFailed { source: error },
                )));
                continue;
            },
        };

        // Note: `cfg` is moved here — safe because we `break` right after.
        let cassandra_session = CassandraSession {
            persistent,
            cfg: Arc::new(cfg),
            session,
            queries,
            purge_queries,
        };

        // Save the session so we can execute queries on the DB
        if persistent {
            if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
                error!("Persistent Session already set.  This should not happen.");
                drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
            };
        } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
            error!("Volatile Session already set.  This should not happen.");
            drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
        };

        // IF we get here, then everything seems to have worked, so finish init.
        break;
    }

    info!(db_type = db_type, network = %network, "Index DB Session Creation: OK.");
}