1use std::{
4 fmt::Debug,
5 path::PathBuf,
6 sync::{Arc, OnceLock},
7 time::Duration,
8};
9
10use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
11use scylla::{
12 frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager,
13 ExecutionProfile, Session, SessionBuilder,
14};
15use thiserror::Error;
16use tokio::fs;
17use tracing::{error, info};
18
19use super::{
20 queries::{
21 purge::{self, PreparedDeleteQuery},
22 FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
23 PreparedUpsertQuery,
24 },
25 schema::create_schema,
26};
27use crate::settings::{cassandra_db, Settings};
28
/// Compression to use on the connection to the Cassandra cluster.
///
/// Converted to and from text via `strum`; parsing is ASCII case-insensitive.
#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
#[strum(ascii_case_insensitive)]
pub(crate) enum CompressionChoice {
    /// Use LZ4 compression.
    Lz4,
    /// Use Snappy compression.
    Snappy,
    /// Do not compress.
    None,
}
40
/// TLS mode to use on the connection to the Cassandra cluster.
///
/// Converted to and from text via `strum`; parsing is ASCII case-insensitive.
#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
#[strum(ascii_case_insensitive)]
pub(crate) enum TlsChoice {
    /// Do not use TLS at all (no SSL context is installed on the session).
    Disabled,
    /// Use TLS and verify the peer.
    Verified,
    /// Use TLS without verifying the peer.
    Unverified,
}
52
/// Errors that can be recorded while a Cassandra session is being initialized.
#[derive(Debug, Error)]
pub(crate) enum CassandraSessionError {
    /// Building the underlying database session failed.
    #[error("Creating session failed: {source}")]
    CreatingSessionFailed {
        /// The underlying error.
        source: anyhow::Error,
    },
    /// Creating the database schema failed.
    #[error("Schema migration failed: {source}")]
    SchemaMigrationFailed {
        /// The underlying error.
        source: anyhow::Error,
    },
    /// Preparing the regular queries failed.
    #[error("Preparing queries failed: {source}")]
    PreparingQueriesFailed {
        /// The underlying error.
        source: anyhow::Error,
    },
    /// Preparing the purge queries failed.
    #[error("Preparing purge queries failed: {source}")]
    PreparingPurgeQueriesFailed {
        /// The underlying error.
        source: anyhow::Error,
    },
    /// The global session slot was already populated when initialization finished.
    #[error("Session already set")]
    SessionAlreadySet,
}
84
/// A connected Cassandra session together with its configuration and prepared queries.
///
/// Cloning is cheap: every heavyweight member is behind an `Arc`.
#[derive(Clone)]
pub(crate) struct CassandraSession {
    /// `true` for the persistent database, `false` for the volatile one.
    // NOTE(review): the field is written at construction but not read anywhere in this
    // file, which is presumably why `dead_code` is allowed — confirm.
    #[allow(dead_code)]
    persistent: bool,
    /// Configuration this session was created from.
    cfg: Arc<cassandra_db::EnvVars>,
    /// The underlying database session.
    session: Arc<Session>,
    /// Prepared queries for regular operation.
    queries: Arc<PreparedQueries>,
    /// Prepared queries used for purging data.
    purge_queries: Arc<purge::PreparedQueries>,
}
100
/// First error recorded while initializing either session.
///
/// Being a `OnceLock`, only the first recorded error is kept, and it is never cleared,
/// even if a later retry succeeds.
static INIT_SESSION_ERROR: OnceLock<Arc<CassandraSessionError>> = OnceLock::new();

/// The persistent database session, set once its initialization has completed.
static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();

/// The volatile database session, set once its initialization has completed.
static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
109
110impl CassandraSession {
111 pub(crate) fn init() {
113 let (persistent, volatile) = Settings::cassandra_db_cfg();
114
115 let _join_handle =
116 tokio::task::spawn(async move { Box::pin(retry_init(persistent, true)).await });
117 let _join_handle =
118 tokio::task::spawn(async move { Box::pin(retry_init(volatile, false)).await });
119 }
120
121 pub(crate) fn is_ready() -> bool {
123 PERSISTENT_SESSION.get().is_some() && VOLATILE_SESSION.get().is_some()
124 }
125
126 pub(crate) async fn wait_until_ready(
128 interval: Duration, ignore_err: bool,
129 ) -> Result<(), Arc<CassandraSessionError>> {
130 loop {
131 if !ignore_err {
132 if let Some(err) = INIT_SESSION_ERROR.get() {
133 return Err(err.clone());
134 }
135 }
136
137 if Self::is_ready() {
138 return Ok(());
139 }
140
141 tokio::time::sleep(interval).await;
142 }
143 }
144
145 pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
147 if persistent {
148 PERSISTENT_SESSION.get().cloned()
149 } else {
150 VOLATILE_SESSION.get().cloned()
151 }
152 }
153
154 pub(crate) async fn execute_iter<P>(
159 &self, select_query: PreparedSelectQuery, params: P,
160 ) -> anyhow::Result<QueryPager>
161 where P: SerializeRow {
162 let session = self.session.clone();
163 let queries = self.queries.clone();
164
165 queries.execute_iter(session, select_query, params).await
166 }
167
168 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
176 &self, query: PreparedQuery, values: Vec<T>,
177 ) -> FallibleQueryResults {
178 let session = self.session.clone();
179 let cfg = self.cfg.clone();
180 let queries = self.queries.clone();
181
182 queries.execute_batch(session, cfg, query, values).await
183 }
184
185 pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
188 &self, query: PreparedUpsertQuery, value: T,
189 ) -> anyhow::Result<()> {
190 let session = self.session.clone();
191 let queries = self.queries.clone();
192
193 queries.execute_upsert(session, query, value).await
194 }
195
196 pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
206 &self, query: PreparedDeleteQuery, values: Vec<T>,
207 ) -> FallibleQueryResults {
208 let persistent = false;
210 let Some(volatile_db) = Self::get(persistent) else {
211 anyhow::bail!("Volatile DB Session not found");
213 };
214 let cfg = self.cfg.clone();
215 let queries = self.purge_queries.clone();
216 let session = volatile_db.session.clone();
217
218 queries.execute_batch(session, cfg, query, values).await
219 }
220
221 pub(crate) async fn purge_execute_iter(
223 &self, query: purge::PreparedSelectQuery,
224 ) -> anyhow::Result<QueryPager> {
225 let persistent = false;
227 let Some(volatile_db) = Self::get(persistent) else {
228 anyhow::bail!("Volatile DB Session not found");
230 };
231 let queries = self.purge_queries.clone();
232
233 queries
234 .execute_iter(volatile_db.session.clone(), query)
235 .await
236 }
237
238 pub(crate) fn get_raw_session(&self) -> Arc<Session> {
240 self.session.clone()
241 }
242}
243
244fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
249 ExecutionProfile::builder()
250 .consistency(scylla::statement::Consistency::LocalQuorum)
251 .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
252 .retry_policy(Arc::new(scylla::retry_policy::DefaultRetryPolicy::new()))
253 .load_balancing_policy(
254 scylla::load_balancing::DefaultPolicyBuilder::new()
255 .permit_dc_failover(true)
256 .build(),
257 )
258 .speculative_execution_policy(Some(Arc::new(
259 scylla::speculative_execution::SimpleSpeculativeExecutionPolicy {
260 max_retry_count: 3,
261 retry_interval: Duration::from_millis(100),
262 },
263 )))
264 .build()
265}
266
267async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
269 let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
270
271 let mut sb = SessionBuilder::new()
272 .known_nodes(cluster_urls)
273 .auto_await_schema_agreement(false);
274
275 let profile_handle = make_execution_profile(cfg).into_handle();
276 sb = sb.default_execution_profile_handle(profile_handle);
277
278 sb = match cfg.compression {
279 CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
280 CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
281 CompressionChoice::None => sb.compression(None),
282 };
283
284 if cfg.tls != TlsChoice::Disabled {
285 let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
286
287 if let Some(cert_name) = &cfg.tls_cert {
288 let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
289 context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
290 }
291
292 if cfg.tls == TlsChoice::Verified {
293 context_builder.set_verify(SslVerifyMode::PEER);
294 } else {
295 context_builder.set_verify(SslVerifyMode::NONE);
296 }
297
298 let ssl_context = context_builder.build();
299
300 sb = sb.ssl_context(Some(ssl_context));
301 }
302
303 if let Some(username) = &cfg.username {
305 if let Some(password) = &cfg.password {
306 sb = sb.user(username.as_str(), password.as_str());
307 }
308 }
309
310 let session = Box::pin(sb.build()).await?;
311
312 Ok(Arc::new(session))
313}
314
315async fn retry_init(cfg: cassandra_db::EnvVars, persistent: bool) {
319 let mut retry_delay = Duration::from_secs(0);
320 let db_type = if persistent { "Persistent" } else { "Volatile" };
321
322 info!(db_type = db_type, "Index DB Session Creation: Started.");
323
324 cfg.log(persistent);
325
326 loop {
327 tokio::time::sleep(retry_delay).await;
328 retry_delay = Duration::from_secs(30); info!(
331 db_type = db_type,
332 "Attempting to connect to Cassandra DB..."
333 );
334
335 let session = match make_session(&cfg).await {
337 Ok(session) => session,
338 Err(error) => {
339 error!(
340 db_type = db_type,
341 error = format!("{error:?}"),
342 "Failed to Create Cassandra DB Session"
343 );
344 drop(INIT_SESSION_ERROR.set(Arc::new(
345 CassandraSessionError::CreatingSessionFailed { source: error },
346 )));
347 continue;
348 },
349 };
350
351 if let Err(error) = create_schema(&mut session.clone(), &cfg).await {
353 error!(
354 db_type = db_type,
355 error = format!("{error:?}"),
356 "Failed to Create Cassandra DB Schema"
357 );
358 drop(
359 INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SchemaMigrationFailed {
360 source: error,
361 })),
362 );
363 continue;
364 }
365
366 let queries = match PreparedQueries::new(session.clone(), &cfg).await {
367 Ok(queries) => Arc::new(queries),
368 Err(error) => {
369 error!(
370 db_type = db_type,
371 error = %error,
372 "Failed to Create Cassandra Prepared Queries"
373 );
374 drop(INIT_SESSION_ERROR.set(Arc::new(
375 CassandraSessionError::PreparingQueriesFailed { source: error },
376 )));
377 continue;
378 },
379 };
380
381 let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
382 {
383 Ok(queries) => Arc::new(queries),
384 Err(error) => {
385 error!(
386 db_type = db_type,
387 error = %error,
388 "Failed to Create Cassandra Prepared Purge Queries"
389 );
390 drop(INIT_SESSION_ERROR.set(Arc::new(
391 CassandraSessionError::PreparingPurgeQueriesFailed { source: error },
392 )));
393 continue;
394 },
395 };
396
397 let cassandra_session = CassandraSession {
398 persistent,
399 cfg: Arc::new(cfg),
400 session,
401 queries,
402 purge_queries,
403 };
404
405 if persistent {
407 if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
408 error!("Persistent Session already set. This should not happen.");
409 drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
410 };
411 } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
412 error!("Volatile Session already set. This should not happen.");
413 drop(INIT_SESSION_ERROR.set(Arc::new(CassandraSessionError::SessionAlreadySet)));
414 };
415
416 break;
418 }
419
420 info!(db_type = db_type, "Index DB Session Creation: OK.");
421}