cat_gateway/db/index/schema/
mod.rs1use std::sync::Arc;
4
5use anyhow::Context;
6use handlebars::Handlebars;
7use scylla::Session;
8use serde_json::json;
9use tracing::error;
10
11use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_from_data};
12
13const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql");
15
16const SCHEMAS: &[(&str, &str)] = &[
18 (
19 include_str!("./cql/sync_status.cql"),
21 "Create Sync Status Table",
22 ),
23 (
24 include_str!("./cql/txo_by_stake_table.cql"),
26 "Create Table TXO By Stake Address",
27 ),
28 (
29 include_str!("./cql/txo_assets_by_stake_table.cql"),
31 "Create Table TXO Assets By Stake Address",
32 ),
33 (
34 include_str!("cql/unstaked_txo_by_txn_id.cql"),
36 "Create Table Unstaked TXO By Txn Hash",
37 ),
38 (
39 include_str!("cql/unstaked_txo_assets_by_txn_id.cql"),
41 "Create Table Unstaked TXO Assets By Txn Hash",
42 ),
43 (
44 include_str!("cql/txi_by_txn_id_table.cql"),
46 "Create Table TXI By Stake Address",
47 ),
48 (
49 include_str!("cql/stake_registration.cql"),
51 "Create Table Stake Registration",
52 ),
53 (
54 include_str!("cql/cip36_registration.cql"),
56 "Create Table CIP-36 Registration",
57 ),
58 (
59 include_str!("cql/cip36_registration_invalid.cql"),
61 "Create Table CIP-36 Registration Invalid",
62 ),
63 (
64 include_str!("cql/cip36_registration_for_vote_key.cql"),
66 "Create Table CIP-36 Registration For a stake address",
67 ),
68 (
69 include_str!("cql/rbac_registration.cql"),
71 "Create Table RBAC Registration",
72 ),
73 (
74 include_str!("cql/rbac_invalid_registration.cql"),
76 "Create Table Invalid RBAC Registration",
77 ),
78 (
79 include_str!("cql/catalyst_id_for_txn_id.cql"),
81 "Create Table Catalyst ID For TX ID",
82 ),
83 (
84 include_str!("cql/catalyst_id_for_stake_address.cql"),
86 "Create Table Catalyst ID For Stake Address",
87 ),
88 (
89 include_str!("cql/rbac_registration_cat_id_by_txn_id_index.cql"),
91 "Create secondary index Catalyst ID for transaction ID on rbac_registration",
92 ),
93];
94
/// Normalizes a block of CQL text so that cosmetic edits do not change it.
///
/// For every line of `text`:
/// * everything from the first `--` onward is discarded as a comment,
/// * runs of whitespace are collapsed into a single space and the ends are trimmed,
/// * lines that end up empty are dropped.
///
/// The surviving lines are joined with `\n` and returned.
fn remove_comments_and_join_query_lines(text: &str) -> String {
    text.lines()
        .filter_map(|raw_line| {
            // Keep only the part of the line that precedes a `--` comment marker.
            let code = raw_line
                .split_once("--")
                .map_or(raw_line, |(before_comment, _)| before_comment);

            // Splitting on whitespace and re-joining both collapses inner runs and
            // removes any leading/trailing whitespace.
            let normalized = code.split_whitespace().collect::<Vec<&str>>().join(" ");

            (!normalized.is_empty()).then_some(normalized)
        })
        .collect::<Vec<String>>()
        .join("\n")
}
130
131fn generate_cql_schema_version() -> String {
144 let mut clean_schemas: Vec<String> = Vec::new();
146
147 for (schema, _) in SCHEMAS {
149 let schema = remove_comments_and_join_query_lines(schema);
150 if !schema.is_empty() {
151 clean_schemas.push(schema);
152 }
153 }
154
155 clean_schemas.sort();
158
159 generate_uuid_string_from_data("Catalyst-Gateway Index Database Schema", &clean_schemas)
162}
163
164pub(crate) fn namespace(cfg: &cassandra_db::EnvVars) -> String {
166 format!(
168 "{}_{}",
169 cfg.namespace.as_str(),
170 generate_cql_schema_version().replace('-', "_")
171 )
172}
173
174async fn create_namespace(
177 session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
178) -> anyhow::Result<()> {
179 let keyspace = namespace(cfg);
180
181 let mut reg = Handlebars::new();
182 reg.register_escape_fn(|s| s.into());
185 let query = reg
186 .render_template(
187 CREATE_NAMESPACE_CQL,
188 &json!({"keyspace": keyspace,"options": cfg.deployment.clone().to_string()}),
189 )
190 .context(format!("Keyspace: {keyspace}"))?;
191
192 let stmt = session
194 .prepare(query)
195 .await
196 .context(format!("Keyspace: {keyspace}"))?;
197 session
198 .execute_unpaged(&stmt, ())
199 .await
200 .context(format!("Keyspace: {keyspace}"))?;
201
202 session.await_schema_agreement().await?;
204
205 if let Err(error) = session.use_keyspace(keyspace.clone(), false).await {
207 error!(keyspace = keyspace, error = %error, "Failed to set keyspace");
208 }
209
210 Ok(())
211}
212
213pub(crate) async fn create_schema(
215 session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
216) -> anyhow::Result<()> {
217 create_namespace(session, cfg)
218 .await
219 .context("Creating Namespace")?;
220
221 let mut errors = Vec::with_capacity(SCHEMAS.len());
222
223 for (schema, schema_name) in SCHEMAS {
224 match session.prepare(*schema).await {
225 Ok(stmt) => {
226 if let Err(err) = session.execute_unpaged(&stmt, ()).await {
227 error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
228 errors.push(anyhow::anyhow!(
229 "Failed to Execute Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
230 ));
231 };
232 },
233 Err(err) => {
234 error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
235 errors.push(anyhow::anyhow!(
236 "Failed to Prepare Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
237 ));
238 },
239 }
240 }
241
242 if !errors.is_empty() {
243 let fmt_err: Vec<_> = errors.into_iter().map(|err| format!("{err}")).collect();
244 return Err(anyhow::anyhow!(format!(
245 "{} Error(s): {}",
246 fmt_err.len(),
247 fmt_err.join("\n")
248 )));
249 }
250
251 session.await_schema_agreement().await?;
253
254 Ok(())
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema version the CQL files are expected to hash to.
    ///
    /// `check_schema_version_has_not_changed` compares this against the computed
    /// version, so a change to the effective schema makes that test fail until
    /// this value is updated.
    const SCHEMA_VERSION: &str = "772902fc-d5ec-871e-aaca-5b26c96a8cf6";

    /// Asserts that normalizing `input` yields exactly `expected`.
    fn assert_normalized(input: &str, expected: &str) {
        assert_eq!(remove_comments_and_join_query_lines(input), expected);
    }

    /// The computed schema version must equal the pinned one.
    #[test]
    fn check_schema_version_has_not_changed() {
        let calculated_version = generate_cql_schema_version();
        assert_eq!(SCHEMA_VERSION, calculated_version);
    }

    /// Text without comments is returned unchanged.
    #[test]
    fn test_no_comments() {
        assert_normalized("SELECT * FROM table1;", "SELECT * FROM table1;");
    }

    /// A `--` comment removes the rest of its line.
    #[test]
    fn test_single_line_comment() {
        assert_normalized("SELECT -- some comment * FROM table1;", "SELECT");
    }

    /// A comment only affects the line it appears on.
    #[test]
    fn test_multi_line_comment() {
        assert_normalized(
            "SELECT -- some comment\n* FROM table1;",
            "SELECT\n* FROM table1;",
        );
    }

    /// A comment-only line between statements is dropped.
    #[test]
    fn test_multiple_lines() {
        assert_normalized(
            "SELECT * FROM table1;\n-- another comment\nSELECT * FROM table2;",
            "SELECT * FROM table1;\nSELECT * FROM table2;",
        );
    }

    /// Blank lines are dropped.
    #[test]
    fn test_empty_lines() {
        assert_normalized(
            "\n\nSELECT * FROM table1;\n-- comment here\n\n",
            "SELECT * FROM table1;",
        );
    }

    /// Input made only of whitespace and comments normalizes to an empty string.
    #[test]
    fn test_whitespace_only() {
        assert_normalized(" \n -- comment here\n ", "");
    }
}