cat_gateway/db/index/schema/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
//! Index Schema

use std::sync::Arc;

use anyhow::Context;
use handlebars::Handlebars;
use scylla::Session;
use serde_json::json;
use tracing::error;

use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_from_data};

/// The version of the Index DB Schema we SHOULD BE using.
/// DO NOT change this unless you are intentionally changing the Schema.
///
/// This constant is ONLY used by Unit tests to identify when the schema version will
/// change accidentally, and is NOT to be used directly to set the schema version of the
/// table namespaces.
///
/// The unit test `check_schema_version_has_not_changed` compares this value against the
/// output of `generate_cql_schema_version()`. Because the only reader is test code, the
/// constant is unused in non-test builds, hence the `dead_code` allowance below.
#[allow(dead_code)]
const SCHEMA_VERSION: &str = "75ae6ac9-ddd8-8472-8a7a-8676d04f8679";

/// Keyspace Create (Templated)
///
/// This is a Handlebars template: `create_namespace` renders it with a single
/// `keyspace` value before preparing and executing the resulting statement.
const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql");

/// All Schema Creation Statements, as `(CQL text, human readable name)` pairs.
///
/// `create_schema` prepares and executes every entry in order, and uses the name to
/// identify the statement in the error log if it fails.
/// The CQL text of every entry (with comments and redundant whitespace removed) is also
/// the input to `generate_cql_schema_version`, so editing any of these files changes the
/// calculated schema version. The names are not part of that calculation.
const SCHEMAS: &[(&str, &str)] = &[
    (
        // Sync Status Table Schema
        include_str!("./cql/sync_status.cql"),
        "Create Sync Status Table",
    ),
    (
        // TXO by Stake Address Table Schema
        include_str!("./cql/txo_by_stake_table.cql"),
        "Create Table TXO By Stake Address",
    ),
    (
        // TXO Assets by Stake Address Table Schema
        include_str!("./cql/txo_assets_by_stake_table.cql"),
        "Create Table TXO Assets By Stake Address",
    ),
    (
        // TXO Unstaked Table Schema
        include_str!("./cql/unstaked_txo_by_txn_hash.cql"),
        "Create Table Unstaked TXO By Txn Hash",
    ),
    (
        // TXO Unstaked Assets Table Schema
        include_str!("./cql/unstaked_txo_assets_by_txn_hash.cql"),
        "Create Table Unstaked TXO Assets By Txn Hash",
    ),
    (
        // TXI by Txn Hash Table Schema
        // NOTE(review): the name below says "By Stake Address" while the file is
        // `txi_by_txn_hash_table.cql` - looks like a copy/paste slip, confirm and align.
        include_str!("./cql/txi_by_txn_hash_table.cql"),
        "Create Table TXI By Stake Address",
    ),
    (
        // Stake Address/Registration Table Schema
        include_str!("./cql/stake_registration.cql"),
        "Create Table Stake Registration",
    ),
    (
        // CIP-36 Registration Table Schema
        include_str!("./cql/cip36_registration.cql"),
        "Create Table CIP-36 Registration",
    ),
    (
        // CIP-36 Invalid Registration Table Schema
        include_str!("./cql/cip36_registration_invalid.cql"),
        "Create Table CIP-36 Registration Invalid",
    ),
    (
        // CIP-36 Registration For Vote Key Table Schema
        // NOTE(review): the name below says "For a stake address" while the file is
        // `cip36_registration_for_vote_key.cql` - confirm which wording is intended.
        include_str!("./cql/cip36_registration_for_vote_key.cql"),
        "Create Table CIP-36 Registration For a stake address",
    ),
    (
        // RBAC 509 Registration Table Schema
        include_str!("./cql/rbac509_registration.cql"),
        "Create Table RBAC 509 Registration",
    ),
    (
        // RBAC 509. Chain Root For TX ID Registration Table Schema
        include_str!("./cql/chain_root_for_txn_id.cql"),
        "Create Table Chain Root For TX ID Registration",
    ),
    (
        // RBAC 509. Chain Root For Role 0 Key Registration Table Schema
        include_str!("./cql/chain_root_for_role0_key.cql"),
        "Create Table Chain Root For Role 0 Key Registration",
    ),
    (
        // RBAC 509. Chain Root For Stake Address Registration Table Schema
        include_str!("./cql/chain_root_for_stake_addr.cql"),
        "Create Table Chain Root For Stake Address Registration",
    ),
];

/// Normalises a block of CQL text so that only its meaningful content remains.
///
/// Each input line is processed independently:
/// * everything from the first `--` on the line onwards is treated as a comment and
///   discarded (no attempt is made to recognise `--` inside quoted strings);
/// * leading and trailing whitespace is removed and every internal run of whitespace is
///   collapsed to a single space;
/// * lines that end up empty are dropped altogether.
///
/// # Arguments
///
/// * `text`: The query text to clean.
///
/// # Returns
///
/// The surviving lines, in their original order, joined with a single newline character.
/// The result is an empty string when no line has any content left.
fn remove_comments_and_join_query_lines(text: &str) -> String {
    text.lines()
        .filter_map(|line| {
            // Keep only what precedes the comment marker, or the whole line if none.
            let code = line.split_once("--").map_or(line, |(before, _)| before);

            // Re-joining the whitespace separated tokens both trims the line and
            // squeezes repeated whitespace down to one space.
            let squeezed = code.split_whitespace().collect::<Vec<&str>>().join(" ");

            (!squeezed.is_empty()).then_some(squeezed)
        })
        .collect::<Vec<String>>()
        .join("\n")
}

/// Calculates the schema version identifier from the content of every CQL schema.
///
/// Each entry of `SCHEMAS` is cleaned with `remove_comments_and_join_query_lines`, so
/// comments and purely cosmetic whitespace do not influence the version. Schemas that
/// are empty after cleaning are ignored. The cleaned documents are then sorted, which
/// makes the version independent of the order in which the schemas are listed, and the
/// sorted collection is handed to `generate_uuid_string_from_data` to derive the UUID.
///
/// # Returns
///
/// The UUID string identifying the current version of all schemas.
fn generate_cql_schema_version() -> String {
    // The cleaned schema documents are the data the UUID is derived from.
    let mut cleaned: Vec<String> = SCHEMAS
        .iter()
        .map(|(cql, _)| remove_comments_and_join_query_lines(cql))
        .filter(|cql| !cql.is_empty())
        .collect();

    // Sorting means re-ordering the schema list never changes the generated version.
    cleaned.sort();

    generate_uuid_string_from_data("Catalyst-Gateway Index Database Schema", &cleaned)
}

/// Builds the keyspace name for a particular db configuration.
///
/// The name is the configured namespace, an underscore, and the calculated schema
/// version with every `-` replaced by `_`.
pub(crate) fn namespace(cfg: &cassandra_db::EnvVars) -> String {
    let prefix = cfg.namespace.as_str();
    let version_suffix = generate_cql_schema_version().replace('-', "_");

    format!("{prefix}_{version_suffix}")
}

/// Creates the keyspace this session will use, then selects it.
/// Ok to run this if the namespace already exists.
///
/// # Errors
///
/// Returns an error (annotated with the keyspace name) if the keyspace statement cannot
/// be rendered, prepared or executed, or if waiting for schema agreement fails.
/// A failure to select the keyspace afterwards is only logged; the function still
/// returns `Ok(())` in that case.
async fn create_namespace(
    session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<()> {
    let keyspace = namespace(cfg);

    // Error context shared by every step below; only built if a step actually fails.
    let keyspace_context = || format!("Keyspace: {keyspace}");

    // Disable the default `html_escape` function (which rewrites `<`, `>` symbols as
    // `&lt`, `&gt`) so values are inserted into the template verbatim.
    let mut templates = Handlebars::new();
    templates.register_escape_fn(|raw| raw.into());
    let create_keyspace_cql = templates
        .render_template(CREATE_NAMESPACE_CQL, &json!({"keyspace": keyspace}))
        .with_context(keyspace_context)?;

    // Create the Keyspace if it doesn't exist already.
    let prepared = session
        .prepare(create_keyspace_cql)
        .await
        .with_context(keyspace_context)?;
    session
        .execute_unpaged(&prepared, ())
        .await
        .with_context(keyspace_context)?;

    // Wait for the Schema to be ready.
    session.await_schema_agreement().await?;

    // Set the Keyspace to use for this session. A failure here is reported in the log
    // but does not fail the call.
    if let Err(error) = session.use_keyspace(keyspace.clone(), false).await {
        error!(keyspace = keyspace, error = %error, "Failed to set keyspace");
    }

    Ok(())
}

/// Create the Schema on the connected Cassandra DB
pub(crate) async fn create_schema(
    session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<()> {
    create_namespace(session, cfg)
        .await
        .context("Creating Namespace")?;

    let mut failed = false;

    for (schema, schema_name) in SCHEMAS {
        match session.prepare(*schema).await {
            Ok(stmt) => {
                if let Err(err) = session.execute_unpaged(&stmt, ()).await {
                    failed = true;
                    error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
                };
            },
            Err(err) => {
                failed = true;
                error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
            },
        }
    }

    anyhow::ensure!(!failed, "Failed to Create Schema");

    // Wait for the Schema to be ready.
    session.await_schema_agreement().await?;

    Ok(())
}

#[cfg(test)]
/// Unit tests for the schema version calculation and the CQL comment/whitespace cleaner.
mod tests {
    use super::*;

    #[test]
    /// This test is designed to fail if the schema version has changed.
    /// It is used to help detect inadvertent schema version changes.
    /// If you did NOT intend to change the index db schema and this test fails,
    /// then revert or fix your changes to the schema files.
    fn check_schema_version_has_not_changed() {
        let calculated_version = generate_cql_schema_version();
        assert_eq!(SCHEMA_VERSION, calculated_version);
    }

    #[test]
    /// Text without comments or redundant whitespace is returned unchanged.
    fn test_no_comments() {
        let input = "SELECT * FROM table1;";
        let expected_output = "SELECT * FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    /// Everything after `--` on a line is dropped, including any text that follows the
    /// comment on that same line.
    fn test_single_line_comment() {
        let input = "SELECT -- some comment * FROM table1;";
        let expected_output = "SELECT";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    /// A comment only extends to the end of its line; the next line is kept.
    fn test_multi_line_comment() {
        let input = "SELECT -- some comment\n* FROM table1;";
        let expected_output = "SELECT\n* FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    /// A line that holds only a comment is removed entirely from the output.
    fn test_multiple_lines() {
        let input = "SELECT * FROM table1;\n-- another comment\nSELECT * FROM table2;";
        let expected_output = "SELECT * FROM table1;\nSELECT * FROM table2;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    /// Blank lines before and after the content leave nothing behind.
    fn test_empty_lines() {
        let input = "\n\nSELECT * FROM table1;\n-- comment here\n\n";
        let expected_output = "SELECT * FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    /// Input made up of only whitespace and comments cleans to the empty string.
    fn test_whitespace_only() {
        let input = "   \n  -- comment here\n   ";
        let expected_output = "";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }
}