1use std::{
4 fmt::Debug,
5 path::PathBuf,
6 sync::{Arc, OnceLock},
7 time::Duration,
8};
9
10use cardano_blockchain_types::Network;
11use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
12use scylla::{
13 frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager,
14 ExecutionProfile, Session, SessionBuilder,
15};
16use thiserror::Error;
17use tokio::fs;
18use tracing::{error, info};
19
20use super::{
21 queries::{
22 purge::{self, PreparedDeleteQuery},
23 FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
24 PreparedUpsertQuery,
25 },
26 schema::create_schema,
27};
28use crate::{
29 service::utilities::health::set_index_db_liveness,
30 settings::{cassandra_db, Settings},
31};
32
33#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
35#[strum(ascii_case_insensitive)]
36pub(crate) enum CompressionChoice {
37 Lz4,
39 Snappy,
41 None,
43}
44
45#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
47#[strum(ascii_case_insensitive)]
48pub(crate) enum TlsChoice {
49 Disabled,
51 Verified,
53 Unverified,
55}
56
57#[derive(Debug, Error)]
59pub(crate) enum CassandraSessionError {
60 #[error("Creating session failed: {source}")]
62 CreatingSessionFailed {
63 #[source]
65 source: anyhow::Error,
66 },
67 #[error("Database connection failed: {source}")]
69 ConnectionUnavailable {
70 #[source]
72 source: anyhow::Error,
73 },
74 #[error("Schema migration failed: {source}")]
76 SchemaMigrationFailed {
77 #[source]
79 source: anyhow::Error,
80 },
81 #[error("Preparing queries failed: {source}")]
83 PreparingQueriesFailed {
84 #[source]
86 source: anyhow::Error,
87 },
88 #[error("Preparing purge queries failed: {source}")]
90 PreparingPurgeQueriesFailed {
91 #[source]
93 source: anyhow::Error,
94 },
95 #[error("Session already set")]
97 SessionAlreadySet,
98 #[error("Failed acquiring database session")]
101 FailedAcquiringSession,
102}
103
104#[derive(Clone)]
106pub(crate) struct CassandraSession {
107 #[allow(dead_code)]
109 persistent: bool,
110 cfg: Arc<cassandra_db::EnvVars>,
112 session: Arc<Session>,
114 queries: Arc<PreparedQueries>,
116 purge_queries: Arc<purge::PreparedQueries>,
118}
119
120static INIT_SESSION_ERROR: OnceLock<Arc<CassandraSessionError>> = OnceLock::new();
122
123static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
125
126static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
128
129impl CassandraSession {
130 pub(crate) fn init() {
132 let (persistent, volatile) = Settings::cassandra_db_cfg();
133 let network = Settings::cardano_network();
134
135 let _join_handle =
136 tokio::task::spawn(
137 async move { Box::pin(retry_init(persistent, network, true)).await },
138 );
139 let _join_handle =
140 tokio::task::spawn(async move { Box::pin(retry_init(volatile, network, false)).await });
141 }
142
143 pub(crate) fn is_ready() -> bool {
145 PERSISTENT_SESSION.get().is_some() && VOLATILE_SESSION.get().is_some()
146 }
147
148 pub(crate) async fn wait_until_ready(
150 interval: Duration, ignore_err: bool,
151 ) -> Result<(), Arc<CassandraSessionError>> {
152 loop {
153 if !ignore_err {
154 if let Some(err) = INIT_SESSION_ERROR.get() {
155 set_index_db_liveness(false);
156 return Err(err.clone());
157 }
158 }
159
160 if Self::is_ready() {
161 set_index_db_liveness(true);
162 return Ok(());
163 }
164
165 tokio::time::sleep(interval).await;
166 }
167 }
168
169 pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
171 if persistent {
172 PERSISTENT_SESSION.get().cloned()
173 } else {
174 VOLATILE_SESSION.get().cloned()
175 }
176 }
177
178 pub(crate) async fn execute_iter<P>(
183 &self, select_query: PreparedSelectQuery, params: P,
184 ) -> anyhow::Result<QueryPager>
185 where P: SerializeRow {
186 let session = self.session.clone();
187 let queries = self.queries.clone();
188
189 queries.execute_iter(session, select_query, params).await
190 }
191
192 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
200 &self, query: PreparedQuery, values: Vec<T>,
201 ) -> FallibleQueryResults {
202 let session = self.session.clone();
203 let cfg = self.cfg.clone();
204 let queries = self.queries.clone();
205
206 queries.execute_batch(session, cfg, query, values).await
207 }
208
209 pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
212 &self, query: PreparedUpsertQuery, value: T,
213 ) -> anyhow::Result<()> {
214 let session = self.session.clone();
215 let queries = self.queries.clone();
216
217 queries.execute_upsert(session, query, value).await
218 }
219
220 pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
230 &self, query: PreparedDeleteQuery, values: Vec<T>,
231 ) -> FallibleQueryResults {
232 let persistent = false;
234 let Some(volatile_db) = Self::get(persistent) else {
235 anyhow::bail!("Volatile DB Session not found");
237 };
238 let cfg = self.cfg.clone();
239 let queries = self.purge_queries.clone();
240 let session = volatile_db.session.clone();
241
242 queries.execute_batch(session, cfg, query, values).await
243 }
244
245 pub(crate) async fn purge_execute_iter(
247 &self, query: purge::PreparedSelectQuery,
248 ) -> anyhow::Result<QueryPager> {
249 let persistent = false;
251 let Some(volatile_db) = Self::get(persistent) else {
252 anyhow::bail!("Volatile DB Session not found");
254 };
255 let queries = self.purge_queries.clone();
256
257 queries
258 .execute_iter(volatile_db.session.clone(), query)
259 .await
260 }
261
262 pub(crate) fn get_raw_session(&self) -> Arc<Session> {
264 self.session.clone()
265 }
266}
267
268fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
273 ExecutionProfile::builder()
274 .consistency(scylla::statement::Consistency::LocalQuorum)
275 .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
276 .retry_policy(Arc::new(scylla::retry_policy::DefaultRetryPolicy::new()))
277 .load_balancing_policy(
278 scylla::load_balancing::DefaultPolicyBuilder::new()
279 .permit_dc_failover(true)
280 .build(),
281 )
282 .speculative_execution_policy(Some(Arc::new(
283 scylla::speculative_execution::SimpleSpeculativeExecutionPolicy {
284 max_retry_count: 3,
285 retry_interval: Duration::from_millis(100),
286 },
287 )))
288 .build()
289}
290
291async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
293 let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
294
295 let mut sb = SessionBuilder::new()
296 .known_nodes(cluster_urls)
297 .auto_await_schema_agreement(false);
298
299 let profile_handle = make_execution_profile(cfg).into_handle();
300 sb = sb.default_execution_profile_handle(profile_handle);
301
302 sb = match cfg.compression {
303 CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
304 CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
305 CompressionChoice::None => sb.compression(None),
306 };
307
308 if cfg.tls != TlsChoice::Disabled {
309 let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
310
311 if let Some(cert_name) = &cfg.tls_cert {
312 let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
313 context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
314 }
315
316 if cfg.tls == TlsChoice::Verified {
317 context_builder.set_verify(SslVerifyMode::PEER);
318 } else {
319 context_builder.set_verify(SslVerifyMode::NONE);
320 }
321
322 let ssl_context = context_builder.build();
323
324 sb = sb.ssl_context(Some(ssl_context));
325 }
326
327 if let Some(username) = &cfg.username {
329 if let Some(password) = &cfg.password {
330 sb = sb.user(username.as_str(), password.as_str());
331 }
332 }
333
334 let session = Box::pin(sb.build()).await?;
335
336 Ok(Arc::new(session))
337}
338
339async fn retry_init(cfg: cassandra_db::EnvVars, network: Network, persistent: bool) {
343 let mut retry_delay = Duration::from_secs(0);
344 let db_type = if persistent { "Persistent" } else { "Volatile" };
345
346 info!(
347 db_type = db_type,
348 network = %network,
349 "Index DB Session Creation: Started."
350 );
351
352 cfg.log(persistent, network);
353
354 loop {
355 tokio::time::sleep(retry_delay).await;
356 retry_delay = Duration::from_secs(30); info!(
359 db_type = db_type,
360 network = %network,
361 "Attempting to connect to Cassandra DB..."
362 );
363
364 let session = match make_session(&cfg).await {
366 Ok(session) => session,
367 Err(error) => {
368 error!(
369 db_type = db_type,
370 network = %network,
371 error = format!("{error:?}"),
372 "Failed to Create Cassandra DB Session"
373 );
374 drop(INIT_SESSION_ERROR.set(Arc::new(
375 CassandraSessionError::CreatingSessionFailed { source: error },
376 )));
377 continue;
378 },
379 };
380
381 if let Err(error) = create_schema(&mut session.clone(), &cfg, persistent, network).await {
383 error!(
384 db_type = db_type,
385 network = %network,
386 error = format!("{error:?}"),
387 "Failed to Create Cassandra DB Schema"
388 );
389 drop(
390 INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SchemaMigrationFailed {
391 source: error,
392 })),
393 );
394 continue;
395 }
396
397 let queries = match PreparedQueries::new(session.clone(), &cfg).await {
398 Ok(queries) => Arc::new(queries),
399 Err(error) => {
400 error!(
401 db_type = db_type,
402 network = %network,
403 error = %error,
404 "Failed to Create Cassandra Prepared Queries"
405 );
406 drop(INIT_SESSION_ERROR.set(Arc::new(
407 CassandraSessionError::PreparingQueriesFailed { source: error },
408 )));
409 continue;
410 },
411 };
412
413 let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
414 {
415 Ok(queries) => Arc::new(queries),
416 Err(error) => {
417 error!(
418 db_type = db_type,
419 network = %network,
420 error = %error,
421 "Failed to Create Cassandra Prepared Purge Queries"
422 );
423 drop(INIT_SESSION_ERROR.set(Arc::new(
424 CassandraSessionError::PreparingPurgeQueriesFailed { source: error },
425 )));
426 continue;
427 },
428 };
429
430 let cassandra_session = CassandraSession {
431 persistent,
432 cfg: Arc::new(cfg),
433 session,
434 queries,
435 purge_queries,
436 };
437
438 if persistent {
440 if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
441 error!("Persistent Session already set. This should not happen.");
442 drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
443 };
444 } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
445 error!("Volatile Session already set. This should not happen.");
446 drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
447 };
448
449 break;
451 }
452
453 info!(db_type = db_type, network = %network, "Index DB Session Creation: OK.");
454}