cat_gateway/db/index/schema/mod.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
//! Index Schema
use std::sync::Arc;
use anyhow::Context;
use handlebars::Handlebars;
use scylla::Session;
use serde_json::json;
use tracing::error;
use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_from_data};
/// The version of the Index DB Schema we SHOULD BE using.
/// DO NOT change this unless you are intentionally changing the Schema.
///
/// This constant is ONLY used by Unit tests to identify when the schema version will
/// change accidentally, and is NOT to be used directly to set the schema version of the
/// table namespaces.
///
/// It is compiled for test builds only, so the compiler rejects any non-test use
/// instead of a lint suppression hiding the fact that it is otherwise unused.
#[cfg(test)]
const SCHEMA_VERSION: &str = "75ae6ac9-ddd8-8472-8a7a-8676d04f8679";
/// Keyspace Create (Templated)
///
/// The file contents are rendered as a Handlebars template by `create_namespace`,
/// with the `keyspace` variable set to the value returned by `namespace`.
const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql");
/// All Schema Creation Statements
///
/// Each entry is a `(CQL text, description)` pair. The CQL text is what `create_schema`
/// prepares and executes, and what `generate_cql_schema_version` hashes. The description
/// is only used to label the error logged when preparing or executing the statement
/// fails, so it does not influence the schema version.
const SCHEMAS: &[(&str, &str)] = &[
(
// Sync Status Table Schema
include_str!("./cql/sync_status.cql"),
"Create Sync Status Table",
),
(
// TXO by Stake Address Table Schema
include_str!("./cql/txo_by_stake_table.cql"),
"Create Table TXO By Stake Address",
),
(
// TXO Assets by Stake Address Table Schema
include_str!("./cql/txo_assets_by_stake_table.cql"),
"Create Table TXO Assets By Stake Address",
),
(
// TXO Unstaked Table Schema
include_str!("./cql/unstaked_txo_by_txn_hash.cql"),
"Create Table Unstaked TXO By Txn Hash",
),
(
// TXO Unstaked Assets Table Schema
include_str!("./cql/unstaked_txo_assets_by_txn_hash.cql"),
"Create Table Unstaked TXO Assets By Txn Hash",
),
(
// TXI by Txn Hash Table Schema
// NOTE(review): the description below says "By Stake Address" but the file is
// `txi_by_txn_hash_table.cql` - confirm which wording is intended.
include_str!("./cql/txi_by_txn_hash_table.cql"),
"Create Table TXI By Stake Address",
),
(
// Stake Address/Registration Table Schema
include_str!("./cql/stake_registration.cql"),
"Create Table Stake Registration",
),
(
// CIP-36 Registration Table Schema
include_str!("./cql/cip36_registration.cql"),
"Create Table CIP-36 Registration",
),
(
// CIP-36 Invalid Registration Table Schema
include_str!("./cql/cip36_registration_invalid.cql"),
"Create Table CIP-36 Registration Invalid",
),
(
// CIP-36 Registration For Vote Key Table Schema
// NOTE(review): the description below says "For a stake address" but the file is
// `cip36_registration_for_vote_key.cql` - confirm which wording is intended.
include_str!("./cql/cip36_registration_for_vote_key.cql"),
"Create Table CIP-36 Registration For a stake address",
),
(
// RBAC 509 Registration Table Schema
include_str!("./cql/rbac509_registration.cql"),
"Create Table RBAC 509 Registration",
),
(
// RBAC 509. Chain Root For TX ID Registration Table Schema
include_str!("./cql/chain_root_for_txn_id.cql"),
"Create Table Chain Root For TX ID Registration",
),
(
// RBAC 509. Chain Root For Role 0 Key Registration Table Schema
include_str!("./cql/chain_root_for_role0_key.cql"),
"Create Table Chain Root For Role 0 Key Registration",
),
(
// RBAC 509. Chain Root For Stake Address Registration Table Schema
include_str!("./cql/chain_root_for_stake_addr.cql"),
"Create Table Chain Root For Stake Address Registration",
),
];
/// Strips `--` comments from a query and normalises its whitespace.
///
/// Every line of `text` is handled on its own: everything from the first `--` onward
/// is discarded, and the words that remain are re-joined with single spaces. Lines
/// that end up empty are dropped. No attempt is made to recognise a `--` that sits
/// inside a quoted string; it is treated as the start of a comment like any other.
///
/// # Arguments
///
/// * `text`: The query text to clean.
///
/// # Returns
///
/// The surviving lines joined with `\n`, or an empty string when nothing survives.
fn remove_comments_and_join_query_lines(text: &str) -> String {
    text.lines()
        .filter_map(|line| {
            // Keep only what precedes the first comment marker, if there is one.
            let code = line.split_once("--").map_or(line, |(before, _)| before);

            // Re-joining the whitespace-separated words collapses every run of
            // whitespace and leaves no leading or trailing space behind.
            let squeezed = code.split_whitespace().collect::<Vec<&str>>().join(" ");

            if squeezed.is_empty() {
                None
            } else {
                Some(squeezed)
            }
        })
        .collect::<Vec<String>>()
        .join("\n")
}
/// Derives the schema version identifier from the contents of every entry in `SCHEMAS`.
///
/// Each CQL document is first cleaned with `remove_comments_and_join_query_lines`, so
/// comment and whitespace edits do not alter the result, and documents that clean down
/// to nothing are ignored. The cleaned documents are then sorted, which makes the
/// result independent of the order of `SCHEMAS`, and handed to
/// `generate_uuid_string_from_data` together with a fixed label.
///
/// # Returns
///
/// The UUID string produced by `generate_uuid_string_from_data` for the cleaned,
/// sorted schema documents.
fn generate_cql_schema_version() -> String {
    // The cleaned, non-empty schema documents the version is derived from.
    let mut clean_schemas: Vec<String> = SCHEMAS
        .iter()
        .map(|(cql, _)| remove_comments_and_join_query_lines(cql))
        .filter(|cleaned| !cleaned.is_empty())
        .collect();

    // Sorting ensures that re-ordering the entries of `SCHEMAS` does not affect the
    // generated schema version.
    clean_schemas.sort();

    // Hash the cleaned documents into a UUID that identifies this schema version.
    generate_uuid_string_from_data("Catalyst-Gateway Index Database Schema", &clean_schemas)
}
/// Builds the keyspace name to use for the given db configuration.
///
/// The name is the configured namespace, an underscore, and the generated schema
/// version with every `-` replaced by `_`.
pub(crate) fn namespace(cfg: &cassandra_db::EnvVars) -> String {
    let prefix = cfg.namespace.as_str();
    let version = generate_cql_schema_version().replace('-', "_");
    format!("{prefix}_{version}")
}
/// Create the namespace we will use for this session
/// Ok to run this if the namespace already exists.
///
/// Renders `CREATE_NAMESPACE_CQL` for the keyspace returned by `namespace`, prepares
/// and executes the resulting statement, waits for schema agreement, and finally tries
/// to select that keyspace on the session.
///
/// # Errors
///
/// Returns an error if the template cannot be rendered, if the statement cannot be
/// prepared or executed, or if waiting for schema agreement fails. A failure to select
/// the keyspace on the session is only logged; the function still returns `Ok(())`.
async fn create_namespace(
    session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<()> {
    let keyspace = namespace(cfg);

    let mut reg = Handlebars::new();
    // disable default `html_escape` function
    // which transforms `<`, `>` symbols to `&lt;`, `&gt;`
    reg.register_escape_fn(|s| s.into());
    // The error context is built lazily so nothing is formatted on the success path.
    let query = reg
        .render_template(CREATE_NAMESPACE_CQL, &json!({"keyspace": keyspace}))
        .with_context(|| format!("Keyspace: {keyspace}"))?;

    // Create the Keyspace if it doesn't exist already.
    let stmt = session
        .prepare(query)
        .await
        .with_context(|| format!("Keyspace: {keyspace}"))?;
    session
        .execute_unpaged(&stmt, ())
        .await
        .with_context(|| format!("Keyspace: {keyspace}"))?;

    // Wait for the Schema to be ready.
    session.await_schema_agreement().await?;

    // Set the Keyspace to use for this session.
    // A failure here is reported through the log only and does not fail the call.
    if let Err(error) = session.use_keyspace(keyspace.clone(), false).await {
        error!(keyspace = keyspace, error = %error, "Failed to set keyspace");
    }

    Ok(())
}
/// Create the Schema on the connected Cassandra DB
pub(crate) async fn create_schema(
session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<()> {
create_namespace(session, cfg)
.await
.context("Creating Namespace")?;
let mut failed = false;
for (schema, schema_name) in SCHEMAS {
match session.prepare(*schema).await {
Ok(stmt) => {
if let Err(err) = session.execute_unpaged(&stmt, ()).await {
failed = true;
error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
};
},
Err(err) => {
failed = true;
error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
},
}
}
anyhow::ensure!(!failed, "Failed to Create Schema");
// Wait for the Schema to be ready.
session.await_schema_agreement().await?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that cleaning `input` produces exactly `expected`.
    #[track_caller]
    fn assert_cleaned(input: &str, expected: &str) {
        assert_eq!(remove_comments_and_join_query_lines(input), expected);
    }

    /// This test is designed to fail if the schema version has changed.
    /// It is used to help detect inadvertent schema version changes.
    /// If you did NOT intend to change the index db schema and this test fails,
    /// then revert or fix your changes to the schema files.
    #[test]
    fn check_schema_version_has_not_changed() {
        assert_eq!(SCHEMA_VERSION, generate_cql_schema_version());
    }

    /// A query without comments is returned unchanged.
    #[test]
    fn test_no_comments() {
        assert_cleaned("SELECT * FROM table1;", "SELECT * FROM table1;");
    }

    /// Everything after `--` on a line is removed, including trailing query text.
    #[test]
    fn test_single_line_comment() {
        assert_cleaned("SELECT -- some comment * FROM table1;", "SELECT");
    }

    /// A comment ends at the end of its line; the following line is kept.
    #[test]
    fn test_multi_line_comment() {
        assert_cleaned(
            "SELECT -- some comment\n* FROM table1;",
            "SELECT\n* FROM table1;",
        );
    }

    /// A line that is only a comment disappears from between two statements.
    #[test]
    fn test_multiple_lines() {
        assert_cleaned(
            "SELECT * FROM table1;\n-- another comment\nSELECT * FROM table2;",
            "SELECT * FROM table1;\nSELECT * FROM table2;",
        );
    }

    /// Empty lines and comment-only lines are dropped.
    #[test]
    fn test_empty_lines() {
        assert_cleaned(
            "\n\nSELECT * FROM table1;\n-- comment here\n\n",
            "SELECT * FROM table1;",
        );
    }

    /// Input made up of only whitespace and comments cleans down to an empty string.
    #[test]
    fn test_whitespace_only() {
        assert_cleaned(" \n -- comment here\n ", "");
    }
}