1use std::io::Write;
18
19use anyhow::Result;
20
21use crate::session::CqlSession;
22
23pub async fn execute(session: &CqlSession, args: &str, writer: &mut dyn Write) -> Result<()> {
27 let args = args.trim();
28 let upper = args.to_uppercase();
29
30 if args.is_empty() {
32 writeln!(
33 writer,
34 "Usage: DESCRIBE [CLUSTER | KEYSPACES | KEYSPACE [name] | TABLES | TABLE <name> | SCHEMA | FULL SCHEMA | INDEX <name> | MATERIALIZED VIEW <name> | TYPES | TYPE <name> | FUNCTIONS | FUNCTION <name> | AGGREGATES | AGGREGATE <name>]"
35 )?;
36 return Ok(());
37 }
38
39 if upper == "CLUSTER" {
40 describe_cluster(session, writer).await
41 } else if upper == "KEYSPACES" {
42 describe_keyspaces(session, writer).await
43 } else if upper == "TABLES" {
44 describe_tables(session, writer).await
45 } else if upper == "FULL SCHEMA" {
46 describe_full_schema(session, writer).await
47 } else if upper == "SCHEMA" {
48 describe_schema(session, writer).await
49 } else if upper == "KEYSPACE" {
50 describe_keyspace(session, session.current_keyspace(), writer).await
52 } else if upper.starts_with("KEYSPACE ") {
53 let ks_name = args["KEYSPACE ".len()..].trim();
55 let ks_name = strip_quotes(ks_name);
56 describe_keyspace(session, Some(ks_name), writer).await
57 } else if upper == "TABLE" {
58 writeln!(writer, "DESCRIBE TABLE requires a table name.")?;
59 Ok(())
60 } else if upper.starts_with("TABLE ") {
61 let table_spec = args["TABLE ".len()..].trim();
63 let table_spec = strip_quotes(table_spec);
64 describe_table(session, table_spec, writer).await
65 } else if upper == "INDEX" {
66 writeln!(writer, "DESCRIBE INDEX requires an index name.")?;
67 Ok(())
68 } else if upper.starts_with("INDEX ") {
69 let index_spec = args["INDEX ".len()..].trim();
70 let index_spec = strip_quotes(index_spec);
71 describe_index(session, index_spec, writer).await
72 } else if upper == "MATERIALIZED VIEW" {
73 writeln!(writer, "DESCRIBE MATERIALIZED VIEW requires a view name.")?;
74 Ok(())
75 } else if upper.starts_with("MATERIALIZED VIEW ") {
76 let view_spec = args["MATERIALIZED VIEW ".len()..].trim();
77 let view_spec = strip_quotes(view_spec);
78 describe_materialized_view(session, view_spec, writer).await
79 } else if upper == "TYPES" {
80 describe_types(session, writer).await
81 } else if upper == "TYPE" {
82 writeln!(writer, "DESCRIBE TYPE requires a type name.")?;
83 Ok(())
84 } else if upper.starts_with("TYPE ") {
85 let type_spec = args["TYPE ".len()..].trim();
86 let type_spec = strip_quotes(type_spec);
87 describe_type(session, type_spec, writer).await
88 } else if upper == "FUNCTIONS" {
89 describe_functions(session, writer).await
90 } else if upper == "FUNCTION" {
91 writeln!(writer, "DESCRIBE FUNCTION requires a function name.")?;
92 Ok(())
93 } else if upper.starts_with("FUNCTION ") {
94 let func_spec = args["FUNCTION ".len()..].trim();
95 let func_spec = strip_quotes(func_spec);
96 describe_function(session, func_spec, writer).await
97 } else if upper == "AGGREGATES" {
98 describe_aggregates(session, writer).await
99 } else if upper == "AGGREGATE" {
100 writeln!(writer, "DESCRIBE AGGREGATE requires an aggregate name.")?;
101 Ok(())
102 } else if upper.starts_with("AGGREGATE ") {
103 let agg_spec = args["AGGREGATE ".len()..].trim();
104 let agg_spec = strip_quotes(agg_spec);
105 describe_aggregate(session, agg_spec, writer).await
106 } else {
107 let name = strip_quotes(args);
110 if name.contains('.') {
111 describe_table(session, name, writer).await
113 } else {
114 let keyspaces = session.get_keyspaces().await?;
116 if keyspaces.iter().any(|ks| ks.name == name) {
117 describe_keyspace(session, Some(name), writer).await
118 } else {
119 describe_table(session, name, writer).await
121 }
122 }
123 }
124}
125
126async fn describe_cluster(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
128 let cluster_name = session.cluster_name.as_deref().unwrap_or("Unknown Cluster");
129
130 writeln!(writer)?;
131 writeln!(writer, "Cluster: {cluster_name}")?;
132 writeln!(writer, "Partitioner: Murmur3Partitioner")?;
133
134 match session
136 .execute_query("SELECT snitch FROM system.local")
137 .await
138 {
139 Ok(result) => {
140 if let Some(row) = result.rows.first() {
141 if let Some(snitch) = row.get(0) {
142 writeln!(writer, "Snitch: {snitch}")?;
143 }
144 }
145 }
146 Err(_) => {
147 }
149 }
150 writeln!(writer)?;
151 Ok(())
152}
153
154async fn describe_keyspaces(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
156 let keyspaces = session.get_keyspaces().await?;
157 writeln!(writer)?;
158 for ks in &keyspaces {
159 write!(writer, "{}", ks.name)?;
160 write!(writer, " ")?;
162 }
163 writeln!(writer)?;
164 writeln!(writer)?;
165 Ok(())
166}
167
168async fn describe_keyspace(
170 session: &CqlSession,
171 keyspace: Option<&str>,
172 writer: &mut dyn Write,
173) -> Result<()> {
174 let ks_name = match keyspace {
175 Some(name) => name,
176 None => {
177 writeln!(
178 writer,
179 "No keyspace specified and no current keyspace. Use DESCRIBE KEYSPACE <name>."
180 )?;
181 return Ok(());
182 }
183 };
184
185 let query = format!(
187 "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
188 ks_name.replace('\'', "''")
189 );
190 let result = session.execute_query(&query).await?;
191
192 if result.rows.is_empty() {
193 writeln!(writer, "Keyspace '{ks_name}' not found.")?;
194 return Ok(());
195 }
196
197 let dw_query = format!(
199 "SELECT durable_writes FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
200 ks_name.replace('\'', "''")
201 );
202 let dw_result = session.execute_query(&dw_query).await?;
203 let durable_writes = dw_result
204 .rows
205 .first()
206 .and_then(|r| r.get(0))
207 .map(|v| v.to_string() == "True")
208 .unwrap_or(true);
209
210 let replication_str = result
212 .rows
213 .first()
214 .and_then(|r| r.get(0))
215 .map(|v| v.to_string())
216 .unwrap_or_else(|| "{}".to_string());
217
218 writeln!(writer)?;
219 writeln!(
220 writer,
221 "CREATE KEYSPACE {ks_name} WITH replication = {replication_str} AND durable_writes = {durable_writes};"
222 )?;
223
224 let tables = session.get_tables(ks_name).await?;
226 for table in &tables {
227 writeln!(writer)?;
228 write_create_table(writer, table)?;
229 write_table_indexes(session, ks_name, &table.name, writer).await?;
230 }
231
232 writeln!(writer)?;
233 Ok(())
234}
235
236async fn describe_tables(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
238 let keyspace = match session.current_keyspace() {
239 Some(ks) => ks.to_string(),
240 None => {
241 writeln!(
242 writer,
243 "No keyspace selected. Use USE <keyspace> first, or DESCRIBE KEYSPACE <name>."
244 )?;
245 return Ok(());
246 }
247 };
248
249 let tables = session.get_tables(&keyspace).await?;
250 if tables.is_empty() {
251 writeln!(writer)?;
252 writeln!(writer, "Keyspace '{keyspace}' has no tables.")?;
253 writeln!(writer)?;
254 return Ok(());
255 }
256
257 writeln!(writer)?;
258 for table in &tables {
259 write!(writer, "{}", table.name)?;
260 write!(writer, " ")?;
261 }
262 writeln!(writer)?;
263 writeln!(writer)?;
264 Ok(())
265}
266
267async fn describe_table(
269 session: &CqlSession,
270 table_spec: &str,
271 writer: &mut dyn Write,
272) -> Result<()> {
273 let (keyspace, table_name) = if table_spec.contains('.') {
274 let parts: Vec<&str> = table_spec.splitn(2, '.').collect();
275 (parts[0].to_string(), parts[1].to_string())
276 } else {
277 match session.current_keyspace() {
278 Some(ks) => (ks.to_string(), table_spec.to_string()),
279 None => {
280 writeln!(
281 writer,
282 "No keyspace selected. Use a fully qualified name: DESCRIBE TABLE keyspace.table"
283 )?;
284 return Ok(());
285 }
286 }
287 };
288
289 let table = session.get_table_metadata(&keyspace, &table_name).await?;
290
291 match table {
292 Some(meta) => {
293 writeln!(writer)?;
294 write_create_table(writer, &meta)?;
295 writeln!(writer)?;
296 }
297 None => {
298 writeln!(writer, "Table '{keyspace}.{table_name}' not found.")?;
299 }
300 }
301
302 Ok(())
303}
304
305async fn describe_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
307 describe_schema_inner(session, writer, false).await
308}
309
310async fn describe_full_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
312 describe_schema_inner(session, writer, true).await
313}
314
315async fn describe_schema_inner(
317 session: &CqlSession,
318 writer: &mut dyn Write,
319 include_system: bool,
320) -> Result<()> {
321 let keyspaces = session.get_keyspaces().await?;
322
323 let filtered_keyspaces: Vec<_> = if include_system {
324 keyspaces.iter().collect()
325 } else {
326 keyspaces
327 .iter()
328 .filter(|ks| !is_system_keyspace(&ks.name))
329 .collect()
330 };
331
332 if filtered_keyspaces.is_empty() {
333 writeln!(writer)?;
334 writeln!(writer, "No user-defined keyspaces found.")?;
335 writeln!(writer)?;
336 return Ok(());
337 }
338
339 for ks in filtered_keyspaces {
340 describe_keyspace(session, Some(&ks.name), writer).await?;
342
343 let tables = session.get_tables(&ks.name).await?;
345 for table in &tables {
346 writeln!(writer)?;
347 write_create_table(writer, table)?;
348 write_table_indexes(session, &ks.name, &table.name, writer).await?;
349 }
350 }
351
352 writeln!(writer)?;
353 Ok(())
354}
355
356async fn describe_index(
358 session: &CqlSession,
359 index_spec: &str,
360 writer: &mut dyn Write,
361) -> Result<()> {
362 let (keyspace, index_name) = resolve_qualified_name(session, index_spec, writer)?;
363 let keyspace = match keyspace {
364 Some(ks) => ks,
365 None => return Ok(()),
366 };
367
368 let query = format!(
372 "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}'",
373 keyspace.replace('\'', "''"),
374 );
375 let result = session.execute_query(&query).await?;
376
377 let row = result.rows.iter().find(|r| {
378 r.get_by_name("index_name", &result.columns)
379 .map(|v| v.to_string().to_lowercase())
380 .as_deref()
381 == Some(index_name.to_lowercase().as_str())
382 });
383
384 let row = match row {
385 Some(r) => r,
386 None => {
387 writeln!(writer, "Index '{keyspace}.{index_name}' not found.")?;
388 return Ok(());
389 }
390 };
391
392 let idx_name = row
393 .get_by_name("index_name", &result.columns)
394 .map(|v| v.to_string())
395 .unwrap_or_default();
396 let table_name = row
397 .get_by_name("table_name", &result.columns)
398 .map(|v| v.to_string())
399 .unwrap_or_default();
400 let options = row
401 .get_by_name("options", &result.columns)
402 .map(|v| v.to_string())
403 .unwrap_or_default();
404
405 let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
408
409 write!(
410 writer,
411 "{}",
412 format_index_ddl(&keyspace, &idx_name, &table_name, &target)
413 )?;
414 Ok(())
415}
416
417async fn describe_materialized_view(
419 session: &CqlSession,
420 view_spec: &str,
421 writer: &mut dyn Write,
422) -> Result<()> {
423 let (keyspace, view_name) = resolve_qualified_name(session, view_spec, writer)?;
424 let keyspace = match keyspace {
425 Some(ks) => ks,
426 None => return Ok(()),
427 };
428
429 let query = format!(
430 "SELECT view_name, base_table_name, where_clause, include_all_columns FROM system_schema.views WHERE keyspace_name = '{}' AND view_name = '{}'",
431 keyspace.replace('\'', "''"),
432 view_name.replace('\'', "''")
433 );
434 let result = session.execute_query(&query).await?;
435
436 if result.rows.is_empty() {
437 writeln!(
438 writer,
439 "Materialized view '{keyspace}.{view_name}' not found."
440 )?;
441 return Ok(());
442 }
443
444 let row = &result.rows[0];
445 let mv_name = row
446 .get_by_name("view_name", &result.columns)
447 .map(|v| v.to_string())
448 .unwrap_or_default();
449 let base_table = row
450 .get_by_name("base_table_name", &result.columns)
451 .map(|v| v.to_string())
452 .unwrap_or_default();
453 let where_clause = row
454 .get_by_name("where_clause", &result.columns)
455 .map(|v| v.to_string())
456 .unwrap_or_else(|| "IS NOT NULL".to_string());
457 let include_all = row
458 .get_by_name("include_all_columns", &result.columns)
459 .map(|v| v.to_string() == "True")
460 .unwrap_or(false);
461
462 let col_query = format!(
466 "SELECT column_name, type, kind, position, clustering_order FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'",
467 keyspace.replace('\'', "''"),
468 mv_name.replace('\'', "''")
469 );
470 let col_result = session.execute_query(&col_query).await?;
471
472 let mut select_columns = Vec::new();
473 let mut partition_keys: Vec<(i32, String)> = Vec::new();
474 let mut clustering_keys: Vec<(i32, String, String)> = Vec::new();
475
476 for col_row in &col_result.rows {
477 let col_name = col_row
478 .get_by_name("column_name", &col_result.columns)
479 .map(|v| v.to_string())
480 .unwrap_or_default();
481 let kind = col_row
482 .get_by_name("kind", &col_result.columns)
483 .map(|v| v.to_string())
484 .unwrap_or_default();
485 let position = col_row
486 .get_by_name("position", &col_result.columns)
487 .and_then(|v| v.to_string().parse::<i32>().ok())
488 .unwrap_or(0);
489 let clustering_order = col_row
490 .get_by_name("clustering_order", &col_result.columns)
491 .map(|v| v.to_string())
492 .unwrap_or_else(|| "none".to_string());
493
494 select_columns.push(col_name.clone());
495
496 if kind == "partition_key" {
497 partition_keys.push((position, col_name));
498 } else if kind == "clustering" {
499 clustering_keys.push((position, col_name, clustering_order));
500 }
501 }
502
503 partition_keys.sort_by_key(|k| k.0);
504 clustering_keys.sort_by_key(|k| k.0);
505
506 let sorted_pk: Vec<String> = partition_keys
507 .iter()
508 .map(|(_, name)| name.clone())
509 .collect();
510 let sorted_ck: Vec<(String, String)> = clustering_keys
511 .iter()
512 .map(|(_, name, order)| (name.clone(), order.clone()))
513 .collect();
514
515 let props_query = format!(
516 "SELECT bloom_filter_fp_chance, caching, comment, compaction, compression, \
517 crc_check_chance, default_time_to_live, gc_grace_seconds, \
518 max_index_interval, memtable_flush_period_in_ms, min_index_interval, \
519 speculative_retry \
520 FROM system_schema.views \
521 WHERE keyspace_name = '{}' AND view_name = '{}'",
522 keyspace.replace('\'', "''"),
523 mv_name.replace('\'', "''")
524 );
525 let props_result = session.execute_query(&props_query).await?;
526
527 let mut properties = std::collections::BTreeMap::new();
528 if let Some(props_row) = props_result.rows.first() {
529 let prop_names = [
530 "bloom_filter_fp_chance",
531 "caching",
532 "comment",
533 "compaction",
534 "compression",
535 "crc_check_chance",
536 "default_time_to_live",
537 "gc_grace_seconds",
538 "max_index_interval",
539 "memtable_flush_period_in_ms",
540 "min_index_interval",
541 "speculative_retry",
542 ];
543 for prop_name in &prop_names {
544 if let Some(val) = props_row.get_by_name(prop_name, &props_result.columns) {
545 properties.insert(prop_name.to_string(), val.to_string());
546 }
547 }
548 }
549
550 let parts = MvDdlParts {
551 keyspace: &keyspace,
552 view_name: &mv_name,
553 base_table: &base_table,
554 include_all,
555 select_columns: &select_columns,
556 where_clause: &where_clause,
557 partition_keys: &sorted_pk,
558 clustering_keys: &sorted_ck,
559 properties: &properties,
560 };
561 write!(writer, "{}", format_create_mv_ddl(&parts))?;
562 Ok(())
563}
564
565async fn describe_types(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
567 let keyspace = match session.current_keyspace() {
568 Some(ks) => ks.to_string(),
569 None => {
570 writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
571 return Ok(());
572 }
573 };
574
575 let udts = session.get_udts(&keyspace).await?;
576 if udts.is_empty() {
577 writeln!(writer)?;
578 writeln!(writer, "Keyspace '{keyspace}' has no user-defined types.")?;
579 writeln!(writer)?;
580 return Ok(());
581 }
582
583 writeln!(writer)?;
584 for udt in &udts {
585 write!(writer, "{} ", udt.name)?;
586 }
587 writeln!(writer)?;
588 writeln!(writer)?;
589 Ok(())
590}
591
592async fn describe_type(
594 session: &CqlSession,
595 type_spec: &str,
596 writer: &mut dyn Write,
597) -> Result<()> {
598 let (keyspace, type_name) = resolve_qualified_name(session, type_spec, writer)?;
599 let keyspace = match keyspace {
600 Some(ks) => ks,
601 None => return Ok(()),
602 };
603
604 let query = format!(
605 "SELECT type_name, field_names, field_types FROM system_schema.types WHERE keyspace_name = '{}' AND type_name = '{}'",
606 keyspace.replace('\'', "''"),
607 type_name.replace('\'', "''")
608 );
609 let result = session.execute_query(&query).await?;
610
611 if result.rows.is_empty() {
612 writeln!(writer, "Type '{keyspace}.{type_name}' not found.")?;
613 return Ok(());
614 }
615
616 let row = &result.rows[0];
617 let udt_name = row
618 .get_by_name("type_name", &result.columns)
619 .map(|v| v.to_string())
620 .unwrap_or_default();
621 let field_names_str = row
622 .get_by_name("field_names", &result.columns)
623 .map(|v| v.to_string())
624 .unwrap_or_default();
625 let field_types_str = row
626 .get_by_name("field_types", &result.columns)
627 .map(|v| v.to_string())
628 .unwrap_or_default();
629
630 let field_names = parse_list_value(&field_names_str);
631 let field_types = parse_list_value(&field_types_str);
632
633 let field_count = field_names.len().min(field_types.len());
634 let fields: Vec<(String, String)> = field_names
635 .into_iter()
636 .take(field_count)
637 .zip(field_types)
638 .collect();
639 write!(
640 writer,
641 "{}",
642 format_create_type_ddl(&keyspace, &udt_name, &fields)
643 )?;
644 Ok(())
645}
646
647async fn describe_functions(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
649 let keyspace = match session.current_keyspace() {
650 Some(ks) => ks.to_string(),
651 None => {
652 writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
653 return Ok(());
654 }
655 };
656
657 let functions = session.get_functions(&keyspace).await?;
658 if functions.is_empty() {
659 writeln!(writer)?;
660 writeln!(
661 writer,
662 "Keyspace '{keyspace}' has no user-defined functions."
663 )?;
664 writeln!(writer)?;
665 return Ok(());
666 }
667
668 writeln!(writer)?;
669 for func in &functions {
670 write!(writer, "{} ", func.name)?;
671 }
672 writeln!(writer)?;
673 writeln!(writer)?;
674 Ok(())
675}
676
677async fn describe_function(
679 session: &CqlSession,
680 func_spec: &str,
681 writer: &mut dyn Write,
682) -> Result<()> {
683 let (keyspace, func_name) = resolve_qualified_name(session, func_spec, writer)?;
684 let keyspace = match keyspace {
685 Some(ks) => ks,
686 None => return Ok(()),
687 };
688
689 let query = format!(
690 "SELECT function_name, argument_names, argument_types, return_type, language, body, called_on_null_input FROM system_schema.functions WHERE keyspace_name = '{}' AND function_name = '{}'",
691 keyspace.replace('\'', "''"),
692 func_name.replace('\'', "''")
693 );
694 let result = session.execute_query(&query).await?;
695
696 if result.rows.is_empty() {
697 writeln!(writer, "Function '{keyspace}.{func_name}' not found.")?;
698 return Ok(());
699 }
700
701 let row = &result.rows[0];
702 let fn_name = row
703 .get_by_name("function_name", &result.columns)
704 .map(|v| v.to_string())
705 .unwrap_or_default();
706 let arg_names_str = row
707 .get_by_name("argument_names", &result.columns)
708 .map(|v| v.to_string())
709 .unwrap_or_default();
710 let arg_types_str = row
711 .get_by_name("argument_types", &result.columns)
712 .map(|v| v.to_string())
713 .unwrap_or_default();
714 let return_type = row
715 .get_by_name("return_type", &result.columns)
716 .map(|v| v.to_string())
717 .unwrap_or_default();
718 let language = row
719 .get_by_name("language", &result.columns)
720 .map(|v| v.to_string())
721 .unwrap_or_default();
722 let body = row
723 .get_by_name("body", &result.columns)
724 .map(|v| v.to_string())
725 .unwrap_or_default();
726 let called_on_null = row
727 .get_by_name("called_on_null_input", &result.columns)
728 .map(|v| v.to_string() == "True")
729 .unwrap_or(false);
730
731 let arg_names = parse_list_value(&arg_names_str);
732 let arg_types = parse_list_value(&arg_types_str);
733
734 let args_str = arg_names
735 .iter()
736 .zip(arg_types.iter())
737 .map(|(name, typ)| format!("{} {}", quote_if_needed(name), typ))
738 .collect::<Vec<_>>()
739 .join(", ");
740
741 let null_handling = if called_on_null {
742 "CALLED ON NULL INPUT"
743 } else {
744 "RETURNS NULL ON NULL INPUT"
745 };
746
747 write!(
748 writer,
749 "{}",
750 format_create_function_ddl(
751 &keyspace,
752 &fn_name,
753 &args_str,
754 null_handling,
755 &return_type,
756 &language,
757 &body,
758 )
759 )?;
760 Ok(())
761}
762
763async fn describe_aggregates(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
765 let keyspace = match session.current_keyspace() {
766 Some(ks) => ks.to_string(),
767 None => {
768 writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
769 return Ok(());
770 }
771 };
772
773 let aggregates = session.get_aggregates(&keyspace).await?;
774 if aggregates.is_empty() {
775 writeln!(writer)?;
776 writeln!(
777 writer,
778 "Keyspace '{keyspace}' has no user-defined aggregates."
779 )?;
780 writeln!(writer)?;
781 return Ok(());
782 }
783
784 writeln!(writer)?;
785 for agg in &aggregates {
786 write!(writer, "{} ", agg.name)?;
787 }
788 writeln!(writer)?;
789 writeln!(writer)?;
790 Ok(())
791}
792
793async fn describe_aggregate(
795 session: &CqlSession,
796 agg_spec: &str,
797 writer: &mut dyn Write,
798) -> Result<()> {
799 let (keyspace, agg_name) = resolve_qualified_name(session, agg_spec, writer)?;
800 let keyspace = match keyspace {
801 Some(ks) => ks,
802 None => return Ok(()),
803 };
804
805 let query = format!(
806 "SELECT aggregate_name, argument_types, state_func, state_type, final_func, initcond FROM system_schema.aggregates WHERE keyspace_name = '{}' AND aggregate_name = '{}'",
807 keyspace.replace('\'', "''"),
808 agg_name.replace('\'', "''")
809 );
810 let result = session.execute_query(&query).await?;
811
812 if result.rows.is_empty() {
813 writeln!(writer, "Aggregate '{keyspace}.{agg_name}' not found.")?;
814 return Ok(());
815 }
816
817 let row = &result.rows[0];
818 let ag_name = row
819 .get_by_name("aggregate_name", &result.columns)
820 .map(|v| v.to_string())
821 .unwrap_or_default();
822 let arg_types_str = row
823 .get_by_name("argument_types", &result.columns)
824 .map(|v| v.to_string())
825 .unwrap_or_default();
826 let state_func = row
827 .get_by_name("state_func", &result.columns)
828 .map(|v| v.to_string())
829 .unwrap_or_default();
830 let state_type = row
831 .get_by_name("state_type", &result.columns)
832 .map(|v| v.to_string())
833 .unwrap_or_default();
834 let final_func = row
835 .get_by_name("final_func", &result.columns)
836 .map(|v| v.to_string());
837 let initcond = row
838 .get_by_name("initcond", &result.columns)
839 .map(|v| v.to_string());
840
841 let arg_types = parse_list_value(&arg_types_str);
842 let args_str = arg_types.join(", ");
843
844 write!(
845 writer,
846 "{}",
847 format_create_aggregate_ddl(
848 &keyspace,
849 &ag_name,
850 &args_str,
851 &state_func,
852 &state_type,
853 final_func.as_deref(),
854 initcond.as_deref(),
855 )
856 )?;
857 Ok(())
858}
859
860fn write_create_table(writer: &mut dyn Write, meta: &crate::driver::TableMetadata) -> Result<()> {
862 writeln!(
863 writer,
864 "CREATE TABLE {}.{} (",
865 quote_if_needed(&meta.keyspace),
866 quote_if_needed(&meta.name)
867 )?;
868
869 for col in &meta.columns {
871 writeln!(
872 writer,
873 " {} {},",
874 quote_if_needed(&col.name),
875 col.type_name
876 )?;
877 }
878
879 if !meta.partition_key.is_empty() {
881 let pk_str = if meta.partition_key.len() == 1 {
882 quote_if_needed(&meta.partition_key[0])
883 } else {
884 format!(
885 "({})",
886 meta.partition_key
887 .iter()
888 .map(|k| quote_if_needed(k))
889 .collect::<Vec<_>>()
890 .join(", ")
891 )
892 };
893
894 if meta.clustering_key.is_empty() {
895 writeln!(writer, " PRIMARY KEY ({pk_str})")?;
896 } else {
897 let ck_str = meta
898 .clustering_key
899 .iter()
900 .map(|k| quote_if_needed(k))
901 .collect::<Vec<_>>()
902 .join(", ");
903 writeln!(writer, " PRIMARY KEY ({pk_str}, {ck_str})")?;
904 }
905 }
906
907 writeln!(writer, ");")?;
908 Ok(())
909}
910
911async fn write_table_indexes(
913 session: &CqlSession,
914 keyspace: &str,
915 table_name: &str,
916 writer: &mut dyn Write,
917) -> Result<()> {
918 let query = format!(
919 "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}' AND table_name = '{}'",
920 keyspace.replace('\'', "''"),
921 table_name.replace('\'', "''"),
922 );
923 let result = session.execute_query(&query).await?;
924
925 for row in &result.rows {
926 let idx_name = row
927 .get_by_name("index_name", &result.columns)
928 .map(|v| v.to_string())
929 .unwrap_or_default();
930 let tbl_name = row
931 .get_by_name("table_name", &result.columns)
932 .map(|v| v.to_string())
933 .unwrap_or_default();
934 let options = row
935 .get_by_name("options", &result.columns)
936 .map(|v| v.to_string())
937 .unwrap_or_default();
938
939 let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
940
941 writeln!(
942 writer,
943 "CREATE INDEX {} ON {}.{} ({});",
944 quote_if_needed(&idx_name),
945 quote_if_needed(keyspace),
946 quote_if_needed(&tbl_name),
947 target
948 )?;
949 }
950
951 Ok(())
952}
953
954fn resolve_qualified_name(
959 session: &CqlSession,
960 spec: &str,
961 writer: &mut dyn Write,
962) -> Result<(Option<String>, String)> {
963 if spec.contains('.') {
964 let parts: Vec<&str> = spec.splitn(2, '.').collect();
965 Ok((Some(parts[0].to_string()), parts[1].to_string()))
966 } else {
967 match session.current_keyspace() {
968 Some(ks) => Ok((Some(ks.to_string()), spec.to_string())),
969 None => {
970 writeln!(
971 writer,
972 "No keyspace selected. Use a fully qualified name (keyspace.name) or USE <keyspace> first."
973 )?;
974 Ok((None, spec.to_string()))
975 }
976 }
977 }
978}
979
/// Parses the textual form of a list value (e.g. `['a', 'b']`) into its
/// elements.
///
/// Surrounding `[` `]` are optional. Elements are separated by commas, but
/// only commas at the top level count: a comma inside quotes or inside
/// `<...>`, `(...)`, `[...]` or `{...}` belongs to the element, so types such
/// as `map<text, int>` stay in one piece. Each element is trimmed, and one
/// pair of matching surrounding quotes (`'...'` or `"..."`) is removed.
///
/// Returns an empty vector for an empty string, `[]`, or brackets containing
/// only whitespace.
///
/// An element with an unterminated quote absorbs the rest of the input, since
/// there is no closing quote to end it.
fn parse_list_value(s: &str) -> Vec<String> {
    let trimmed = s.trim();
    let inner = trimmed
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .unwrap_or(trimmed);
    if inner.trim().is_empty() {
        return Vec::new();
    }

    // Cut `inner` at every top-level comma.
    let mut pieces: Vec<&str> = Vec::new();
    let mut piece_start = 0;
    let mut depth = 0usize;
    let mut open_quote: Option<char> = None;
    for (idx, ch) in inner.char_indices() {
        match open_quote {
            Some(quote) => {
                if ch == quote {
                    open_quote = None;
                }
            }
            None => match ch {
                '\'' | '"' => open_quote = Some(ch),
                '<' | '(' | '[' | '{' => depth += 1,
                // A stray closer must not underflow the nesting depth.
                '>' | ')' | ']' | '}' => depth = depth.saturating_sub(1),
                ',' if depth == 0 => {
                    pieces.push(&inner[piece_start..idx]);
                    piece_start = idx + 1;
                }
                _ => {}
            },
        }
    }
    pieces.push(&inner[piece_start..]);

    pieces
        .into_iter()
        .map(|piece| {
            let piece = piece.trim();
            // Require two characters so a lone quote is not "unwrapped" into
            // an invalid slice.
            let wrapped = piece.len() >= 2
                && ((piece.starts_with('\'') && piece.ends_with('\''))
                    || (piece.starts_with('"') && piece.ends_with('"')));
            if wrapped {
                piece[1..piece.len() - 1].to_string()
            } else {
                piece.to_string()
            }
        })
        .collect()
}
1011
/// Looks up `key` in the textual form of a map value such as
/// `{'target': 'email', 'class_name': '...'}`.
///
/// Surrounding `{` `}` are optional. Entries are separated by commas and each
/// entry is split at its first colon; entries without a colon are ignored.
/// Keys and values are trimmed and stripped of surrounding single quotes, then
/// of surrounding double quotes. Returns the value of the first entry whose
/// key equals `key`, or `None` when there is none.
fn extract_map_value(map_str: &str, key: &str) -> Option<String> {
    fn unquote(text: &str) -> &str {
        text.trim().trim_matches('\'').trim_matches('"')
    }

    let trimmed = map_str.trim();
    let inner = match trimmed
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
    {
        Some(body) => body,
        None => trimmed,
    };

    inner
        .split(',')
        .filter_map(|entry| entry.split_once(':'))
        .map(|(raw_key, raw_value)| (unquote(raw_key), unquote(raw_value)))
        .find(|(candidate, _)| *candidate == key)
        .map(|(_, value)| value.to_string())
}
1035
/// Reports whether `name` is treated as a system keyspace.
///
/// That is any name starting with `system`, plus a fixed list of exact names
/// (`dse_*` keyspaces and `solr_admin`).
fn is_system_keyspace(name: &str) -> bool {
    const EXACT_NAMES: [&str; 7] = [
        "dse_system",
        "dse_perf",
        "dse_security",
        "dse_leases",
        "dse_system_local",
        "dse_insights",
        "solr_admin",
    ];

    if name.starts_with("system") {
        return true;
    }
    EXACT_NAMES.contains(&name)
}
1047
/// Returns `name` ready for use as an identifier in generated DDL.
///
/// A name is returned unchanged when it is non-empty, does not start with an
/// ASCII digit, and consists only of ASCII lowercase letters, ASCII digits and
/// underscores. Any other name is wrapped in double quotes, with embedded
/// double quotes doubled.
fn quote_if_needed(name: &str) -> String {
    let is_plain_char = |c: char| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_';

    let mut chars = name.chars();
    let is_plain = match chars.next() {
        // Empty names always need quoting.
        None => false,
        Some(first) => is_plain_char(first) && !first.is_ascii_digit() && chars.all(is_plain_char),
    };

    if is_plain {
        return name.to_string();
    }
    let escaped = name.replace('"', "\"\"");
    format!("\"{}\"", escaped)
}
1062
/// Removes one pair of matching surrounding quotes (`"..."` or `'...'`) from
/// `s`, returning `s` unchanged when it is not wrapped that way.
///
/// A string consisting of a single quote character is not a quoted pair and is
/// returned as-is. (Treating it as one would slice with start > end and
/// panic.)
fn strip_quotes(s: &str) -> &str {
    for quote in ['"', '\''] {
        // `strip_suffix` runs on what is left after the opening quote, so the
        // same character can never serve as both opener and closer.
        if let Some(inner) = s
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return inner;
        }
    }
    s
}
1071
1072fn format_index_ddl(keyspace: &str, index_name: &str, table_name: &str, target: &str) -> String {
1076 format!(
1077 "\nCREATE INDEX {} ON {}.{} ({});\n\n",
1078 quote_if_needed(index_name),
1079 quote_if_needed(keyspace),
1080 quote_if_needed(table_name),
1081 target
1082 )
1083}
1084
1085fn format_create_type_ddl(keyspace: &str, type_name: &str, fields: &[(String, String)]) -> String {
1087 let mut out = String::new();
1088 out.push('\n');
1089 out.push_str(&format!(
1090 "CREATE TYPE {}.{} (\n",
1091 quote_if_needed(keyspace),
1092 quote_if_needed(type_name)
1093 ));
1094 let field_count = fields.len();
1095 for (i, (name, typ)) in fields.iter().enumerate() {
1096 let comma = if i < field_count - 1 { "," } else { "" };
1097 out.push_str(&format!(" {} {}{}\n", quote_if_needed(name), typ, comma));
1098 }
1099 out.push_str(");\n\n");
1100 out
1101}
1102
1103fn format_create_function_ddl(
1105 keyspace: &str,
1106 func_name: &str,
1107 args_str: &str,
1108 null_handling: &str,
1109 return_type: &str,
1110 language: &str,
1111 body: &str,
1112) -> String {
1113 format!(
1114 "\nCREATE OR REPLACE FUNCTION {}.{} ({})\n {}\n RETURNS {}\n LANGUAGE {}\n AS $$ {} $$;\n\n",
1115 quote_if_needed(keyspace),
1116 quote_if_needed(func_name),
1117 args_str,
1118 null_handling,
1119 return_type,
1120 language,
1121 body
1122 )
1123}
1124
1125fn format_create_aggregate_ddl(
1127 keyspace: &str,
1128 agg_name: &str,
1129 args_str: &str,
1130 state_func: &str,
1131 state_type: &str,
1132 final_func: Option<&str>,
1133 initcond: Option<&str>,
1134) -> String {
1135 let mut out = format!(
1136 "\nCREATE OR REPLACE AGGREGATE {}.{} ({})\n SFUNC {}\n STYPE {}",
1137 quote_if_needed(keyspace),
1138 quote_if_needed(agg_name),
1139 args_str,
1140 state_func,
1141 state_type
1142 );
1143 if let Some(ff) = final_func {
1144 if !ff.is_empty() && ff != "null" {
1145 out.push_str(&format!("\n FINALFUNC {ff}"));
1146 }
1147 }
1148 if let Some(ic) = initcond {
1149 if !ic.is_empty() && ic != "null" {
1150 out.push_str(&format!("\n INITCOND {ic}"));
1151 }
1152 }
1153 out.push_str("\n;\n\n");
1154 out
1155}
1156
/// Borrowed inputs for rendering a `CREATE MATERIALIZED VIEW` statement.
struct MvDdlParts<'a> {
    /// Keyspace that holds both the view and its base table.
    keyspace: &'a str,
    /// Name of the materialized view.
    view_name: &'a str,
    /// Name of the table the view selects from.
    base_table: &'a str,
    /// When `true` the select list is rendered as `*`.
    include_all: bool,
    /// Column names for the select list, used when `include_all` is `false`.
    select_columns: &'a [String],
    /// Text placed after `WHERE`.
    where_clause: &'a str,
    /// Partition key column names, in key order.
    partition_keys: &'a [String],
    /// Clustering columns as `(name, order)` pairs, in key order.
    clustering_keys: &'a [(String, String)],
    /// View properties keyed by property name.
    properties: &'a std::collections::BTreeMap<String, String>,
}
1169
1170fn format_create_mv_ddl(parts: &MvDdlParts<'_>) -> String {
1172 let mut out = String::new();
1173 out.push('\n');
1174 out.push_str(&format!(
1175 "CREATE MATERIALIZED VIEW {}.{} AS\n",
1176 quote_if_needed(parts.keyspace),
1177 quote_if_needed(parts.view_name)
1178 ));
1179
1180 let columns_str = if parts.include_all {
1181 "*".to_string()
1182 } else {
1183 parts
1184 .select_columns
1185 .iter()
1186 .map(|c| quote_if_needed(c))
1187 .collect::<Vec<_>>()
1188 .join(", ")
1189 };
1190 out.push_str(&format!(" SELECT {columns_str}\n"));
1191 out.push_str(&format!(
1192 " FROM {}.{}\n",
1193 quote_if_needed(parts.keyspace),
1194 quote_if_needed(parts.base_table)
1195 ));
1196 out.push_str(&format!(" WHERE {}\n", parts.where_clause));
1197
1198 let pk_str = if parts.partition_keys.len() == 1 {
1199 quote_if_needed(&parts.partition_keys[0])
1200 } else {
1201 format!(
1202 "({})",
1203 parts
1204 .partition_keys
1205 .iter()
1206 .map(|k| quote_if_needed(k))
1207 .collect::<Vec<_>>()
1208 .join(", ")
1209 )
1210 };
1211
1212 if parts.clustering_keys.is_empty() {
1213 out.push_str(&format!(" PRIMARY KEY ({pk_str})\n"));
1214 } else {
1215 let ck_str = parts
1216 .clustering_keys
1217 .iter()
1218 .map(|(name, _)| quote_if_needed(name))
1219 .collect::<Vec<_>>()
1220 .join(", ");
1221 out.push_str(&format!(" PRIMARY KEY ({pk_str}, {ck_str})\n"));
1222 }
1223
1224 let mut first_with = true;
1226 if !parts.clustering_keys.is_empty() {
1227 let order_str = parts
1228 .clustering_keys
1229 .iter()
1230 .map(|(name, order)| format!("{} {}", quote_if_needed(name), order.to_uppercase()))
1231 .collect::<Vec<_>>()
1232 .join(", ");
1233 out.push_str(&format!(" WITH CLUSTERING ORDER BY ({order_str})"));
1234 first_with = false;
1235 }
1236
1237 let prop_order = [
1238 "bloom_filter_fp_chance",
1239 "caching",
1240 "comment",
1241 "compaction",
1242 "compression",
1243 "crc_check_chance",
1244 "default_time_to_live",
1245 "gc_grace_seconds",
1246 "max_index_interval",
1247 "memtable_flush_period_in_ms",
1248 "min_index_interval",
1249 "speculative_retry",
1250 ];
1251
1252 for prop_name in &prop_order {
1253 if let Some(value) = parts.properties.get(*prop_name) {
1254 let formatted_value = format_property_value(prop_name, value);
1255 if first_with {
1256 out.push_str(&format!(" WITH {} = {}", prop_name, formatted_value));
1257 first_with = false;
1258 } else {
1259 out.push_str(&format!("\n AND {} = {}", prop_name, formatted_value));
1260 }
1261 }
1262 }
1263
1264 out.push_str(";\n");
1265
1266 out.push('\n');
1267 out
1268}
1269
/// Formats a table/view property value for use on the right-hand side of a
/// `WITH name = value` option.
///
/// `comment` and `speculative_retry` are rendered as single-quoted CQL string
/// literals, with every embedded `'` doubled. Every other property, including
/// `caching`, `compaction` and `compression`, is returned unchanged.
// NOTE(review): the pass-through values are presumably already valid CQL
// (numbers or map literals) when they arrive here — confirm against the caller.
fn format_property_value(name: &str, value: &str) -> String {
    let needs_quoting = matches!(name, "comment" | "speculative_retry");
    if !needs_quoting {
        return value.to_owned();
    }

    // Two extra bytes for the surrounding quotes; escapes may grow it further.
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // CQL escapes a single quote inside a string by doubling it.
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
1281
#[cfg(test)]
mod tests {
    //! Unit tests for the identifier helpers and the DDL formatters.

    use super::*;

    use std::collections::BTreeMap;

    use crate::driver::{ColumnMetadata, TableMetadata};

    /// Turns string literals into the owned `String`s the formatters borrow.
    fn owned_strings(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Turns literal pairs into owned `(String, String)` tuples.
    fn owned_pairs(items: &[(&str, &str)]) -> Vec<(String, String)> {
        items
            .iter()
            .map(|(left, right)| (left.to_string(), right.to_string()))
            .collect()
    }

    /// Builds table metadata from literals; `columns` holds `(name, type)` pairs.
    fn table_meta(
        keyspace: &str,
        name: &str,
        columns: &[(&str, &str)],
        partition_key: &[&str],
        clustering_key: &[&str],
    ) -> TableMetadata {
        TableMetadata {
            keyspace: keyspace.to_string(),
            name: name.to_string(),
            columns: columns
                .iter()
                .map(|(column, type_name)| ColumnMetadata {
                    name: column.to_string(),
                    type_name: type_name.to_string(),
                })
                .collect(),
            partition_key: owned_strings(partition_key),
            clustering_key: owned_strings(clustering_key),
        }
    }

    /// Runs `write_create_table` into a buffer and returns the text it wrote.
    fn render_create_table(meta: &TableMetadata) -> String {
        let mut buf = Vec::new();
        write_create_table(&mut buf, meta).unwrap();
        String::from_utf8(buf).unwrap()
    }

    #[test]
    fn system_keyspace_detection() {
        for ks in ["system", "system_schema", "system_auth", "system_traces"] {
            assert!(is_system_keyspace(ks));
        }
        for ks in ["my_keyspace", "users"] {
            assert!(!is_system_keyspace(ks));
        }
    }

    #[test]
    fn quote_simple_identifier() {
        for name in ["users", "my_table"] {
            assert_eq!(quote_if_needed(name), name);
        }
    }

    #[test]
    fn quote_mixed_case_identifier() {
        assert_eq!(quote_if_needed("MyTable"), r#""MyTable""#);
    }

    #[test]
    fn quote_identifier_with_spaces() {
        assert_eq!(quote_if_needed("my table"), r#""my table""#);
    }

    #[test]
    fn quote_identifier_starting_with_digit() {
        assert_eq!(quote_if_needed("1table"), r#""1table""#);
    }

    #[test]
    fn strip_quotes_test() {
        for raw in [r#""hello""#, "'hello'", "hello"] {
            assert_eq!(strip_quotes(raw), "hello");
        }
    }

    #[test]
    fn parse_list_value_test() {
        for blank in ["[]", ""] {
            assert_eq!(parse_list_value(blank), Vec::<String>::new());
        }
        assert_eq!(parse_list_value("['a', 'b', 'c']"), vec!["a", "b", "c"]);
        assert_eq!(
            parse_list_value("[int, text, uuid]"),
            vec!["int", "text", "uuid"]
        );
    }

    #[test]
    fn extract_map_value_test() {
        let found = extract_map_value("{'target': 'email', 'class_name': 'foo'}", "target");
        assert_eq!(found.as_deref(), Some("email"));

        let absent = extract_map_value("{'target': 'email'}", "missing");
        assert!(absent.is_none());
    }

    #[test]
    fn write_create_table_simple() {
        let meta = table_meta(
            "test_ks",
            "users",
            &[("id", "uuid"), ("name", "text"), ("age", "int")],
            &["id"],
            &[],
        );

        let output = render_create_table(&meta);
        for expected in [
            "CREATE TABLE test_ks.users",
            "id uuid",
            "name text",
            "PRIMARY KEY (id)",
        ] {
            assert!(output.contains(expected));
        }
    }

    #[test]
    fn write_create_table_composite_key() {
        let meta = table_meta(
            "ks",
            "events",
            &[
                ("user_id", "uuid"),
                ("event_time", "timestamp"),
                ("data", "text"),
            ],
            &["user_id"],
            &["event_time"],
        );

        let output = render_create_table(&meta);
        assert!(output.contains("PRIMARY KEY (user_id, event_time)"));
    }

    #[test]
    fn write_create_table_compound_partition_key() {
        let meta = table_meta(
            "ks",
            "metrics",
            &[
                ("host", "text"),
                ("metric", "text"),
                ("ts", "timestamp"),
                ("value", "double"),
            ],
            &["host", "metric"],
            &["ts"],
        );

        let output = render_create_table(&meta);
        assert!(output.contains("PRIMARY KEY ((host, metric), ts)"));
    }

    #[test]
    fn format_index_ddl_simple() {
        let ddl = format_index_ddl("my_ks", "email_idx", "users", "email");
        assert!(ddl.contains("CREATE INDEX email_idx ON my_ks.users (email);"));
    }

    #[test]
    fn format_index_ddl_quoted_names() {
        let ddl = format_index_ddl("MyKs", "MyIdx", "MyTable", "email");
        for quoted in [r#""MyKs""#, r#""MyIdx""#, r#""MyTable""#] {
            assert!(ddl.contains(quoted));
        }
    }

    #[test]
    fn format_create_type_ddl_single_field() {
        let fields = owned_pairs(&[("street", "text")]);
        let ddl = format_create_type_ddl("ks1", "address", &fields);
        for expected in ["CREATE TYPE ks1.address (", "street text", ");"] {
            assert!(ddl.contains(expected));
        }
    }

    #[test]
    fn format_create_type_ddl_multiple_fields() {
        let fields = owned_pairs(&[("street", "text"), ("city", "text"), ("zip", "int")]);
        let ddl = format_create_type_ddl("ks1", "address", &fields);

        for with_comma in ["street text,", "city text,"] {
            assert!(
                ddl.contains(with_comma),
                "expected trailing comma: {ddl}"
            );
        }
        assert!(
            !ddl.contains("int,"),
            "last field should not have comma: {ddl}"
        );
    }

    #[test]
    fn format_create_function_ddl_called_on_null() {
        let ddl = format_create_function_ddl(
            "ks1",
            "add_one",
            "val int",
            "CALLED ON NULL INPUT",
            "int",
            "java",
            "return val + 1;",
        );

        for expected in [
            "CREATE OR REPLACE FUNCTION ks1.add_one (val int)",
            "CALLED ON NULL INPUT",
            "RETURNS int",
            "LANGUAGE java",
            "AS $$ return val + 1; $$;",
        ] {
            assert!(ddl.contains(expected));
        }
    }

    #[test]
    fn format_create_function_ddl_returns_null() {
        let ddl = format_create_function_ddl(
            "ks1",
            "my_func",
            "x text",
            "RETURNS NULL ON NULL INPUT",
            "text",
            "lua",
            "return x",
        );

        assert!(ddl.contains("RETURNS NULL ON NULL INPUT"));
        assert!(!ddl.contains("CALLED ON NULL INPUT"));
    }

    #[test]
    fn format_create_aggregate_ddl_minimal() {
        let ddl =
            format_create_aggregate_ddl("ks1", "my_sum", "int", "state_add", "int", None, None);

        for expected in [
            "CREATE OR REPLACE AGGREGATE ks1.my_sum (int)",
            "SFUNC state_add",
            "STYPE int",
        ] {
            assert!(ddl.contains(expected));
        }
        for omitted in ["FINALFUNC", "INITCOND"] {
            assert!(!ddl.contains(omitted));
        }
        assert!(ddl.contains(';'));
    }

    #[test]
    fn format_create_aggregate_ddl_with_optional() {
        let ddl = format_create_aggregate_ddl(
            "ks1",
            "my_avg",
            "int",
            "state_avg",
            "tuple<int,int>",
            Some("final_avg"),
            Some("0"),
        );

        assert!(ddl.contains("FINALFUNC final_avg"));
        assert!(ddl.contains("INITCOND 0"));
    }

    #[test]
    fn format_create_aggregate_ddl_empty_optional_skipped() {
        let ddl = format_create_aggregate_ddl(
            "ks1",
            "my_agg",
            "int",
            "sf",
            "int",
            Some(""),
            Some("null"),
        );

        assert!(
            !ddl.contains("FINALFUNC"),
            "empty FINALFUNC should be omitted: {ddl}"
        );
        assert!(
            !ddl.contains("INITCOND"),
            "'null' INITCOND should be omitted: {ddl}"
        );
    }

    #[test]
    fn format_create_mv_ddl_simple() {
        let select_columns = owned_strings(&["id", "email"]);
        let partition_keys = owned_strings(&["email"]);
        let properties: BTreeMap<String, String> = BTreeMap::new();

        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "ks1",
            view_name: "user_by_email",
            base_table: "users",
            include_all: false,
            select_columns: &select_columns,
            where_clause: "email IS NOT NULL",
            partition_keys: &partition_keys,
            clustering_keys: &[],
            properties: &properties,
        });

        for expected in [
            "CREATE MATERIALIZED VIEW ks1.user_by_email AS",
            "SELECT id, email",
            "FROM ks1.users",
            "WHERE email IS NOT NULL",
            "PRIMARY KEY (email)",
        ] {
            assert!(ddl.contains(expected));
        }
    }

    #[test]
    fn format_create_mv_ddl_include_all() {
        let id_only = owned_strings(&["id"]);
        let properties: BTreeMap<String, String> = BTreeMap::new();

        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "ks1",
            view_name: "mv_all",
            base_table: "base",
            include_all: true,
            select_columns: &id_only,
            where_clause: "id IS NOT NULL",
            partition_keys: &id_only,
            clustering_keys: &[],
            properties: &properties,
        });

        assert!(
            ddl.contains("SELECT *"),
            "include_all should emit SELECT *: {ddl}"
        );
    }

    #[test]
    fn format_create_mv_ddl_with_clustering_desc() {
        let select_columns = owned_strings(&["user_id", "ts"]);
        let partition_keys = owned_strings(&["user_id"]);
        let clustering_keys = owned_pairs(&[("ts", "DESC")]);
        let properties: BTreeMap<String, String> = BTreeMap::new();

        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "ks1",
            view_name: "mv_ordered",
            base_table: "events",
            include_all: false,
            select_columns: &select_columns,
            where_clause: "ts IS NOT NULL",
            partition_keys: &partition_keys,
            clustering_keys: &clustering_keys,
            properties: &properties,
        });

        assert!(ddl.contains("PRIMARY KEY (user_id, ts)"));
        assert!(ddl.contains("WITH CLUSTERING ORDER BY (ts DESC)"));
    }

    #[test]
    fn format_create_mv_ddl_with_properties() {
        let select_columns = owned_strings(&["state", "username"]);
        let partition_keys = owned_strings(&["state"]);
        let clustering_keys = owned_pairs(&[("username", "ASC")]);
        let properties: BTreeMap<String, String> = owned_pairs(&[
            ("bloom_filter_fp_chance", "0.01"),
            ("comment", ""),
            ("gc_grace_seconds", "864000"),
        ])
        .into_iter()
        .collect();

        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "test",
            view_name: "users_by_state",
            base_table: "users",
            include_all: true,
            select_columns: &select_columns,
            where_clause: "state IS NOT null AND username IS NOT null",
            partition_keys: &partition_keys,
            clustering_keys: &clustering_keys,
            properties: &properties,
        });

        assert!(
            ddl.contains("WITH CLUSTERING ORDER BY (username ASC)"),
            "should always emit CLUSTERING ORDER BY for MVs: {ddl}"
        );
        assert!(ddl.contains("AND bloom_filter_fp_chance = 0.01"));
        assert!(ddl.contains("AND comment = ''"));
        assert!(ddl.contains("AND gc_grace_seconds = 864000"));
    }
}