cat_gateway/db/index/schema/
mod.rs

1//! Index Schema
2
3use std::sync::Arc;
4
5use anyhow::Context;
6use handlebars::Handlebars;
7use scylla::Session;
8use serde_json::json;
9use tracing::error;
10
11use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_from_data};
12
13/// Keyspace Create (Templated)
14const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql");
15
16/// All Schema Creation Statements
17const SCHEMAS: &[(&str, &str)] = &[
18    (
19        // Sync Status Table Schema
20        include_str!("./cql/sync_status.cql"),
21        "Create Sync Status Table",
22    ),
23    (
24        // TXO by Stake Address Table Schema
25        include_str!("./cql/txo_by_stake_table.cql"),
26        "Create Table TXO By Stake Address",
27    ),
28    (
29        // TXO Assets by Stake Address Table Schema
30        include_str!("./cql/txo_assets_by_stake_table.cql"),
31        "Create Table TXO Assets By Stake Address",
32    ),
33    (
34        // TXO Unstaked Table Schema
35        include_str!("cql/unstaked_txo_by_txn_id.cql"),
36        "Create Table Unstaked TXO By Txn Hash",
37    ),
38    (
39        // TXO Unstaked Assets Table Schema
40        include_str!("cql/unstaked_txo_assets_by_txn_id.cql"),
41        "Create Table Unstaked TXO Assets By Txn Hash",
42    ),
43    (
44        // TXI by Stake Address table schema.
45        include_str!("cql/txi_by_txn_id_table.cql"),
46        "Create Table TXI By Stake Address",
47    ),
48    (
49        // Stake Address/Registration Table Schema
50        include_str!("cql/stake_registration.cql"),
51        "Create Table Stake Registration",
52    ),
53    (
54        // CIP-36 Registration Table Schema
55        include_str!("cql/cip36_registration.cql"),
56        "Create Table CIP-36 Registration",
57    ),
58    (
59        // CIP-36 invalid registration table schema.
60        include_str!("cql/cip36_registration_invalid.cql"),
61        "Create Table CIP-36 Registration Invalid",
62    ),
63    (
64        // CIP-36 registration for vote key table schema.
65        include_str!("cql/cip36_registration_for_vote_key.cql"),
66        "Create Table CIP-36 Registration For a stake address",
67    ),
68    (
69        // RBAC registration table schema.
70        include_str!("cql/rbac_registration.cql"),
71        "Create Table RBAC Registration",
72    ),
73    (
74        // RBAC invalid registration table schema.
75        include_str!("cql/rbac_invalid_registration.cql"),
76        "Create Table Invalid RBAC Registration",
77    ),
78    (
79        // Catalyst ID for transaction ID table schema.
80        include_str!("cql/catalyst_id_for_txn_id.cql"),
81        "Create Table Catalyst ID For TX ID",
82    ),
83    (
84        // Catalyst ID for stake address table schema.
85        include_str!("cql/catalyst_id_for_stake_address.cql"),
86        "Create Table Catalyst ID For Stake Address",
87    ),
88    (
89        // Secondary index for RBAC 509 registrations table.
90        include_str!("cql/rbac_registration_cat_id_by_txn_id_index.cql"),
91        "Create secondary index Catalyst ID for transaction ID on rbac_registration",
92    ),
93];
94
/// Strips `--` comments from a CQL query and normalises its whitespace.
///
/// Each line is processed independently:
/// * everything from the first `--` on the line onwards is treated as a comment and dropped;
/// * runs of whitespace are collapsed to a single space, and leading/trailing whitespace
///   is removed;
/// * lines that end up empty are discarded.
///
/// The surviving lines are joined with `\n`.
///
/// Note: `--` is not recognised as being inside a string literal, and `//` or `/* */`
/// comments are not stripped.
///
/// # Arguments
///
/// * `text`: The query text to clean.
///
/// # Returns
///
/// The cleaned query. It is empty if `text` contains only comments and whitespace.
fn remove_comments_and_join_query_lines(text: &str) -> String {
    text.lines()
        .map(|line| {
            // Keep only the part of the line before the first `--`.
            let code = line.split_once("--").map_or(line, |(code, _comment)| code);
            // `split_whitespace` already skips leading/trailing whitespace, so joining the
            // pieces both collapses inner runs and trims the line.
            code.split_whitespace().collect::<Vec<_>>().join(" ")
        })
        .filter(|line| !line.is_empty())
        .collect::<Vec<_>>()
        .join("\n")
}
130
131/// Generates a unique schema version identifier based on the content of all CQL schemas.
132///
133/// This function processes each CQL schema, removes comments from its lines and joins
134/// them into a single string. It then sorts these processed strings to ensure consistency
135/// in schema versions regardless of their order in the list. Finally, it generates a UUID
136/// from a 127 bit hash of this sorted collection of schema contents, which serves as a
137/// unique identifier for the current version of all schemas.
138///
139/// # Returns
140///
141/// A string representing the UUID derived from the concatenated and cleaned CQL
142/// schema contents.
143fn generate_cql_schema_version() -> String {
144    // Where we will actually store the bytes we derive the UUID from.
145    let mut clean_schemas: Vec<String> = Vec::new();
146
147    // Iterate through each CQL schema and add it to the list of clean schemas documents.
148    for (schema, _) in SCHEMAS {
149        let schema = remove_comments_and_join_query_lines(schema);
150        if !schema.is_empty() {
151            clean_schemas.push(schema);
152        }
153    }
154
155    // make sure any re-ordering of the schemas in the list does not effect the generated
156    // schema version
157    clean_schemas.sort();
158
159    // Generate a unique hash of the clean schemas,
160    // and use it to form a UUID to identify the schema version.
161    generate_uuid_string_from_data("Catalyst-Gateway Index Database Schema", &clean_schemas)
162}
163
164/// Get the namespace for a particular db configuration
165pub(crate) fn namespace(cfg: &cassandra_db::EnvVars) -> String {
166    // Build and set the Keyspace to use.
167    format!(
168        "{}_{}",
169        cfg.namespace.as_str(),
170        generate_cql_schema_version().replace('-', "_")
171    )
172}
173
174/// Create the namespace we will use for this session
175/// Ok to run this if the namespace already exists.
176async fn create_namespace(
177    session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
178) -> anyhow::Result<()> {
179    let keyspace = namespace(cfg);
180
181    let mut reg = Handlebars::new();
182    // disable default `html_escape` function
183    // which transforms `<`, `>` symbols to `&lt`, `&gt`
184    reg.register_escape_fn(|s| s.into());
185    let query = reg
186        .render_template(
187            CREATE_NAMESPACE_CQL,
188            &json!({"keyspace": keyspace,"options": cfg.deployment.clone().to_string()}),
189        )
190        .context(format!("Keyspace: {keyspace}"))?;
191
192    // Create the Keyspace if it doesn't exist already.
193    let stmt = session
194        .prepare(query)
195        .await
196        .context(format!("Keyspace: {keyspace}"))?;
197    session
198        .execute_unpaged(&stmt, ())
199        .await
200        .context(format!("Keyspace: {keyspace}"))?;
201
202    // Wait for the Schema to be ready.
203    session.await_schema_agreement().await?;
204
205    // Set the Keyspace to use for this session.
206    if let Err(error) = session.use_keyspace(keyspace.clone(), false).await {
207        error!(keyspace = keyspace, error = %error, "Failed to set keyspace");
208    }
209
210    Ok(())
211}
212
213/// Create the Schema on the connected Cassandra DB
214pub(crate) async fn create_schema(
215    session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
216) -> anyhow::Result<()> {
217    create_namespace(session, cfg)
218        .await
219        .context("Creating Namespace")?;
220
221    let mut errors = Vec::with_capacity(SCHEMAS.len());
222
223    for (schema, schema_name) in SCHEMAS {
224        match session.prepare(*schema).await {
225            Ok(stmt) => {
226                if let Err(err) = session.execute_unpaged(&stmt, ()).await {
227                    error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
228                    errors.push(anyhow::anyhow!(
229                        "Failed to Execute Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
230                    ));
231                };
232            },
233            Err(err) => {
234                error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
235                errors.push(anyhow::anyhow!(
236                    "Failed to Prepare Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
237                ));
238            },
239        }
240    }
241
242    if !errors.is_empty() {
243        let fmt_err: Vec<_> = errors.into_iter().map(|err| format!("{err}")).collect();
244        return Err(anyhow::anyhow!(format!(
245            "{} Error(s): {}",
246            fmt_err.len(),
247            fmt_err.join("\n")
248        )));
249    }
250
251    // Wait for the Schema to be ready.
252    session.await_schema_agreement().await?;
253
254    Ok(())
255}
256
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// The version of the Index DB Schema we SHOULD BE using.
    /// DO NOT change this unless you are intentionally changing the Schema.
    ///
    /// This constant is ONLY used by Unit tests to identify when the schema version will
    /// change accidentally, and is NOT to be used directly to set the schema version of
    /// the table namespaces.
    const SCHEMA_VERSION: &str = "772902fc-d5ec-871e-aaca-5b26c96a8cf6";

    /// This test is designed to fail if the schema version has changed.
    /// It is used to help detect inadvertent schema version changes.
    /// If you did NOT intend to change the index db schema and this test fails,
    /// then revert or fix your changes to the schema files.
    #[test]
    fn check_schema_version_has_not_changed() {
        let calculated_version = generate_cql_schema_version();
        assert_eq!(SCHEMA_VERSION, calculated_version);
    }

    /// The schema version is embedded in the keyspace name, with `-` replaced by `_`.
    /// After that replacement it must contain only characters that are valid in an
    /// unquoted CQL identifier.
    #[test]
    fn schema_version_is_keyspace_safe() {
        let version = generate_cql_schema_version().replace('-', "_");
        assert!(
            version.chars().all(|c| c.is_ascii_alphanumeric() || c == '_'),
            "Schema version is not a valid identifier fragment: {version}"
        );
    }

    /// An empty schema would be silently left out of the version hash, yet still be sent
    /// to the database by `create_schema`.
    #[test]
    fn schemas_are_not_empty() {
        for (schema, name) in SCHEMAS {
            assert!(
                !remove_comments_and_join_query_lines(schema).is_empty(),
                "Schema `{name}` is empty"
            );
        }
    }

    /// Schema descriptions identify failures in logs and errors, so they must be unique.
    #[test]
    fn schema_names_are_unique() {
        let mut seen = HashSet::new();
        for (_, name) in SCHEMAS {
            assert!(seen.insert(*name), "Duplicate schema name: {name}");
        }
    }

    #[test]
    fn test_no_comments() {
        let input = "SELECT * FROM table1;";
        let expected_output = "SELECT * FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_single_line_comment() {
        let input = "SELECT -- some comment * FROM table1;";
        let expected_output = "SELECT";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_multi_line_comment() {
        let input = "SELECT -- some comment\n* FROM table1;";
        let expected_output = "SELECT\n* FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_multiple_lines() {
        let input = "SELECT * FROM table1;\n-- another comment\nSELECT * FROM table2;";
        let expected_output = "SELECT * FROM table1;\nSELECT * FROM table2;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_empty_lines() {
        let input = "\n\nSELECT * FROM table1;\n-- comment here\n\n";
        let expected_output = "SELECT * FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_whitespace_only() {
        let input = "   \n  -- comment here\n   ";
        let expected_output = "";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    /// Whitespace runs inside a line are collapsed, and CRLF line endings are handled.
    #[test]
    fn test_collapses_inner_whitespace() {
        let input = "CREATE   TABLE\tfoo (\r\n    id int -- primary key\r\n);";
        let expected_output = "CREATE TABLE foo (\nid int\n);";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }
}
320}