cqlsh_rs/
describe.rs

1//! DESCRIBE command implementations for cqlsh-rs.
2//!
3//! Provides schema introspection commands matching Python cqlsh:
4//! - DESCRIBE CLUSTER
5//! - DESCRIBE KEYSPACES
6//! - DESCRIBE KEYSPACE [name]
7//! - DESCRIBE TABLES
8//! - DESCRIBE TABLE <name>
9//! - DESCRIBE SCHEMA
10//! - DESCRIBE FULL SCHEMA
11//! - DESCRIBE INDEX <name>
12//! - DESCRIBE MATERIALIZED VIEW <name>
13//! - DESCRIBE TYPE <name> / DESCRIBE TYPES
14//! - DESCRIBE FUNCTION <name> / DESCRIBE FUNCTIONS
15//! - DESCRIBE AGGREGATE <name> / DESCRIBE AGGREGATES
16
17use std::io::Write;
18
19use anyhow::Result;
20
21use crate::session::CqlSession;
22
23/// Execute a DESCRIBE command and write output.
24///
25/// Parses the DESCRIBE subcommand and dispatches to the appropriate handler.
26pub async fn execute(session: &CqlSession, args: &str, writer: &mut dyn Write) -> Result<()> {
27    let args = args.trim();
28    let upper = args.to_uppercase();
29
30    // DESCRIBE with no args — show help
31    if args.is_empty() {
32        writeln!(
33            writer,
34            "Usage: DESCRIBE [CLUSTER | KEYSPACES | KEYSPACE [name] | TABLES | TABLE <name> | SCHEMA | FULL SCHEMA | INDEX <name> | MATERIALIZED VIEW <name> | TYPES | TYPE <name> | FUNCTIONS | FUNCTION <name> | AGGREGATES | AGGREGATE <name>]"
35        )?;
36        return Ok(());
37    }
38
39    if upper == "CLUSTER" {
40        describe_cluster(session, writer).await
41    } else if upper == "KEYSPACES" {
42        describe_keyspaces(session, writer).await
43    } else if upper == "TABLES" {
44        describe_tables(session, writer).await
45    } else if upper == "FULL SCHEMA" {
46        describe_full_schema(session, writer).await
47    } else if upper == "SCHEMA" {
48        describe_schema(session, writer).await
49    } else if upper == "KEYSPACE" {
50        // DESCRIBE KEYSPACE with no name → current keyspace
51        describe_keyspace(session, session.current_keyspace(), writer).await
52    } else if upper.starts_with("KEYSPACE ") {
53        // DESCRIBE KEYSPACE <name>
54        let ks_name = args["KEYSPACE ".len()..].trim();
55        let ks_name = strip_quotes(ks_name);
56        describe_keyspace(session, Some(ks_name), writer).await
57    } else if upper == "TABLE" {
58        writeln!(writer, "DESCRIBE TABLE requires a table name.")?;
59        Ok(())
60    } else if upper.starts_with("TABLE ") {
61        // DESCRIBE TABLE <name>
62        let table_spec = args["TABLE ".len()..].trim();
63        let table_spec = strip_quotes(table_spec);
64        describe_table(session, table_spec, writer).await
65    } else if upper == "INDEX" {
66        writeln!(writer, "DESCRIBE INDEX requires an index name.")?;
67        Ok(())
68    } else if upper.starts_with("INDEX ") {
69        let index_spec = args["INDEX ".len()..].trim();
70        let index_spec = strip_quotes(index_spec);
71        describe_index(session, index_spec, writer).await
72    } else if upper == "MATERIALIZED VIEW" {
73        writeln!(writer, "DESCRIBE MATERIALIZED VIEW requires a view name.")?;
74        Ok(())
75    } else if upper.starts_with("MATERIALIZED VIEW ") {
76        let view_spec = args["MATERIALIZED VIEW ".len()..].trim();
77        let view_spec = strip_quotes(view_spec);
78        describe_materialized_view(session, view_spec, writer).await
79    } else if upper == "TYPES" {
80        describe_types(session, writer).await
81    } else if upper == "TYPE" {
82        writeln!(writer, "DESCRIBE TYPE requires a type name.")?;
83        Ok(())
84    } else if upper.starts_with("TYPE ") {
85        let type_spec = args["TYPE ".len()..].trim();
86        let type_spec = strip_quotes(type_spec);
87        describe_type(session, type_spec, writer).await
88    } else if upper == "FUNCTIONS" {
89        describe_functions(session, writer).await
90    } else if upper == "FUNCTION" {
91        writeln!(writer, "DESCRIBE FUNCTION requires a function name.")?;
92        Ok(())
93    } else if upper.starts_with("FUNCTION ") {
94        let func_spec = args["FUNCTION ".len()..].trim();
95        let func_spec = strip_quotes(func_spec);
96        describe_function(session, func_spec, writer).await
97    } else if upper == "AGGREGATES" {
98        describe_aggregates(session, writer).await
99    } else if upper == "AGGREGATE" {
100        writeln!(writer, "DESCRIBE AGGREGATE requires an aggregate name.")?;
101        Ok(())
102    } else if upper.starts_with("AGGREGATE ") {
103        let agg_spec = args["AGGREGATE ".len()..].trim();
104        let agg_spec = strip_quotes(agg_spec);
105        describe_aggregate(session, agg_spec, writer).await
106    } else {
107        // Try to guess: could be a keyspace name, table name, or keyspace.table
108        // Check if it matches a keyspace first, then a table in current keyspace
109        let name = strip_quotes(args);
110        if name.contains('.') {
111            // Qualified table name: keyspace.table
112            describe_table(session, name, writer).await
113        } else {
114            // Try as keyspace first
115            let keyspaces = session.get_keyspaces().await?;
116            if keyspaces.iter().any(|ks| ks.name == name) {
117                describe_keyspace(session, Some(name), writer).await
118            } else {
119                // Try as table in current keyspace
120                describe_table(session, name, writer).await
121            }
122        }
123    }
124}
125
126/// DESCRIBE CLUSTER — show cluster name and partitioner.
127async fn describe_cluster(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
128    let cluster_name = session.cluster_name.as_deref().unwrap_or("Unknown Cluster");
129
130    writeln!(writer)?;
131    writeln!(writer, "Cluster: {cluster_name}")?;
132    writeln!(writer, "Partitioner: Murmur3Partitioner")?;
133
134    // Fetch snitch info from system.local
135    match session
136        .execute_query("SELECT snitch FROM system.local")
137        .await
138    {
139        Ok(result) => {
140            if let Some(row) = result.rows.first() {
141                if let Some(snitch) = row.get(0) {
142                    writeln!(writer, "Snitch: {snitch}")?;
143                }
144            }
145        }
146        Err(_) => {
147            // Snitch info may not be available in all configurations
148        }
149    }
150    writeln!(writer)?;
151    Ok(())
152}
153
154/// DESCRIBE KEYSPACES — list all keyspace names.
155async fn describe_keyspaces(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
156    let keyspaces = session.get_keyspaces().await?;
157    writeln!(writer)?;
158    for ks in &keyspaces {
159        write!(writer, "{}", ks.name)?;
160        // Add spaces between keyspace names (Python cqlsh style)
161        write!(writer, "  ")?;
162    }
163    writeln!(writer)?;
164    writeln!(writer)?;
165    Ok(())
166}
167
168/// DESCRIBE KEYSPACE [name] — show CREATE KEYSPACE and all objects within it.
169async fn describe_keyspace(
170    session: &CqlSession,
171    keyspace: Option<&str>,
172    writer: &mut dyn Write,
173) -> Result<()> {
174    let ks_name = match keyspace {
175        Some(name) => name,
176        None => {
177            writeln!(
178                writer,
179                "No keyspace specified and no current keyspace. Use DESCRIBE KEYSPACE <name>."
180            )?;
181            return Ok(());
182        }
183    };
184
185    // Fetch keyspace details from system_schema
186    let query = format!(
187        "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
188        ks_name.replace('\'', "''")
189    );
190    let result = session.execute_query(&query).await?;
191
192    if result.rows.is_empty() {
193        writeln!(writer, "Keyspace '{ks_name}' not found.")?;
194        return Ok(());
195    }
196
197    // Fetch durable_writes
198    let dw_query = format!(
199        "SELECT durable_writes FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
200        ks_name.replace('\'', "''")
201    );
202    let dw_result = session.execute_query(&dw_query).await?;
203    let durable_writes = dw_result
204        .rows
205        .first()
206        .and_then(|r| r.get(0))
207        .map(|v| v.to_string() == "True")
208        .unwrap_or(true);
209
210    // Build replication string from the map value
211    let replication_str = result
212        .rows
213        .first()
214        .and_then(|r| r.get(0))
215        .map(|v| v.to_string())
216        .unwrap_or_else(|| "{}".to_string());
217
218    writeln!(writer)?;
219    writeln!(
220        writer,
221        "CREATE KEYSPACE {ks_name} WITH replication = {replication_str} AND durable_writes = {durable_writes};"
222    )?;
223
224    // Print all tables and their indexes in this keyspace
225    let tables = session.get_tables(ks_name).await?;
226    for table in &tables {
227        writeln!(writer)?;
228        write_create_table(writer, table)?;
229        write_table_indexes(session, ks_name, &table.name, writer).await?;
230    }
231
232    writeln!(writer)?;
233    Ok(())
234}
235
236/// DESCRIBE TABLES — list tables in the current keyspace.
237async fn describe_tables(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
238    let keyspace = match session.current_keyspace() {
239        Some(ks) => ks.to_string(),
240        None => {
241            writeln!(
242                writer,
243                "No keyspace selected. Use USE <keyspace> first, or DESCRIBE KEYSPACE <name>."
244            )?;
245            return Ok(());
246        }
247    };
248
249    let tables = session.get_tables(&keyspace).await?;
250    if tables.is_empty() {
251        writeln!(writer)?;
252        writeln!(writer, "Keyspace '{keyspace}' has no tables.")?;
253        writeln!(writer)?;
254        return Ok(());
255    }
256
257    writeln!(writer)?;
258    for table in &tables {
259        write!(writer, "{}", table.name)?;
260        write!(writer, "  ")?;
261    }
262    writeln!(writer)?;
263    writeln!(writer)?;
264    Ok(())
265}
266
267/// DESCRIBE TABLE <name> — show CREATE TABLE statement.
268async fn describe_table(
269    session: &CqlSession,
270    table_spec: &str,
271    writer: &mut dyn Write,
272) -> Result<()> {
273    let (keyspace, table_name) = if table_spec.contains('.') {
274        let parts: Vec<&str> = table_spec.splitn(2, '.').collect();
275        (parts[0].to_string(), parts[1].to_string())
276    } else {
277        match session.current_keyspace() {
278            Some(ks) => (ks.to_string(), table_spec.to_string()),
279            None => {
280                writeln!(
281                    writer,
282                    "No keyspace selected. Use a fully qualified name: DESCRIBE TABLE keyspace.table"
283                )?;
284                return Ok(());
285            }
286        }
287    };
288
289    let table = session.get_table_metadata(&keyspace, &table_name).await?;
290
291    match table {
292        Some(meta) => {
293            writeln!(writer)?;
294            write_create_table(writer, &meta)?;
295            writeln!(writer)?;
296        }
297        None => {
298            writeln!(writer, "Table '{keyspace}.{table_name}' not found.")?;
299        }
300    }
301
302    Ok(())
303}
304
305/// DESCRIBE SCHEMA — show CREATE statements for all user keyspaces and their tables.
306async fn describe_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
307    describe_schema_inner(session, writer, false).await
308}
309
310/// DESCRIBE FULL SCHEMA — show CREATE statements for ALL keyspaces (including system).
311async fn describe_full_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
312    describe_schema_inner(session, writer, true).await
313}
314
315/// Shared implementation for DESCRIBE SCHEMA and DESCRIBE FULL SCHEMA.
316async fn describe_schema_inner(
317    session: &CqlSession,
318    writer: &mut dyn Write,
319    include_system: bool,
320) -> Result<()> {
321    let keyspaces = session.get_keyspaces().await?;
322
323    let filtered_keyspaces: Vec<_> = if include_system {
324        keyspaces.iter().collect()
325    } else {
326        keyspaces
327            .iter()
328            .filter(|ks| !is_system_keyspace(&ks.name))
329            .collect()
330    };
331
332    if filtered_keyspaces.is_empty() {
333        writeln!(writer)?;
334        writeln!(writer, "No user-defined keyspaces found.")?;
335        writeln!(writer)?;
336        return Ok(());
337    }
338
339    for ks in filtered_keyspaces {
340        // Print DESCRIBE KEYSPACE
341        describe_keyspace(session, Some(&ks.name), writer).await?;
342
343        // Print all tables in this keyspace
344        let tables = session.get_tables(&ks.name).await?;
345        for table in &tables {
346            writeln!(writer)?;
347            write_create_table(writer, table)?;
348            write_table_indexes(session, &ks.name, &table.name, writer).await?;
349        }
350    }
351
352    writeln!(writer)?;
353    Ok(())
354}
355
356/// DESCRIBE INDEX <name> — show CREATE INDEX statement.
357async fn describe_index(
358    session: &CqlSession,
359    index_spec: &str,
360    writer: &mut dyn Write,
361) -> Result<()> {
362    let (keyspace, index_name) = resolve_qualified_name(session, index_spec, writer)?;
363    let keyspace = match keyspace {
364        Some(ks) => ks,
365        None => return Ok(()),
366    };
367
368    // system_schema.indexes PK is (keyspace_name, table_name, index_name).
369    // We cannot filter on index_name without table_name, so we scan the whole
370    // keyspace partition and match in Rust.
371    let query = format!(
372        "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}'",
373        keyspace.replace('\'', "''"),
374    );
375    let result = session.execute_query(&query).await?;
376
377    let row = result.rows.iter().find(|r| {
378        r.get_by_name("index_name", &result.columns)
379            .map(|v| v.to_string().to_lowercase())
380            .as_deref()
381            == Some(index_name.to_lowercase().as_str())
382    });
383
384    let row = match row {
385        Some(r) => r,
386        None => {
387            writeln!(writer, "Index '{keyspace}.{index_name}' not found.")?;
388            return Ok(());
389        }
390    };
391
392    let idx_name = row
393        .get_by_name("index_name", &result.columns)
394        .map(|v| v.to_string())
395        .unwrap_or_default();
396    let table_name = row
397        .get_by_name("table_name", &result.columns)
398        .map(|v| v.to_string())
399        .unwrap_or_default();
400    let options = row
401        .get_by_name("options", &result.columns)
402        .map(|v| v.to_string())
403        .unwrap_or_default();
404
405    // Extract target column from options map
406    // The options map contains 'target' key with the indexed column
407    let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
408
409    write!(
410        writer,
411        "{}",
412        format_index_ddl(&keyspace, &idx_name, &table_name, &target)
413    )?;
414    Ok(())
415}
416
417/// DESCRIBE MATERIALIZED VIEW <name> — show CREATE MATERIALIZED VIEW statement.
418async fn describe_materialized_view(
419    session: &CqlSession,
420    view_spec: &str,
421    writer: &mut dyn Write,
422) -> Result<()> {
423    let (keyspace, view_name) = resolve_qualified_name(session, view_spec, writer)?;
424    let keyspace = match keyspace {
425        Some(ks) => ks,
426        None => return Ok(()),
427    };
428
429    let query = format!(
430        "SELECT view_name, base_table_name, where_clause, include_all_columns FROM system_schema.views WHERE keyspace_name = '{}' AND view_name = '{}'",
431        keyspace.replace('\'', "''"),
432        view_name.replace('\'', "''")
433    );
434    let result = session.execute_query(&query).await?;
435
436    if result.rows.is_empty() {
437        writeln!(
438            writer,
439            "Materialized view '{keyspace}.{view_name}' not found."
440        )?;
441        return Ok(());
442    }
443
444    let row = &result.rows[0];
445    let mv_name = row
446        .get_by_name("view_name", &result.columns)
447        .map(|v| v.to_string())
448        .unwrap_or_default();
449    let base_table = row
450        .get_by_name("base_table_name", &result.columns)
451        .map(|v| v.to_string())
452        .unwrap_or_default();
453    let where_clause = row
454        .get_by_name("where_clause", &result.columns)
455        .map(|v| v.to_string())
456        .unwrap_or_else(|| "IS NOT NULL".to_string());
457    let include_all = row
458        .get_by_name("include_all_columns", &result.columns)
459        .map(|v| v.to_string() == "True")
460        .unwrap_or(false);
461
462    // Fetch columns for the view
463    // system_schema.columns is clustered by column_name, not position, so we
464    // cannot ORDER BY position in CQL — fetch all and sort in Rust.
465    let col_query = format!(
466        "SELECT column_name, type, kind, position, clustering_order FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'",
467        keyspace.replace('\'', "''"),
468        mv_name.replace('\'', "''")
469    );
470    let col_result = session.execute_query(&col_query).await?;
471
472    let mut select_columns = Vec::new();
473    let mut partition_keys: Vec<(i32, String)> = Vec::new();
474    let mut clustering_keys: Vec<(i32, String, String)> = Vec::new();
475
476    for col_row in &col_result.rows {
477        let col_name = col_row
478            .get_by_name("column_name", &col_result.columns)
479            .map(|v| v.to_string())
480            .unwrap_or_default();
481        let kind = col_row
482            .get_by_name("kind", &col_result.columns)
483            .map(|v| v.to_string())
484            .unwrap_or_default();
485        let position = col_row
486            .get_by_name("position", &col_result.columns)
487            .and_then(|v| v.to_string().parse::<i32>().ok())
488            .unwrap_or(0);
489        let clustering_order = col_row
490            .get_by_name("clustering_order", &col_result.columns)
491            .map(|v| v.to_string())
492            .unwrap_or_else(|| "none".to_string());
493
494        select_columns.push(col_name.clone());
495
496        if kind == "partition_key" {
497            partition_keys.push((position, col_name));
498        } else if kind == "clustering" {
499            clustering_keys.push((position, col_name, clustering_order));
500        }
501    }
502
503    partition_keys.sort_by_key(|k| k.0);
504    clustering_keys.sort_by_key(|k| k.0);
505
506    let sorted_pk: Vec<String> = partition_keys
507        .iter()
508        .map(|(_, name)| name.clone())
509        .collect();
510    let sorted_ck: Vec<(String, String)> = clustering_keys
511        .iter()
512        .map(|(_, name, order)| (name.clone(), order.clone()))
513        .collect();
514
515    let props_query = format!(
516        "SELECT bloom_filter_fp_chance, caching, comment, compaction, compression, \
517         crc_check_chance, default_time_to_live, gc_grace_seconds, \
518         max_index_interval, memtable_flush_period_in_ms, min_index_interval, \
519         speculative_retry \
520         FROM system_schema.views \
521         WHERE keyspace_name = '{}' AND view_name = '{}'",
522        keyspace.replace('\'', "''"),
523        mv_name.replace('\'', "''")
524    );
525    let props_result = session.execute_query(&props_query).await?;
526
527    let mut properties = std::collections::BTreeMap::new();
528    if let Some(props_row) = props_result.rows.first() {
529        let prop_names = [
530            "bloom_filter_fp_chance",
531            "caching",
532            "comment",
533            "compaction",
534            "compression",
535            "crc_check_chance",
536            "default_time_to_live",
537            "gc_grace_seconds",
538            "max_index_interval",
539            "memtable_flush_period_in_ms",
540            "min_index_interval",
541            "speculative_retry",
542        ];
543        for prop_name in &prop_names {
544            if let Some(val) = props_row.get_by_name(prop_name, &props_result.columns) {
545                properties.insert(prop_name.to_string(), val.to_string());
546            }
547        }
548    }
549
550    let parts = MvDdlParts {
551        keyspace: &keyspace,
552        view_name: &mv_name,
553        base_table: &base_table,
554        include_all,
555        select_columns: &select_columns,
556        where_clause: &where_clause,
557        partition_keys: &sorted_pk,
558        clustering_keys: &sorted_ck,
559        properties: &properties,
560    };
561    write!(writer, "{}", format_create_mv_ddl(&parts))?;
562    Ok(())
563}
564
565/// DESCRIBE TYPES — list all UDT names in the current keyspace.
566async fn describe_types(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
567    let keyspace = match session.current_keyspace() {
568        Some(ks) => ks.to_string(),
569        None => {
570            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
571            return Ok(());
572        }
573    };
574
575    let udts = session.get_udts(&keyspace).await?;
576    if udts.is_empty() {
577        writeln!(writer)?;
578        writeln!(writer, "Keyspace '{keyspace}' has no user-defined types.")?;
579        writeln!(writer)?;
580        return Ok(());
581    }
582
583    writeln!(writer)?;
584    for udt in &udts {
585        write!(writer, "{}  ", udt.name)?;
586    }
587    writeln!(writer)?;
588    writeln!(writer)?;
589    Ok(())
590}
591
592/// DESCRIBE TYPE <name> — show CREATE TYPE statement.
593async fn describe_type(
594    session: &CqlSession,
595    type_spec: &str,
596    writer: &mut dyn Write,
597) -> Result<()> {
598    let (keyspace, type_name) = resolve_qualified_name(session, type_spec, writer)?;
599    let keyspace = match keyspace {
600        Some(ks) => ks,
601        None => return Ok(()),
602    };
603
604    let query = format!(
605        "SELECT type_name, field_names, field_types FROM system_schema.types WHERE keyspace_name = '{}' AND type_name = '{}'",
606        keyspace.replace('\'', "''"),
607        type_name.replace('\'', "''")
608    );
609    let result = session.execute_query(&query).await?;
610
611    if result.rows.is_empty() {
612        writeln!(writer, "Type '{keyspace}.{type_name}' not found.")?;
613        return Ok(());
614    }
615
616    let row = &result.rows[0];
617    let udt_name = row
618        .get_by_name("type_name", &result.columns)
619        .map(|v| v.to_string())
620        .unwrap_or_default();
621    let field_names_str = row
622        .get_by_name("field_names", &result.columns)
623        .map(|v| v.to_string())
624        .unwrap_or_default();
625    let field_types_str = row
626        .get_by_name("field_types", &result.columns)
627        .map(|v| v.to_string())
628        .unwrap_or_default();
629
630    let field_names = parse_list_value(&field_names_str);
631    let field_types = parse_list_value(&field_types_str);
632
633    let field_count = field_names.len().min(field_types.len());
634    let fields: Vec<(String, String)> = field_names
635        .into_iter()
636        .take(field_count)
637        .zip(field_types)
638        .collect();
639    write!(
640        writer,
641        "{}",
642        format_create_type_ddl(&keyspace, &udt_name, &fields)
643    )?;
644    Ok(())
645}
646
647/// DESCRIBE FUNCTIONS — list all UDF names in the current keyspace.
648async fn describe_functions(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
649    let keyspace = match session.current_keyspace() {
650        Some(ks) => ks.to_string(),
651        None => {
652            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
653            return Ok(());
654        }
655    };
656
657    let functions = session.get_functions(&keyspace).await?;
658    if functions.is_empty() {
659        writeln!(writer)?;
660        writeln!(
661            writer,
662            "Keyspace '{keyspace}' has no user-defined functions."
663        )?;
664        writeln!(writer)?;
665        return Ok(());
666    }
667
668    writeln!(writer)?;
669    for func in &functions {
670        write!(writer, "{}  ", func.name)?;
671    }
672    writeln!(writer)?;
673    writeln!(writer)?;
674    Ok(())
675}
676
677/// DESCRIBE FUNCTION <name> — show CREATE FUNCTION statement.
678async fn describe_function(
679    session: &CqlSession,
680    func_spec: &str,
681    writer: &mut dyn Write,
682) -> Result<()> {
683    let (keyspace, func_name) = resolve_qualified_name(session, func_spec, writer)?;
684    let keyspace = match keyspace {
685        Some(ks) => ks,
686        None => return Ok(()),
687    };
688
689    let query = format!(
690        "SELECT function_name, argument_names, argument_types, return_type, language, body, called_on_null_input FROM system_schema.functions WHERE keyspace_name = '{}' AND function_name = '{}'",
691        keyspace.replace('\'', "''"),
692        func_name.replace('\'', "''")
693    );
694    let result = session.execute_query(&query).await?;
695
696    if result.rows.is_empty() {
697        writeln!(writer, "Function '{keyspace}.{func_name}' not found.")?;
698        return Ok(());
699    }
700
701    let row = &result.rows[0];
702    let fn_name = row
703        .get_by_name("function_name", &result.columns)
704        .map(|v| v.to_string())
705        .unwrap_or_default();
706    let arg_names_str = row
707        .get_by_name("argument_names", &result.columns)
708        .map(|v| v.to_string())
709        .unwrap_or_default();
710    let arg_types_str = row
711        .get_by_name("argument_types", &result.columns)
712        .map(|v| v.to_string())
713        .unwrap_or_default();
714    let return_type = row
715        .get_by_name("return_type", &result.columns)
716        .map(|v| v.to_string())
717        .unwrap_or_default();
718    let language = row
719        .get_by_name("language", &result.columns)
720        .map(|v| v.to_string())
721        .unwrap_or_default();
722    let body = row
723        .get_by_name("body", &result.columns)
724        .map(|v| v.to_string())
725        .unwrap_or_default();
726    let called_on_null = row
727        .get_by_name("called_on_null_input", &result.columns)
728        .map(|v| v.to_string() == "True")
729        .unwrap_or(false);
730
731    let arg_names = parse_list_value(&arg_names_str);
732    let arg_types = parse_list_value(&arg_types_str);
733
734    let args_str = arg_names
735        .iter()
736        .zip(arg_types.iter())
737        .map(|(name, typ)| format!("{} {}", quote_if_needed(name), typ))
738        .collect::<Vec<_>>()
739        .join(", ");
740
741    let null_handling = if called_on_null {
742        "CALLED ON NULL INPUT"
743    } else {
744        "RETURNS NULL ON NULL INPUT"
745    };
746
747    write!(
748        writer,
749        "{}",
750        format_create_function_ddl(
751            &keyspace,
752            &fn_name,
753            &args_str,
754            null_handling,
755            &return_type,
756            &language,
757            &body,
758        )
759    )?;
760    Ok(())
761}
762
763/// DESCRIBE AGGREGATES — list all UDA names in the current keyspace.
764async fn describe_aggregates(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
765    let keyspace = match session.current_keyspace() {
766        Some(ks) => ks.to_string(),
767        None => {
768            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
769            return Ok(());
770        }
771    };
772
773    let aggregates = session.get_aggregates(&keyspace).await?;
774    if aggregates.is_empty() {
775        writeln!(writer)?;
776        writeln!(
777            writer,
778            "Keyspace '{keyspace}' has no user-defined aggregates."
779        )?;
780        writeln!(writer)?;
781        return Ok(());
782    }
783
784    writeln!(writer)?;
785    for agg in &aggregates {
786        write!(writer, "{}  ", agg.name)?;
787    }
788    writeln!(writer)?;
789    writeln!(writer)?;
790    Ok(())
791}
792
793/// DESCRIBE AGGREGATE <name> — show CREATE AGGREGATE statement.
794async fn describe_aggregate(
795    session: &CqlSession,
796    agg_spec: &str,
797    writer: &mut dyn Write,
798) -> Result<()> {
799    let (keyspace, agg_name) = resolve_qualified_name(session, agg_spec, writer)?;
800    let keyspace = match keyspace {
801        Some(ks) => ks,
802        None => return Ok(()),
803    };
804
805    let query = format!(
806        "SELECT aggregate_name, argument_types, state_func, state_type, final_func, initcond FROM system_schema.aggregates WHERE keyspace_name = '{}' AND aggregate_name = '{}'",
807        keyspace.replace('\'', "''"),
808        agg_name.replace('\'', "''")
809    );
810    let result = session.execute_query(&query).await?;
811
812    if result.rows.is_empty() {
813        writeln!(writer, "Aggregate '{keyspace}.{agg_name}' not found.")?;
814        return Ok(());
815    }
816
817    let row = &result.rows[0];
818    let ag_name = row
819        .get_by_name("aggregate_name", &result.columns)
820        .map(|v| v.to_string())
821        .unwrap_or_default();
822    let arg_types_str = row
823        .get_by_name("argument_types", &result.columns)
824        .map(|v| v.to_string())
825        .unwrap_or_default();
826    let state_func = row
827        .get_by_name("state_func", &result.columns)
828        .map(|v| v.to_string())
829        .unwrap_or_default();
830    let state_type = row
831        .get_by_name("state_type", &result.columns)
832        .map(|v| v.to_string())
833        .unwrap_or_default();
834    let final_func = row
835        .get_by_name("final_func", &result.columns)
836        .map(|v| v.to_string());
837    let initcond = row
838        .get_by_name("initcond", &result.columns)
839        .map(|v| v.to_string());
840
841    let arg_types = parse_list_value(&arg_types_str);
842    let args_str = arg_types.join(", ");
843
844    write!(
845        writer,
846        "{}",
847        format_create_aggregate_ddl(
848            &keyspace,
849            &ag_name,
850            &args_str,
851            &state_func,
852            &state_type,
853            final_func.as_deref(),
854            initcond.as_deref(),
855        )
856    )?;
857    Ok(())
858}
859
860/// Write a CREATE TABLE statement for the given table metadata.
861fn write_create_table(writer: &mut dyn Write, meta: &crate::driver::TableMetadata) -> Result<()> {
862    writeln!(
863        writer,
864        "CREATE TABLE {}.{} (",
865        quote_if_needed(&meta.keyspace),
866        quote_if_needed(&meta.name)
867    )?;
868
869    // Print columns
870    for col in &meta.columns {
871        writeln!(
872            writer,
873            "    {} {},",
874            quote_if_needed(&col.name),
875            col.type_name
876        )?;
877    }
878
879    // Print PRIMARY KEY
880    if !meta.partition_key.is_empty() {
881        let pk_str = if meta.partition_key.len() == 1 {
882            quote_if_needed(&meta.partition_key[0])
883        } else {
884            format!(
885                "({})",
886                meta.partition_key
887                    .iter()
888                    .map(|k| quote_if_needed(k))
889                    .collect::<Vec<_>>()
890                    .join(", ")
891            )
892        };
893
894        if meta.clustering_key.is_empty() {
895            writeln!(writer, "    PRIMARY KEY ({pk_str})")?;
896        } else {
897            let ck_str = meta
898                .clustering_key
899                .iter()
900                .map(|k| quote_if_needed(k))
901                .collect::<Vec<_>>()
902                .join(", ");
903            writeln!(writer, "    PRIMARY KEY ({pk_str}, {ck_str})")?;
904        }
905    }
906
907    writeln!(writer, ");")?;
908    Ok(())
909}
910
911/// Fetch indexes for a table from system_schema.indexes and write CREATE INDEX statements.
912async fn write_table_indexes(
913    session: &CqlSession,
914    keyspace: &str,
915    table_name: &str,
916    writer: &mut dyn Write,
917) -> Result<()> {
918    let query = format!(
919        "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}' AND table_name = '{}'",
920        keyspace.replace('\'', "''"),
921        table_name.replace('\'', "''"),
922    );
923    let result = session.execute_query(&query).await?;
924
925    for row in &result.rows {
926        let idx_name = row
927            .get_by_name("index_name", &result.columns)
928            .map(|v| v.to_string())
929            .unwrap_or_default();
930        let tbl_name = row
931            .get_by_name("table_name", &result.columns)
932            .map(|v| v.to_string())
933            .unwrap_or_default();
934        let options = row
935            .get_by_name("options", &result.columns)
936            .map(|v| v.to_string())
937            .unwrap_or_default();
938
939        let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
940
941        writeln!(
942            writer,
943            "CREATE INDEX {} ON {}.{} ({});",
944            quote_if_needed(&idx_name),
945            quote_if_needed(keyspace),
946            quote_if_needed(&tbl_name),
947            target
948        )?;
949    }
950
951    Ok(())
952}
953
954/// Resolve a potentially qualified name (ks.name or just name) into (keyspace, name).
955///
956/// If no keyspace prefix is given, uses the session's current keyspace.
957/// Returns `(None, name)` if no keyspace can be determined (and prints an error).
958fn resolve_qualified_name(
959    session: &CqlSession,
960    spec: &str,
961    writer: &mut dyn Write,
962) -> Result<(Option<String>, String)> {
963    if spec.contains('.') {
964        let parts: Vec<&str> = spec.splitn(2, '.').collect();
965        Ok((Some(parts[0].to_string()), parts[1].to_string()))
966    } else {
967        match session.current_keyspace() {
968            Some(ks) => Ok((Some(ks.to_string()), spec.to_string())),
969            None => {
970                writeln!(
971                    writer,
972                    "No keyspace selected. Use a fully qualified name (keyspace.name) or USE <keyspace> first."
973                )?;
974                Ok((None, spec.to_string()))
975            }
976        }
977    }
978}
979
/// Parse a CQL list string like `['a', 'b', 'c']` or `[a, b, c]` into a Vec of strings.
///
/// Surrounding square brackets are optional. Each element is trimmed and, when
/// it is wrapped in a matching pair of single or double quotes, the quotes are
/// removed. An empty or whitespace-only list yields an empty vector.
///
/// Commas nested inside angle brackets do not separate elements, so
/// parameterised type names such as `map<text, int>` or
/// `frozen<tuple<int, text>>` stay in one piece.
///
/// An element consisting of a single quote character is returned unchanged
/// (slicing it as if it were a quoted pair would panic).
fn parse_list_value(s: &str) -> Vec<String> {
    let trimmed = s.trim();
    // Strip surrounding brackets only when both are present.
    let inner = trimmed
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .unwrap_or(trimmed);
    if inner.trim().is_empty() {
        return Vec::new();
    }

    // Trim one element and drop a matching pair of surrounding quotes.
    // `strip_prefix` followed by `strip_suffix` needs two distinct characters,
    // so a lone quote is left as-is instead of producing an invalid slice.
    let unquote = |raw: &str| -> String {
        let item = raw.trim();
        let stripped = item
            .strip_prefix('\'')
            .and_then(|rest| rest.strip_suffix('\''))
            .or_else(|| {
                item.strip_prefix('"')
                    .and_then(|rest| rest.strip_suffix('"'))
            });
        stripped.unwrap_or(item).to_string()
    };

    let mut items = Vec::new();
    let mut depth = 0usize; // current `<...>` nesting level
    let mut start = 0usize; // byte offset where the current element begins
    for (idx, ch) in inner.char_indices() {
        match ch {
            '<' => depth += 1,
            // Saturate so a stray `>` cannot underflow the counter.
            '>' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                items.push(unquote(&inner[start..idx]));
                start = idx + 1; // ',' is one byte wide
            }
            _ => {}
        }
    }
    items.push(unquote(&inner[start..]));
    items
}
1011
/// Look up `key` in a CQL map rendered as text, e.g. `{'key': 'value', ...}`.
///
/// The surrounding braces are optional. The text is split on commas, each
/// entry is split at its first `:`, and both halves are trimmed of whitespace
/// and of any leading/trailing single or double quotes. The value of the first
/// entry whose key equals `key` is returned; entries without a `:` are ignored.
/// Returns `None` when no entry matches.
fn extract_map_value(map_str: &str, key: &str) -> Option<String> {
    /// Trim whitespace, then surrounding single quotes, then double quotes.
    fn bare(raw: &str) -> &str {
        raw.trim().trim_matches('\'').trim_matches('"')
    }

    let text = map_str.trim();
    // Drop the braces only when both are present.
    let body = text
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
        .unwrap_or(text);

    body.split(',')
        .filter_map(|entry| entry.split_once(':'))
        .find(|(candidate, _)| bare(candidate) == key)
        .map(|(_, value)| bare(value).to_string())
}
1035
/// Check if a keyspace is a system keyspace.
///
/// A keyspace counts as a system keyspace when it is named exactly `system`,
/// when its name starts with `system_` (e.g. `system_schema`, `system_auth`,
/// `system_traces`), or when it is one of the fixed `dse_*` / `solr_admin`
/// names listed below.
///
/// The prefix test deliberately includes the underscore: matching on the bare
/// prefix `system` would also classify user keyspaces such as `systems` or
/// `systemic` as system keyspaces.
fn is_system_keyspace(name: &str) -> bool {
    name == "system"
        || name.starts_with("system_")
        || matches!(
            name,
            "dse_system"
                | "dse_perf"
                | "dse_security"
                | "dse_leases"
                | "dse_system_local"
                | "dse_insights"
                | "solr_admin"
        )
}
1047
/// Quote an identifier if it needs quoting (contains uppercase, spaces, or reserved words).
///
/// A name is emitted unquoted only when it starts with an ASCII lowercase
/// letter, consists solely of ASCII lowercase letters, digits and underscores,
/// and is not a CQL reserved keyword. Everything else — including the empty
/// string, names starting with a digit or underscore, and reserved words such
/// as `table` or `order` — is wrapped in double quotes, with embedded double
/// quotes doubled.
///
/// Quoting a name that would also have been valid unquoted is harmless, so the
/// keyword list errs on the side of including keywords reserved only by newer
/// Cassandra versions.
fn quote_if_needed(name: &str) -> String {
    // CQL reserved keywords, lowercase. Only consulted for names that already
    // passed the lowercase-identifier check, so no case folding is required.
    const RESERVED_KEYWORDS: &[&str] = &[
        "add", "allow", "alter", "and", "apply", "asc", "authorize", "batch", "begin", "by",
        "columnfamily", "create", "default", "delete", "desc", "describe", "drop", "entries",
        "execute", "from", "full", "grant", "if", "in", "index", "infinity", "insert", "into",
        "is", "keyspace", "limit", "materialized", "mbean", "mbeans", "modify", "nan",
        "norecursive", "not", "null", "of", "on", "or", "order", "primary", "rename", "replace",
        "revoke", "schema", "select", "set", "table", "to", "token", "truncate", "unlogged",
        "unset", "update", "use", "using", "view", "where", "with",
    ];

    let mut chars = name.chars();
    // Unquoted CQL identifiers must begin with a letter.
    let starts_with_letter = matches!(chars.next(), Some(first) if first.is_ascii_lowercase());
    let is_plain = starts_with_letter
        && chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_');

    if is_plain && !RESERVED_KEYWORDS.contains(&name) {
        name.to_string()
    } else {
        format!("\"{}\"", name.replace('"', "\"\""))
    }
}
1062
/// Strip surrounding single or double quotes from a string.
///
/// The quotes are removed only when the string both starts and ends with the
/// same quote character and those are two distinct characters. A string that
/// consists of a single quote character is returned unchanged; treating it as
/// a quoted pair would slice `1..0` and panic.
fn strip_quotes(s: &str) -> &str {
    for quote in ['"', '\''] {
        // `strip_suffix` runs on what is left after the prefix was removed, so
        // one character can never serve as both the opening and closing quote.
        if let Some(inner) = s
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return inner;
        }
    }
    s
}
1071
1072// --- Pure DDL formatters (testable without a session) ---
1073
1074/// Format a CREATE INDEX DDL string.
1075fn format_index_ddl(keyspace: &str, index_name: &str, table_name: &str, target: &str) -> String {
1076    format!(
1077        "\nCREATE INDEX {} ON {}.{} ({});\n\n",
1078        quote_if_needed(index_name),
1079        quote_if_needed(keyspace),
1080        quote_if_needed(table_name),
1081        target
1082    )
1083}
1084
1085/// Format a CREATE TYPE DDL string.
1086fn format_create_type_ddl(keyspace: &str, type_name: &str, fields: &[(String, String)]) -> String {
1087    let mut out = String::new();
1088    out.push('\n');
1089    out.push_str(&format!(
1090        "CREATE TYPE {}.{} (\n",
1091        quote_if_needed(keyspace),
1092        quote_if_needed(type_name)
1093    ));
1094    let field_count = fields.len();
1095    for (i, (name, typ)) in fields.iter().enumerate() {
1096        let comma = if i < field_count - 1 { "," } else { "" };
1097        out.push_str(&format!("    {} {}{}\n", quote_if_needed(name), typ, comma));
1098    }
1099    out.push_str(");\n\n");
1100    out
1101}
1102
1103/// Format a CREATE FUNCTION DDL string.
1104fn format_create_function_ddl(
1105    keyspace: &str,
1106    func_name: &str,
1107    args_str: &str,
1108    null_handling: &str,
1109    return_type: &str,
1110    language: &str,
1111    body: &str,
1112) -> String {
1113    format!(
1114        "\nCREATE OR REPLACE FUNCTION {}.{} ({})\n    {}\n    RETURNS {}\n    LANGUAGE {}\n    AS $$ {} $$;\n\n",
1115        quote_if_needed(keyspace),
1116        quote_if_needed(func_name),
1117        args_str,
1118        null_handling,
1119        return_type,
1120        language,
1121        body
1122    )
1123}
1124
1125/// Format a CREATE AGGREGATE DDL string.
1126fn format_create_aggregate_ddl(
1127    keyspace: &str,
1128    agg_name: &str,
1129    args_str: &str,
1130    state_func: &str,
1131    state_type: &str,
1132    final_func: Option<&str>,
1133    initcond: Option<&str>,
1134) -> String {
1135    let mut out = format!(
1136        "\nCREATE OR REPLACE AGGREGATE {}.{} ({})\n    SFUNC {}\n    STYPE {}",
1137        quote_if_needed(keyspace),
1138        quote_if_needed(agg_name),
1139        args_str,
1140        state_func,
1141        state_type
1142    );
1143    if let Some(ff) = final_func {
1144        if !ff.is_empty() && ff != "null" {
1145            out.push_str(&format!("\n    FINALFUNC {ff}"));
1146        }
1147    }
1148    if let Some(ic) = initcond {
1149        if !ic.is_empty() && ic != "null" {
1150            out.push_str(&format!("\n    INITCOND {ic}"));
1151        }
1152    }
1153    out.push_str("\n;\n\n");
1154    out
1155}
1156
/// Borrowed inputs for rendering a `CREATE MATERIALIZED VIEW` statement.
///
/// All fields borrow from data owned by the caller, so building this struct
/// does not copy any strings.
struct MvDdlParts<'a> {
    /// Keyspace of the view; also used to qualify the base table.
    keyspace: &'a str,
    /// Name of the materialized view.
    view_name: &'a str,
    /// Name of the table the view selects from.
    base_table: &'a str,
    /// When `true` the view is rendered as `SELECT *` and `select_columns` is not used.
    include_all: bool,
    /// Columns listed in the `SELECT` clause when `include_all` is `false`.
    select_columns: &'a [String],
    /// Text placed after `WHERE`, emitted verbatim.
    where_clause: &'a str,
    /// Partition key column names, sorted by position.
    partition_keys: &'a [String],
    /// Clustering columns as `(name, order)` pairs, sorted by position.
    clustering_keys: &'a [(String, String)],
    /// View options by name; only the options the formatter knows are emitted.
    properties: &'a std::collections::BTreeMap<String, String>,
}
1169
1170/// Format a CREATE MATERIALIZED VIEW DDL string.
1171fn format_create_mv_ddl(parts: &MvDdlParts<'_>) -> String {
1172    let mut out = String::new();
1173    out.push('\n');
1174    out.push_str(&format!(
1175        "CREATE MATERIALIZED VIEW {}.{} AS\n",
1176        quote_if_needed(parts.keyspace),
1177        quote_if_needed(parts.view_name)
1178    ));
1179
1180    let columns_str = if parts.include_all {
1181        "*".to_string()
1182    } else {
1183        parts
1184            .select_columns
1185            .iter()
1186            .map(|c| quote_if_needed(c))
1187            .collect::<Vec<_>>()
1188            .join(", ")
1189    };
1190    out.push_str(&format!("    SELECT {columns_str}\n"));
1191    out.push_str(&format!(
1192        "    FROM {}.{}\n",
1193        quote_if_needed(parts.keyspace),
1194        quote_if_needed(parts.base_table)
1195    ));
1196    out.push_str(&format!("    WHERE {}\n", parts.where_clause));
1197
1198    let pk_str = if parts.partition_keys.len() == 1 {
1199        quote_if_needed(&parts.partition_keys[0])
1200    } else {
1201        format!(
1202            "({})",
1203            parts
1204                .partition_keys
1205                .iter()
1206                .map(|k| quote_if_needed(k))
1207                .collect::<Vec<_>>()
1208                .join(", ")
1209        )
1210    };
1211
1212    if parts.clustering_keys.is_empty() {
1213        out.push_str(&format!("    PRIMARY KEY ({pk_str})\n"));
1214    } else {
1215        let ck_str = parts
1216            .clustering_keys
1217            .iter()
1218            .map(|(name, _)| quote_if_needed(name))
1219            .collect::<Vec<_>>()
1220            .join(", ");
1221        out.push_str(&format!("    PRIMARY KEY ({pk_str}, {ck_str})\n"));
1222    }
1223
1224    // Python cqlsh always emits CLUSTERING ORDER BY for MVs (even when all ASC)
1225    let mut first_with = true;
1226    if !parts.clustering_keys.is_empty() {
1227        let order_str = parts
1228            .clustering_keys
1229            .iter()
1230            .map(|(name, order)| format!("{} {}", quote_if_needed(name), order.to_uppercase()))
1231            .collect::<Vec<_>>()
1232            .join(", ");
1233        out.push_str(&format!("    WITH CLUSTERING ORDER BY ({order_str})"));
1234        first_with = false;
1235    }
1236
1237    let prop_order = [
1238        "bloom_filter_fp_chance",
1239        "caching",
1240        "comment",
1241        "compaction",
1242        "compression",
1243        "crc_check_chance",
1244        "default_time_to_live",
1245        "gc_grace_seconds",
1246        "max_index_interval",
1247        "memtable_flush_period_in_ms",
1248        "min_index_interval",
1249        "speculative_retry",
1250    ];
1251
1252    for prop_name in &prop_order {
1253        if let Some(value) = parts.properties.get(*prop_name) {
1254            let formatted_value = format_property_value(prop_name, value);
1255            if first_with {
1256                out.push_str(&format!("    WITH {} = {}", prop_name, formatted_value));
1257                first_with = false;
1258            } else {
1259                out.push_str(&format!("\n    AND {} = {}", prop_name, formatted_value));
1260            }
1261        }
1262    }
1263
1264    out.push_str(";\n");
1265
1266    out.push('\n');
1267    out
1268}
1269
/// Render the value of a table/MV option for use in DDL.
///
/// The `comment` and `speculative_retry` options are emitted as CQL string
/// literals: wrapped in single quotes with embedded single quotes doubled.
/// Every other option — including the map-valued `caching`, `compaction` and
/// `compression` — is returned unchanged.
fn format_property_value(name: &str, value: &str) -> String {
    let is_string_literal = matches!(name, "comment" | "speculative_retry");
    if is_string_literal {
        let escaped = value.replace('\'', "''");
        format!("'{}'", escaped)
    } else {
        value.to_string()
    }
}
1281
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;
    use crate::driver::{ColumnMetadata, TableMetadata};

    // --- Fixture helpers ---

    /// Turn borrowed names into the owned strings the metadata types store.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Assemble table metadata from plain slices and return the `CREATE TABLE` text.
    fn render_table(
        keyspace: &str,
        name: &str,
        columns: &[(&str, &str)],
        partition_key: &[&str],
        clustering_key: &[&str],
    ) -> String {
        let meta = TableMetadata {
            keyspace: keyspace.to_string(),
            name: name.to_string(),
            columns: columns
                .iter()
                .map(|(column, type_name)| ColumnMetadata {
                    name: column.to_string(),
                    type_name: type_name.to_string(),
                })
                .collect(),
            partition_key: owned(partition_key),
            clustering_key: owned(clustering_key),
        };

        let mut buf = Vec::new();
        write_create_table(&mut buf, &meta).unwrap();
        String::from_utf8(buf).unwrap()
    }

    // --- Helper function tests ---

    #[test]
    fn system_keyspace_detection() {
        for name in ["system", "system_schema", "system_auth", "system_traces"] {
            assert!(is_system_keyspace(name));
        }
        for name in ["my_keyspace", "users"] {
            assert!(!is_system_keyspace(name));
        }
    }

    #[test]
    fn quote_simple_identifier() {
        for name in ["users", "my_table"] {
            assert_eq!(quote_if_needed(name), name);
        }
    }

    #[test]
    fn quote_mixed_case_identifier() {
        assert_eq!(quote_if_needed("MyTable"), "\"MyTable\"");
    }

    #[test]
    fn quote_identifier_with_spaces() {
        assert_eq!(quote_if_needed("my table"), "\"my table\"");
    }

    #[test]
    fn quote_identifier_starting_with_digit() {
        assert_eq!(quote_if_needed("1table"), "\"1table\"");
    }

    #[test]
    fn strip_quotes_test() {
        for input in ["\"hello\"", "'hello'", "hello"] {
            assert_eq!(strip_quotes(input), "hello");
        }
    }

    #[test]
    fn parse_list_value_test() {
        let empty = Vec::<String>::new();
        assert_eq!(parse_list_value("[]"), empty);
        assert_eq!(parse_list_value(""), empty);
        assert_eq!(parse_list_value("['a', 'b', 'c']"), vec!["a", "b", "c"]);
        assert_eq!(
            parse_list_value("[int, text, uuid]"),
            vec!["int", "text", "uuid"]
        );
    }

    #[test]
    fn extract_map_value_test() {
        let found = extract_map_value("{'target': 'email', 'class_name': 'foo'}", "target");
        assert_eq!(found, Some("email".to_string()));

        let absent = extract_map_value("{'target': 'email'}", "missing");
        assert_eq!(absent, None);
    }

    // --- CREATE TABLE tests ---

    #[test]
    fn write_create_table_simple() {
        let output = render_table(
            "test_ks",
            "users",
            &[("id", "uuid"), ("name", "text"), ("age", "int")],
            &["id"],
            &[],
        );
        assert!(output.contains("CREATE TABLE test_ks.users"));
        assert!(output.contains("id uuid"));
        assert!(output.contains("name text"));
        assert!(output.contains("PRIMARY KEY (id)"));
    }

    #[test]
    fn write_create_table_composite_key() {
        let output = render_table(
            "ks",
            "events",
            &[
                ("user_id", "uuid"),
                ("event_time", "timestamp"),
                ("data", "text"),
            ],
            &["user_id"],
            &["event_time"],
        );
        assert!(output.contains("PRIMARY KEY (user_id, event_time)"));
    }

    #[test]
    fn write_create_table_compound_partition_key() {
        let output = render_table(
            "ks",
            "metrics",
            &[
                ("host", "text"),
                ("metric", "text"),
                ("ts", "timestamp"),
                ("value", "double"),
            ],
            &["host", "metric"],
            &["ts"],
        );
        assert!(output.contains("PRIMARY KEY ((host, metric), ts)"));
    }

    // --- DDL formatter tests ---

    #[test]
    fn format_index_ddl_simple() {
        let ddl = format_index_ddl("my_ks", "email_idx", "users", "email");
        assert!(ddl.contains("CREATE INDEX email_idx ON my_ks.users (email);"));
    }

    #[test]
    fn format_index_ddl_quoted_names() {
        let ddl = format_index_ddl("MyKs", "MyIdx", "MyTable", "email");
        for quoted in ["\"MyKs\"", "\"MyIdx\"", "\"MyTable\""] {
            assert!(ddl.contains(quoted));
        }
    }

    #[test]
    fn format_create_type_ddl_single_field() {
        let fields = [("street".to_string(), "text".to_string())];
        let ddl = format_create_type_ddl("ks1", "address", &fields);
        assert!(ddl.contains("CREATE TYPE ks1.address ("));
        assert!(ddl.contains("street text"));
        assert!(ddl.contains(");"));
    }

    #[test]
    fn format_create_type_ddl_multiple_fields() {
        let fields: Vec<(String, String)> = [("street", "text"), ("city", "text"), ("zip", "int")]
            .iter()
            .map(|(name, typ)| (name.to_string(), typ.to_string()))
            .collect();
        let ddl = format_create_type_ddl("ks1", "address", &fields);
        assert!(
            ddl.contains("street text,"),
            "expected trailing comma: {ddl}"
        );
        assert!(ddl.contains("city text,"), "expected trailing comma: {ddl}");
        // The final field must not be followed by a comma.
        assert!(
            !ddl.contains("int,"),
            "last field should not have comma: {ddl}"
        );
    }

    #[test]
    fn format_create_function_ddl_called_on_null() {
        let ddl = format_create_function_ddl(
            "ks1",
            "add_one",
            "val int",
            "CALLED ON NULL INPUT",
            "int",
            "java",
            "return val + 1;",
        );
        assert!(ddl.contains("CREATE OR REPLACE FUNCTION ks1.add_one (val int)"));
        assert!(ddl.contains("CALLED ON NULL INPUT"));
        assert!(ddl.contains("RETURNS int"));
        assert!(ddl.contains("LANGUAGE java"));
        assert!(ddl.contains("AS $$ return val + 1; $$;"));
    }

    #[test]
    fn format_create_function_ddl_returns_null() {
        let ddl = format_create_function_ddl(
            "ks1",
            "my_func",
            "x text",
            "RETURNS NULL ON NULL INPUT",
            "text",
            "lua",
            "return x",
        );
        assert!(ddl.contains("RETURNS NULL ON NULL INPUT"));
        assert!(!ddl.contains("CALLED ON NULL INPUT"));
    }

    #[test]
    fn format_create_aggregate_ddl_minimal() {
        let ddl =
            format_create_aggregate_ddl("ks1", "my_sum", "int", "state_add", "int", None, None);
        assert!(ddl.contains("CREATE OR REPLACE AGGREGATE ks1.my_sum (int)"));
        assert!(ddl.contains("SFUNC state_add"));
        assert!(ddl.contains("STYPE int"));
        assert!(!ddl.contains("FINALFUNC"));
        assert!(!ddl.contains("INITCOND"));
        assert!(ddl.contains(';'));
    }

    #[test]
    fn format_create_aggregate_ddl_with_optional() {
        let ddl = format_create_aggregate_ddl(
            "ks1",
            "my_avg",
            "int",
            "state_avg",
            "tuple<int,int>",
            Some("final_avg"),
            Some("0"),
        );
        assert!(ddl.contains("FINALFUNC final_avg"));
        assert!(ddl.contains("INITCOND 0"));
    }

    #[test]
    fn format_create_aggregate_ddl_empty_optional_skipped() {
        let ddl = format_create_aggregate_ddl(
            "ks1",
            "my_agg",
            "int",
            "sf",
            "int",
            Some(""),
            Some("null"),
        );
        assert!(
            !ddl.contains("FINALFUNC"),
            "empty FINALFUNC should be omitted: {ddl}"
        );
        assert!(
            !ddl.contains("INITCOND"),
            "'null' INITCOND should be omitted: {ddl}"
        );
    }

    // --- Materialized view tests ---

    #[test]
    fn format_create_mv_ddl_simple() {
        let select = owned(&["id", "email"]);
        let partition = owned(&["email"]);
        let properties = BTreeMap::new();
        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "ks1",
            view_name: "user_by_email",
            base_table: "users",
            include_all: false,
            select_columns: &select,
            where_clause: "email IS NOT NULL",
            partition_keys: &partition,
            clustering_keys: &[],
            properties: &properties,
        });
        assert!(ddl.contains("CREATE MATERIALIZED VIEW ks1.user_by_email AS"));
        assert!(ddl.contains("SELECT id, email"));
        assert!(ddl.contains("FROM ks1.users"));
        assert!(ddl.contains("WHERE email IS NOT NULL"));
        assert!(ddl.contains("PRIMARY KEY (email)"));
    }

    #[test]
    fn format_create_mv_ddl_include_all() {
        let id_only = owned(&["id"]);
        let properties = BTreeMap::new();
        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "ks1",
            view_name: "mv_all",
            base_table: "base",
            include_all: true,
            select_columns: &id_only,
            where_clause: "id IS NOT NULL",
            partition_keys: &id_only,
            clustering_keys: &[],
            properties: &properties,
        });
        assert!(
            ddl.contains("SELECT *"),
            "include_all should emit SELECT *: {ddl}"
        );
    }

    #[test]
    fn format_create_mv_ddl_with_clustering_desc() {
        let select = owned(&["user_id", "ts"]);
        let partition = owned(&["user_id"]);
        let clustering = [("ts".to_string(), "DESC".to_string())];
        let properties = BTreeMap::new();
        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "ks1",
            view_name: "mv_ordered",
            base_table: "events",
            include_all: false,
            select_columns: &select,
            where_clause: "ts IS NOT NULL",
            partition_keys: &partition,
            clustering_keys: &clustering,
            properties: &properties,
        });
        assert!(ddl.contains("PRIMARY KEY (user_id, ts)"));
        assert!(ddl.contains("WITH CLUSTERING ORDER BY (ts DESC)"));
    }

    #[test]
    fn format_create_mv_ddl_with_properties() {
        let select = owned(&["state", "username"]);
        let partition = owned(&["state"]);
        let clustering = [("username".to_string(), "ASC".to_string())];
        let properties: BTreeMap<String, String> = [
            ("bloom_filter_fp_chance", "0.01"),
            ("comment", ""),
            ("gc_grace_seconds", "864000"),
        ]
        .iter()
        .map(|(option, value)| (option.to_string(), value.to_string()))
        .collect();
        let ddl = format_create_mv_ddl(&MvDdlParts {
            keyspace: "test",
            view_name: "users_by_state",
            base_table: "users",
            include_all: true,
            select_columns: &select,
            where_clause: "state IS NOT null AND username IS NOT null",
            partition_keys: &partition,
            clustering_keys: &clustering,
            properties: &properties,
        });
        assert!(
            ddl.contains("WITH CLUSTERING ORDER BY (username ASC)"),
            "should always emit CLUSTERING ORDER BY for MVs: {ddl}"
        );
        assert!(ddl.contains("AND bloom_filter_fp_chance = 0.01"));
        assert!(ddl.contains("AND comment = ''"));
        assert!(ddl.contains("AND gc_grace_seconds = 864000"));
    }
}