Skip to main content

cqlsh_rs/
describe.rs

1//! DESCRIBE command implementations for cqlsh-rs.
2//!
3//! Provides schema introspection commands matching Python cqlsh:
4//! - DESCRIBE CLUSTER
5//! - DESCRIBE KEYSPACES
6//! - DESCRIBE KEYSPACE [name]
7//! - DESCRIBE TABLES
8//! - DESCRIBE TABLE <name>
9//! - DESCRIBE SCHEMA
10//! - DESCRIBE FULL SCHEMA
11//! - DESCRIBE INDEX <name>
12//! - DESCRIBE MATERIALIZED VIEW <name>
13//! - DESCRIBE TYPE <name> / DESCRIBE TYPES
14//! - DESCRIBE FUNCTION <name> / DESCRIBE FUNCTIONS
15//! - DESCRIBE AGGREGATE <name> / DESCRIBE AGGREGATES
16
17use std::io::Write;
18
19use anyhow::Result;
20
21use crate::session::CqlSession;
22
23/// Execute a DESCRIBE command and write output.
24///
25/// Parses the DESCRIBE subcommand and dispatches to the appropriate handler.
26pub async fn execute(session: &CqlSession, args: &str, writer: &mut dyn Write) -> Result<()> {
27    let args = args.trim();
28    let upper = args.to_uppercase();
29
30    // DESCRIBE with no args — show help
31    if args.is_empty() {
32        writeln!(
33            writer,
34            "Usage: DESCRIBE [CLUSTER | KEYSPACES | KEYSPACE [name] | TABLES | TABLE <name> | SCHEMA | FULL SCHEMA | INDEX <name> | MATERIALIZED VIEW <name> | TYPES | TYPE <name> | FUNCTIONS | FUNCTION <name> | AGGREGATES | AGGREGATE <name>]"
35        )?;
36        return Ok(());
37    }
38
39    if upper == "CLUSTER" {
40        describe_cluster(session, writer).await
41    } else if upper == "KEYSPACES" {
42        describe_keyspaces(session, writer).await
43    } else if upper == "TABLES" {
44        describe_tables(session, writer).await
45    } else if upper == "FULL SCHEMA" {
46        describe_full_schema(session, writer).await
47    } else if upper == "SCHEMA" {
48        describe_schema(session, writer).await
49    } else if upper == "KEYSPACE" {
50        // DESCRIBE KEYSPACE with no name → current keyspace
51        describe_keyspace(session, session.current_keyspace(), writer).await
52    } else if upper.starts_with("KEYSPACE ") {
53        // DESCRIBE KEYSPACE <name>
54        let ks_name = args["KEYSPACE ".len()..].trim();
55        let ks_name = strip_quotes(ks_name);
56        describe_keyspace(session, Some(ks_name), writer).await
57    } else if upper == "TABLE" {
58        writeln!(writer, "DESCRIBE TABLE requires a table name.")?;
59        Ok(())
60    } else if upper.starts_with("TABLE ") {
61        // DESCRIBE TABLE <name>
62        let table_spec = args["TABLE ".len()..].trim();
63        let table_spec = strip_quotes(table_spec);
64        describe_table(session, table_spec, writer).await
65    } else if upper == "INDEX" {
66        writeln!(writer, "DESCRIBE INDEX requires an index name.")?;
67        Ok(())
68    } else if upper.starts_with("INDEX ") {
69        let index_spec = args["INDEX ".len()..].trim();
70        let index_spec = strip_quotes(index_spec);
71        describe_index(session, index_spec, writer).await
72    } else if upper == "MATERIALIZED VIEW" {
73        writeln!(writer, "DESCRIBE MATERIALIZED VIEW requires a view name.")?;
74        Ok(())
75    } else if upper.starts_with("MATERIALIZED VIEW ") {
76        let view_spec = args["MATERIALIZED VIEW ".len()..].trim();
77        let view_spec = strip_quotes(view_spec);
78        describe_materialized_view(session, view_spec, writer).await
79    } else if upper == "TYPES" {
80        describe_types(session, writer).await
81    } else if upper == "TYPE" {
82        writeln!(writer, "DESCRIBE TYPE requires a type name.")?;
83        Ok(())
84    } else if upper.starts_with("TYPE ") {
85        let type_spec = args["TYPE ".len()..].trim();
86        let type_spec = strip_quotes(type_spec);
87        describe_type(session, type_spec, writer).await
88    } else if upper == "FUNCTIONS" {
89        describe_functions(session, writer).await
90    } else if upper == "FUNCTION" {
91        writeln!(writer, "DESCRIBE FUNCTION requires a function name.")?;
92        Ok(())
93    } else if upper.starts_with("FUNCTION ") {
94        let func_spec = args["FUNCTION ".len()..].trim();
95        let func_spec = strip_quotes(func_spec);
96        describe_function(session, func_spec, writer).await
97    } else if upper == "AGGREGATES" {
98        describe_aggregates(session, writer).await
99    } else if upper == "AGGREGATE" {
100        writeln!(writer, "DESCRIBE AGGREGATE requires an aggregate name.")?;
101        Ok(())
102    } else if upper.starts_with("AGGREGATE ") {
103        let agg_spec = args["AGGREGATE ".len()..].trim();
104        let agg_spec = strip_quotes(agg_spec);
105        describe_aggregate(session, agg_spec, writer).await
106    } else {
107        // Generic DESCRIBE <name> — resolve across all object types.
108        // Resolution order matches Python cqlsh: keyspace, table, index, MV, type,
109        // function, aggregate.
110        let name = strip_quotes(args);
111        describe_object(session, name, writer).await
112    }
113}
114
115/// Resolve a bare or qualified name across all schema object types.
116///
117/// Resolution order (matching Python cqlsh): keyspace → table → index →
118/// materialized view → type → function → aggregate.
119async fn describe_object(session: &CqlSession, name: &str, writer: &mut dyn Write) -> Result<()> {
120    if name.contains('.') {
121        return describe_qualified_object(session, name, writer).await;
122    }
123
124    let keyspaces = session.get_keyspaces().await?;
125    if keyspaces.iter().any(|ks| ks.name == name) {
126        return describe_keyspace(session, Some(name), writer).await;
127    }
128
129    let ks = match session.current_keyspace() {
130        Some(ks) => ks.to_string(),
131        None => {
132            writeln!(writer, "'{name}' not found in schema.")?;
133            return Ok(());
134        }
135    };
136
137    if object_exists_in(session, &ks, name, "tables", "table_name").await? {
138        return describe_table(session, name, writer).await;
139    }
140    if object_exists_in(session, &ks, name, "indexes", "index_name").await? {
141        return describe_index(session, name, writer).await;
142    }
143    if object_exists_in(session, &ks, name, "views", "view_name").await? {
144        return describe_materialized_view(session, name, writer).await;
145    }
146    if object_exists_in(session, &ks, name, "types", "type_name").await? {
147        return describe_type(session, name, writer).await;
148    }
149    if object_exists_in(session, &ks, name, "functions", "function_name").await? {
150        return describe_function(session, name, writer).await;
151    }
152    if object_exists_in(session, &ks, name, "aggregates", "aggregate_name").await? {
153        return describe_aggregate(session, name, writer).await;
154    }
155
156    writeln!(writer, "'{name}' not found in keyspace '{ks}'.")?;
157    Ok(())
158}
159
160/// Resolve a qualified name (`ks.obj`) across all schema object types.
161async fn describe_qualified_object(
162    session: &CqlSession,
163    spec: &str,
164    writer: &mut dyn Write,
165) -> Result<()> {
166    let parts: Vec<&str> = spec.splitn(2, '.').collect();
167    let ks = parts[0];
168    let obj = parts[1];
169
170    if object_exists_in(session, ks, obj, "tables", "table_name").await? {
171        return describe_table(session, spec, writer).await;
172    }
173    if object_exists_in(session, ks, obj, "indexes", "index_name").await? {
174        return describe_index(session, spec, writer).await;
175    }
176    if object_exists_in(session, ks, obj, "views", "view_name").await? {
177        return describe_materialized_view(session, spec, writer).await;
178    }
179    if object_exists_in(session, ks, obj, "types", "type_name").await? {
180        return describe_type(session, spec, writer).await;
181    }
182    if object_exists_in(session, ks, obj, "functions", "function_name").await? {
183        return describe_function(session, spec, writer).await;
184    }
185    if object_exists_in(session, ks, obj, "aggregates", "aggregate_name").await? {
186        return describe_aggregate(session, spec, writer).await;
187    }
188
189    writeln!(writer, "'{spec}' not found in schema.")?;
190    Ok(())
191}
192
193/// Check whether an object with the given name exists in a system_schema table.
194async fn object_exists_in(
195    session: &CqlSession,
196    keyspace: &str,
197    name: &str,
198    system_table: &str,
199    name_column: &str,
200) -> Result<bool> {
201    let query = format!(
202        "SELECT {name_column} FROM system_schema.{system_table} WHERE keyspace_name = '{}'",
203        keyspace.replace('\'', "''"),
204    );
205    let result = session.execute_query(&query).await?;
206    let lower = name.to_lowercase();
207    Ok(result.rows.iter().any(|r| {
208        r.get_by_name(name_column, &result.columns)
209            .map(|v| v.to_string().to_lowercase())
210            .as_deref()
211            == Some(lower.as_str())
212    }))
213}
214
215/// DESCRIBE CLUSTER — show cluster name and partitioner.
216async fn describe_cluster(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
217    let cluster_name = session.cluster_name.as_deref().unwrap_or("Unknown Cluster");
218
219    writeln!(writer)?;
220    writeln!(writer, "Cluster: {cluster_name}")?;
221    writeln!(writer, "Partitioner: Murmur3Partitioner")?;
222
223    // Fetch snitch info from system.local
224    match session
225        .execute_query("SELECT snitch FROM system.local")
226        .await
227    {
228        Ok(result) => {
229            if let Some(row) = result.rows.first() {
230                if let Some(snitch) = row.get(0) {
231                    writeln!(writer, "Snitch: {snitch}")?;
232                }
233            }
234        }
235        Err(_) => {
236            // Snitch info may not be available in all configurations
237        }
238    }
239    writeln!(writer)?;
240    Ok(())
241}
242
243/// DESCRIBE KEYSPACES — list all keyspace names.
244async fn describe_keyspaces(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
245    let keyspaces = session.get_keyspaces().await?;
246    writeln!(writer)?;
247    for ks in &keyspaces {
248        write!(writer, "{}", ks.name)?;
249        // Add spaces between keyspace names (Python cqlsh style)
250        write!(writer, "  ")?;
251    }
252    writeln!(writer)?;
253    writeln!(writer)?;
254    Ok(())
255}
256
257/// DESCRIBE KEYSPACE [name] — show CREATE KEYSPACE and all objects within it.
258async fn describe_keyspace(
259    session: &CqlSession,
260    keyspace: Option<&str>,
261    writer: &mut dyn Write,
262) -> Result<()> {
263    let ks_name = match keyspace {
264        Some(name) => name,
265        None => {
266            writeln!(
267                writer,
268                "No keyspace specified and no current keyspace. Use DESCRIBE KEYSPACE <name>."
269            )?;
270            return Ok(());
271        }
272    };
273
274    // Fetch keyspace details from system_schema
275    let query = format!(
276        "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
277        ks_name.replace('\'', "''")
278    );
279    let result = session.execute_query(&query).await?;
280
281    if result.rows.is_empty() {
282        writeln!(writer, "Keyspace '{ks_name}' not found.")?;
283        return Ok(());
284    }
285
286    // Fetch durable_writes
287    let dw_query = format!(
288        "SELECT durable_writes FROM system_schema.keyspaces WHERE keyspace_name = '{}'",
289        ks_name.replace('\'', "''")
290    );
291    let dw_result = session.execute_query(&dw_query).await?;
292    let durable_writes = dw_result
293        .rows
294        .first()
295        .and_then(|r| r.get(0))
296        .map(|v| v.to_string() == "True")
297        .unwrap_or(true);
298
299    // Build replication string from the map value
300    let replication_str = result
301        .rows
302        .first()
303        .and_then(|r| r.get(0))
304        .map(|v| v.to_string())
305        .unwrap_or_else(|| "{}".to_string());
306
307    writeln!(writer)?;
308    writeln!(
309        writer,
310        "CREATE KEYSPACE {ks_name} WITH replication = {replication_str} AND durable_writes = {durable_writes};"
311    )?;
312
313    // Print all tables and their indexes in this keyspace
314    let tables = session.get_tables(ks_name).await?;
315    for table in &tables {
316        writeln!(writer)?;
317        write_create_table(writer, table)?;
318        write_table_indexes(session, ks_name, &table.name, writer).await?;
319    }
320
321    write_keyspace_materialized_views(session, ks_name, writer).await?;
322
323    writeln!(writer)?;
324    Ok(())
325}
326
327/// DESCRIBE TABLES — list tables in the current keyspace.
328async fn describe_tables(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
329    let keyspace = match session.current_keyspace() {
330        Some(ks) => ks.to_string(),
331        None => {
332            writeln!(
333                writer,
334                "No keyspace selected. Use USE <keyspace> first, or DESCRIBE KEYSPACE <name>."
335            )?;
336            return Ok(());
337        }
338    };
339
340    let tables = session.get_tables(&keyspace).await?;
341    if tables.is_empty() {
342        writeln!(writer)?;
343        writeln!(writer, "Keyspace '{keyspace}' has no tables.")?;
344        writeln!(writer)?;
345        return Ok(());
346    }
347
348    writeln!(writer)?;
349    for table in &tables {
350        write!(writer, "{}", table.name)?;
351        write!(writer, "  ")?;
352    }
353    writeln!(writer)?;
354    writeln!(writer)?;
355    Ok(())
356}
357
358/// DESCRIBE TABLE <name> — show CREATE TABLE statement.
359async fn describe_table(
360    session: &CqlSession,
361    table_spec: &str,
362    writer: &mut dyn Write,
363) -> Result<()> {
364    let (keyspace, table_name) = if table_spec.contains('.') {
365        let parts: Vec<&str> = table_spec.splitn(2, '.').collect();
366        (parts[0].to_string(), parts[1].to_string())
367    } else {
368        match session.current_keyspace() {
369            Some(ks) => (ks.to_string(), table_spec.to_string()),
370            None => {
371                writeln!(
372                    writer,
373                    "No keyspace selected. Use a fully qualified name: DESCRIBE TABLE keyspace.table"
374                )?;
375                return Ok(());
376            }
377        }
378    };
379
380    let table = session.get_table_metadata(&keyspace, &table_name).await?;
381
382    match table {
383        Some(meta) => {
384            writeln!(writer)?;
385            write_create_table(writer, &meta)?;
386            write_table_indexes(session, &keyspace, &table_name, writer).await?;
387            writeln!(writer)?;
388        }
389        None => {
390            writeln!(writer, "Table '{keyspace}.{table_name}' not found.")?;
391        }
392    }
393
394    Ok(())
395}
396
397/// DESCRIBE SCHEMA — show CREATE statements for all user keyspaces and their tables.
398async fn describe_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
399    describe_schema_inner(session, writer, false).await
400}
401
402/// DESCRIBE FULL SCHEMA — show CREATE statements for ALL keyspaces (including system).
403async fn describe_full_schema(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
404    describe_schema_inner(session, writer, true).await
405}
406
407/// Shared implementation for DESCRIBE SCHEMA and DESCRIBE FULL SCHEMA.
408async fn describe_schema_inner(
409    session: &CqlSession,
410    writer: &mut dyn Write,
411    include_system: bool,
412) -> Result<()> {
413    let keyspaces = session.get_keyspaces().await?;
414
415    let filtered_keyspaces: Vec<_> = if include_system {
416        keyspaces.iter().collect()
417    } else {
418        keyspaces
419            .iter()
420            .filter(|ks| !is_system_keyspace(&ks.name))
421            .collect()
422    };
423
424    if filtered_keyspaces.is_empty() {
425        writeln!(writer)?;
426        writeln!(writer, "No user-defined keyspaces found.")?;
427        writeln!(writer)?;
428        return Ok(());
429    }
430
431    for ks in filtered_keyspaces {
432        // Print DESCRIBE KEYSPACE
433        describe_keyspace(session, Some(&ks.name), writer).await?;
434
435        // Print all tables in this keyspace
436        let tables = session.get_tables(&ks.name).await?;
437        for table in &tables {
438            writeln!(writer)?;
439            write_create_table(writer, table)?;
440            write_table_indexes(session, &ks.name, &table.name, writer).await?;
441        }
442    }
443
444    writeln!(writer)?;
445    Ok(())
446}
447
448/// DESCRIBE INDEX <name> — show CREATE INDEX statement.
449async fn describe_index(
450    session: &CqlSession,
451    index_spec: &str,
452    writer: &mut dyn Write,
453) -> Result<()> {
454    let (keyspace, index_name) = resolve_qualified_name(session, index_spec, writer)?;
455    let keyspace = match keyspace {
456        Some(ks) => ks,
457        None => return Ok(()),
458    };
459
460    // system_schema.indexes PK is (keyspace_name, table_name, index_name).
461    // We cannot filter on index_name without table_name, so we scan the whole
462    // keyspace partition and match in Rust.
463    let query = format!(
464        "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}'",
465        keyspace.replace('\'', "''"),
466    );
467    let result = session.execute_query(&query).await?;
468
469    let row = result.rows.iter().find(|r| {
470        r.get_by_name("index_name", &result.columns)
471            .map(|v| v.to_string().to_lowercase())
472            .as_deref()
473            == Some(index_name.to_lowercase().as_str())
474    });
475
476    let row = match row {
477        Some(r) => r,
478        None => {
479            writeln!(writer, "Index '{keyspace}.{index_name}' not found.")?;
480            return Ok(());
481        }
482    };
483
484    let idx_name = row
485        .get_by_name("index_name", &result.columns)
486        .map(|v| v.to_string())
487        .unwrap_or_default();
488    let table_name = row
489        .get_by_name("table_name", &result.columns)
490        .map(|v| v.to_string())
491        .unwrap_or_default();
492    let options = row
493        .get_by_name("options", &result.columns)
494        .map(|v| v.to_string())
495        .unwrap_or_default();
496
497    // Extract target column from options map
498    // The options map contains 'target' key with the indexed column
499    let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
500
501    write!(
502        writer,
503        "{}",
504        format_index_ddl(&keyspace, &idx_name, &table_name, &target)
505    )?;
506    Ok(())
507}
508
509/// DESCRIBE MATERIALIZED VIEW <name> — show CREATE MATERIALIZED VIEW statement.
510async fn describe_materialized_view(
511    session: &CqlSession,
512    view_spec: &str,
513    writer: &mut dyn Write,
514) -> Result<()> {
515    let (keyspace, view_name) = resolve_qualified_name(session, view_spec, writer)?;
516    let keyspace = match keyspace {
517        Some(ks) => ks,
518        None => return Ok(()),
519    };
520
521    let query = format!(
522        "SELECT view_name, base_table_name, where_clause, include_all_columns FROM system_schema.views WHERE keyspace_name = '{}' AND view_name = '{}'",
523        keyspace.replace('\'', "''"),
524        view_name.replace('\'', "''")
525    );
526    let result = session.execute_query(&query).await?;
527
528    if result.rows.is_empty() {
529        writeln!(
530            writer,
531            "Materialized view '{keyspace}.{view_name}' not found."
532        )?;
533        return Ok(());
534    }
535
536    let row = &result.rows[0];
537    let mv_name = row
538        .get_by_name("view_name", &result.columns)
539        .map(|v| v.to_string())
540        .unwrap_or_default();
541    let base_table = row
542        .get_by_name("base_table_name", &result.columns)
543        .map(|v| v.to_string())
544        .unwrap_or_default();
545    let where_clause = row
546        .get_by_name("where_clause", &result.columns)
547        .map(|v| v.to_string())
548        .unwrap_or_else(|| "IS NOT NULL".to_string());
549    let include_all = row
550        .get_by_name("include_all_columns", &result.columns)
551        .map(|v| v.to_string() == "True")
552        .unwrap_or(false);
553
554    // Fetch columns for the view
555    // system_schema.columns is clustered by column_name, not position, so we
556    // cannot ORDER BY position in CQL — fetch all and sort in Rust.
557    let col_query = format!(
558        "SELECT column_name, type, kind, position, clustering_order FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'",
559        keyspace.replace('\'', "''"),
560        mv_name.replace('\'', "''")
561    );
562    let col_result = session.execute_query(&col_query).await?;
563
564    let mut select_columns = Vec::new();
565    let mut partition_keys: Vec<(i32, String)> = Vec::new();
566    let mut clustering_keys: Vec<(i32, String, String)> = Vec::new();
567
568    for col_row in &col_result.rows {
569        let col_name = col_row
570            .get_by_name("column_name", &col_result.columns)
571            .map(|v| v.to_string())
572            .unwrap_or_default();
573        let kind = col_row
574            .get_by_name("kind", &col_result.columns)
575            .map(|v| v.to_string())
576            .unwrap_or_default();
577        let position = col_row
578            .get_by_name("position", &col_result.columns)
579            .and_then(|v| v.to_string().parse::<i32>().ok())
580            .unwrap_or(0);
581        let clustering_order = col_row
582            .get_by_name("clustering_order", &col_result.columns)
583            .map(|v| v.to_string())
584            .unwrap_or_else(|| "none".to_string());
585
586        select_columns.push(col_name.clone());
587
588        if kind == "partition_key" {
589            partition_keys.push((position, col_name));
590        } else if kind == "clustering" {
591            clustering_keys.push((position, col_name, clustering_order));
592        }
593    }
594
595    partition_keys.sort_by_key(|k| k.0);
596    clustering_keys.sort_by_key(|k| k.0);
597
598    let sorted_pk: Vec<String> = partition_keys
599        .iter()
600        .map(|(_, name)| name.clone())
601        .collect();
602    let sorted_ck: Vec<(String, String)> = clustering_keys
603        .iter()
604        .map(|(_, name, order)| (name.clone(), order.clone()))
605        .collect();
606
607    let prop_names = [
608        "bloom_filter_fp_chance",
609        "caching",
610        "comment",
611        "compaction",
612        "compression",
613        "crc_check_chance",
614        "default_time_to_live",
615        "gc_grace_seconds",
616        "max_index_interval",
617        "memtable_flush_period_in_ms",
618        "min_index_interval",
619        "speculative_retry",
620    ];
621    let prop_query = format!(
622        "SELECT {} FROM system_schema.views WHERE keyspace_name = '{}' AND view_name = '{}'",
623        prop_names.join(", "),
624        keyspace.replace('\'', "''"),
625        mv_name.replace('\'', "''")
626    );
627    let prop_result = session.execute_query(&prop_query).await?;
628    let mut properties = std::collections::BTreeMap::new();
629    if let Some(prop_row) = prop_result.rows.first() {
630        for pn in &prop_names {
631            if let Some(val) = prop_row.get_by_name(pn, &prop_result.columns) {
632                properties.insert(pn.to_string(), val.to_string());
633            }
634        }
635    }
636
637    let parts = MvDdlParts {
638        keyspace: &keyspace,
639        view_name: &mv_name,
640        base_table: &base_table,
641        include_all,
642        select_columns: &select_columns,
643        where_clause: &where_clause,
644        partition_keys: &sorted_pk,
645        clustering_keys: &sorted_ck,
646        properties: &properties,
647    };
648    write!(writer, "{}", format_create_mv_ddl(&parts))?;
649    Ok(())
650}
651
652/// DESCRIBE TYPES — list all UDT names in the current keyspace.
653async fn describe_types(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
654    let keyspace = match session.current_keyspace() {
655        Some(ks) => ks.to_string(),
656        None => {
657            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
658            return Ok(());
659        }
660    };
661
662    let udts = session.get_udts(&keyspace).await?;
663    if udts.is_empty() {
664        writeln!(writer)?;
665        writeln!(writer, "Keyspace '{keyspace}' has no user-defined types.")?;
666        writeln!(writer)?;
667        return Ok(());
668    }
669
670    writeln!(writer)?;
671    for udt in &udts {
672        write!(writer, "{}  ", udt.name)?;
673    }
674    writeln!(writer)?;
675    writeln!(writer)?;
676    Ok(())
677}
678
679/// DESCRIBE TYPE <name> — show CREATE TYPE statement.
680async fn describe_type(
681    session: &CqlSession,
682    type_spec: &str,
683    writer: &mut dyn Write,
684) -> Result<()> {
685    let (keyspace, type_name) = resolve_qualified_name(session, type_spec, writer)?;
686    let keyspace = match keyspace {
687        Some(ks) => ks,
688        None => return Ok(()),
689    };
690
691    let query = format!(
692        "SELECT type_name, field_names, field_types FROM system_schema.types WHERE keyspace_name = '{}' AND type_name = '{}'",
693        keyspace.replace('\'', "''"),
694        type_name.replace('\'', "''")
695    );
696    let result = session.execute_query(&query).await?;
697
698    if result.rows.is_empty() {
699        writeln!(writer, "Type '{keyspace}.{type_name}' not found.")?;
700        return Ok(());
701    }
702
703    let row = &result.rows[0];
704    let udt_name = row
705        .get_by_name("type_name", &result.columns)
706        .map(|v| v.to_string())
707        .unwrap_or_default();
708    let field_names_str = row
709        .get_by_name("field_names", &result.columns)
710        .map(|v| v.to_string())
711        .unwrap_or_default();
712    let field_types_str = row
713        .get_by_name("field_types", &result.columns)
714        .map(|v| v.to_string())
715        .unwrap_or_default();
716
717    let field_names = parse_list_value(&field_names_str);
718    let field_types = parse_list_value(&field_types_str);
719
720    let field_count = field_names.len().min(field_types.len());
721    let fields: Vec<(String, String)> = field_names
722        .into_iter()
723        .take(field_count)
724        .zip(field_types)
725        .collect();
726    write!(
727        writer,
728        "{}",
729        format_create_type_ddl(&keyspace, &udt_name, &fields)
730    )?;
731    Ok(())
732}
733
734/// DESCRIBE FUNCTIONS — list all UDF names in the current keyspace.
735async fn describe_functions(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
736    let keyspace = match session.current_keyspace() {
737        Some(ks) => ks.to_string(),
738        None => {
739            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
740            return Ok(());
741        }
742    };
743
744    let functions = session.get_functions(&keyspace).await?;
745    if functions.is_empty() {
746        writeln!(writer)?;
747        writeln!(
748            writer,
749            "Keyspace '{keyspace}' has no user-defined functions."
750        )?;
751        writeln!(writer)?;
752        return Ok(());
753    }
754
755    writeln!(writer)?;
756    for func in &functions {
757        write!(writer, "{}  ", func.name)?;
758    }
759    writeln!(writer)?;
760    writeln!(writer)?;
761    Ok(())
762}
763
764/// DESCRIBE FUNCTION <name> — show CREATE FUNCTION statement.
765async fn describe_function(
766    session: &CqlSession,
767    func_spec: &str,
768    writer: &mut dyn Write,
769) -> Result<()> {
770    let (keyspace, func_name) = resolve_qualified_name(session, func_spec, writer)?;
771    let keyspace = match keyspace {
772        Some(ks) => ks,
773        None => return Ok(()),
774    };
775
776    let query = format!(
777        "SELECT function_name, argument_names, argument_types, return_type, language, body, called_on_null_input FROM system_schema.functions WHERE keyspace_name = '{}' AND function_name = '{}'",
778        keyspace.replace('\'', "''"),
779        func_name.replace('\'', "''")
780    );
781    let result = session.execute_query(&query).await?;
782
783    if result.rows.is_empty() {
784        writeln!(writer, "Function '{keyspace}.{func_name}' not found.")?;
785        return Ok(());
786    }
787
788    let row = &result.rows[0];
789    let fn_name = row
790        .get_by_name("function_name", &result.columns)
791        .map(|v| v.to_string())
792        .unwrap_or_default();
793    let arg_names_str = row
794        .get_by_name("argument_names", &result.columns)
795        .map(|v| v.to_string())
796        .unwrap_or_default();
797    let arg_types_str = row
798        .get_by_name("argument_types", &result.columns)
799        .map(|v| v.to_string())
800        .unwrap_or_default();
801    let return_type = row
802        .get_by_name("return_type", &result.columns)
803        .map(|v| v.to_string())
804        .unwrap_or_default();
805    let language = row
806        .get_by_name("language", &result.columns)
807        .map(|v| v.to_string())
808        .unwrap_or_default();
809    let body = row
810        .get_by_name("body", &result.columns)
811        .map(|v| v.to_string())
812        .unwrap_or_default();
813    let called_on_null = row
814        .get_by_name("called_on_null_input", &result.columns)
815        .map(|v| v.to_string() == "True")
816        .unwrap_or(false);
817
818    let arg_names = parse_list_value(&arg_names_str);
819    let arg_types = parse_list_value(&arg_types_str);
820
821    let args_str = arg_names
822        .iter()
823        .zip(arg_types.iter())
824        .map(|(name, typ)| format!("{} {}", quote_if_needed(name), typ))
825        .collect::<Vec<_>>()
826        .join(", ");
827
828    let null_handling = if called_on_null {
829        "CALLED ON NULL INPUT"
830    } else {
831        "RETURNS NULL ON NULL INPUT"
832    };
833
834    write!(
835        writer,
836        "{}",
837        format_create_function_ddl(
838            &keyspace,
839            &fn_name,
840            &args_str,
841            null_handling,
842            &return_type,
843            &language,
844            &body,
845        )
846    )?;
847    Ok(())
848}
849
850/// DESCRIBE AGGREGATES — list all UDA names in the current keyspace.
851async fn describe_aggregates(session: &CqlSession, writer: &mut dyn Write) -> Result<()> {
852    let keyspace = match session.current_keyspace() {
853        Some(ks) => ks.to_string(),
854        None => {
855            writeln!(writer, "No keyspace selected. Use USE <keyspace> first.")?;
856            return Ok(());
857        }
858    };
859
860    let aggregates = session.get_aggregates(&keyspace).await?;
861    if aggregates.is_empty() {
862        writeln!(writer)?;
863        writeln!(
864            writer,
865            "Keyspace '{keyspace}' has no user-defined aggregates."
866        )?;
867        writeln!(writer)?;
868        return Ok(());
869    }
870
871    writeln!(writer)?;
872    for agg in &aggregates {
873        write!(writer, "{}  ", agg.name)?;
874    }
875    writeln!(writer)?;
876    writeln!(writer)?;
877    Ok(())
878}
879
880/// DESCRIBE AGGREGATE <name> — show CREATE AGGREGATE statement.
881async fn describe_aggregate(
882    session: &CqlSession,
883    agg_spec: &str,
884    writer: &mut dyn Write,
885) -> Result<()> {
886    let (keyspace, agg_name) = resolve_qualified_name(session, agg_spec, writer)?;
887    let keyspace = match keyspace {
888        Some(ks) => ks,
889        None => return Ok(()),
890    };
891
892    let query = format!(
893        "SELECT aggregate_name, argument_types, state_func, state_type, final_func, initcond FROM system_schema.aggregates WHERE keyspace_name = '{}' AND aggregate_name = '{}'",
894        keyspace.replace('\'', "''"),
895        agg_name.replace('\'', "''")
896    );
897    let result = session.execute_query(&query).await?;
898
899    if result.rows.is_empty() {
900        writeln!(writer, "Aggregate '{keyspace}.{agg_name}' not found.")?;
901        return Ok(());
902    }
903
904    let row = &result.rows[0];
905    let ag_name = row
906        .get_by_name("aggregate_name", &result.columns)
907        .map(|v| v.to_string())
908        .unwrap_or_default();
909    let arg_types_str = row
910        .get_by_name("argument_types", &result.columns)
911        .map(|v| v.to_string())
912        .unwrap_or_default();
913    let state_func = row
914        .get_by_name("state_func", &result.columns)
915        .map(|v| v.to_string())
916        .unwrap_or_default();
917    let state_type = row
918        .get_by_name("state_type", &result.columns)
919        .map(|v| v.to_string())
920        .unwrap_or_default();
921    let final_func = row
922        .get_by_name("final_func", &result.columns)
923        .map(|v| v.to_string());
924    let initcond = row
925        .get_by_name("initcond", &result.columns)
926        .map(|v| v.to_string());
927
928    let arg_types = parse_list_value(&arg_types_str);
929    let args_str = arg_types.join(", ");
930
931    write!(
932        writer,
933        "{}",
934        format_create_aggregate_ddl(
935            &keyspace,
936            &ag_name,
937            &args_str,
938            &state_func,
939            &state_type,
940            final_func.as_deref(),
941            initcond.as_deref(),
942        )
943    )?;
944    Ok(())
945}
946
947/// Write a CREATE TABLE statement for the given table metadata.
948fn write_create_table(writer: &mut dyn Write, meta: &crate::driver::TableMetadata) -> Result<()> {
949    writeln!(
950        writer,
951        "CREATE TABLE {}.{} (",
952        quote_if_needed(&meta.keyspace),
953        quote_if_needed(&meta.name)
954    )?;
955
956    // Print columns
957    for col in &meta.columns {
958        writeln!(
959            writer,
960            "    {} {},",
961            quote_if_needed(&col.name),
962            col.type_name
963        )?;
964    }
965
966    // Print PRIMARY KEY
967    if !meta.partition_key.is_empty() {
968        let pk_str = if meta.partition_key.len() == 1 {
969            quote_if_needed(&meta.partition_key[0])
970        } else {
971            format!(
972                "({})",
973                meta.partition_key
974                    .iter()
975                    .map(|k| quote_if_needed(k))
976                    .collect::<Vec<_>>()
977                    .join(", ")
978            )
979        };
980
981        if meta.clustering_key.is_empty() {
982            writeln!(writer, "    PRIMARY KEY ({pk_str})")?;
983        } else {
984            let ck_str = meta
985                .clustering_key
986                .iter()
987                .map(|k| quote_if_needed(k))
988                .collect::<Vec<_>>()
989                .join(", ");
990            writeln!(writer, "    PRIMARY KEY ({pk_str}, {ck_str})")?;
991        }
992    }
993
994    writeln!(writer, ")")?;
995
996    let mut first_with = true;
997
998    if !meta.clustering_key.is_empty() {
999        let order_parts: Vec<String> = meta
1000            .clustering_key
1001            .iter()
1002            .enumerate()
1003            .map(|(i, name)| {
1004                let order = meta
1005                    .clustering_order
1006                    .get(i)
1007                    .map(|s| s.as_str())
1008                    .unwrap_or("ASC");
1009                format!("{} {}", quote_if_needed(name), order)
1010            })
1011            .collect();
1012        write!(
1013            writer,
1014            " WITH CLUSTERING ORDER BY ({})",
1015            order_parts.join(", ")
1016        )?;
1017        first_with = false;
1018    }
1019
1020    let prop_order = [
1021        "bloom_filter_fp_chance",
1022        "caching",
1023        "comment",
1024        "compaction",
1025        "compression",
1026        "crc_check_chance",
1027        "default_time_to_live",
1028        "gc_grace_seconds",
1029        "max_index_interval",
1030        "memtable_flush_period_in_ms",
1031        "min_index_interval",
1032        "speculative_retry",
1033    ];
1034
1035    for prop_name in &prop_order {
1036        if let Some(value) = meta.properties.get(*prop_name) {
1037            let formatted_value = format_property_value(prop_name, value);
1038            if first_with {
1039                write!(writer, " WITH {} = {}", prop_name, formatted_value)?;
1040                first_with = false;
1041            } else {
1042                write!(writer, "\n    AND {} = {}", prop_name, formatted_value)?;
1043            }
1044        }
1045    }
1046
1047    writeln!(writer, ";")?;
1048    Ok(())
1049}
1050
1051/// Fetch indexes for a table from system_schema.indexes and write CREATE INDEX statements.
1052async fn write_table_indexes(
1053    session: &CqlSession,
1054    keyspace: &str,
1055    table_name: &str,
1056    writer: &mut dyn Write,
1057) -> Result<()> {
1058    let query = format!(
1059        "SELECT index_name, table_name, kind, options FROM system_schema.indexes WHERE keyspace_name = '{}' AND table_name = '{}'",
1060        keyspace.replace('\'', "''"),
1061        table_name.replace('\'', "''"),
1062    );
1063    let result = session.execute_query(&query).await?;
1064
1065    for row in &result.rows {
1066        let idx_name = row
1067            .get_by_name("index_name", &result.columns)
1068            .map(|v| v.to_string())
1069            .unwrap_or_default();
1070        let tbl_name = row
1071            .get_by_name("table_name", &result.columns)
1072            .map(|v| v.to_string())
1073            .unwrap_or_default();
1074        let options = row
1075            .get_by_name("options", &result.columns)
1076            .map(|v| v.to_string())
1077            .unwrap_or_default();
1078
1079        let target = extract_map_value(&options, "target").unwrap_or_else(|| "unknown".to_string());
1080
1081        writeln!(
1082            writer,
1083            "CREATE INDEX {} ON {}.{} ({});",
1084            quote_if_needed(&idx_name),
1085            quote_if_needed(keyspace),
1086            quote_if_needed(&tbl_name),
1087            target
1088        )?;
1089    }
1090
1091    Ok(())
1092}
1093
1094async fn write_keyspace_materialized_views(
1095    session: &CqlSession,
1096    keyspace: &str,
1097    writer: &mut dyn Write,
1098) -> Result<()> {
1099    let query = format!(
1100        "SELECT view_name FROM system_schema.views WHERE keyspace_name = '{}'",
1101        keyspace.replace('\'', "''")
1102    );
1103    let result = session.execute_query(&query).await?;
1104
1105    for row in &result.rows {
1106        if let Some(view_name) = row.get(0) {
1107            let view_name = view_name.to_string();
1108            writeln!(writer)?;
1109            describe_materialized_view(session, &format!("{keyspace}.{view_name}"), writer).await?;
1110        }
1111    }
1112
1113    Ok(())
1114}
1115
1116/// Resolve a potentially qualified name (ks.name or just name) into (keyspace, name).
1117///
1118/// If no keyspace prefix is given, uses the session's current keyspace.
1119/// Returns `(None, name)` if no keyspace can be determined (and prints an error).
1120fn resolve_qualified_name(
1121    session: &CqlSession,
1122    spec: &str,
1123    writer: &mut dyn Write,
1124) -> Result<(Option<String>, String)> {
1125    if spec.contains('.') {
1126        let parts: Vec<&str> = spec.splitn(2, '.').collect();
1127        Ok((Some(parts[0].to_string()), parts[1].to_string()))
1128    } else {
1129        match session.current_keyspace() {
1130            Some(ks) => Ok((Some(ks.to_string()), spec.to_string())),
1131            None => {
1132                writeln!(
1133                    writer,
1134                    "No keyspace selected. Use a fully qualified name (keyspace.name) or USE <keyspace> first."
1135                )?;
1136                Ok((None, spec.to_string()))
1137            }
1138        }
1139    }
1140}
1141
1142/// Parse a CQL list string like `['a', 'b', 'c']` or `[a, b, c]` into a Vec of strings.
1143fn parse_list_value(s: &str) -> Vec<String> {
1144    let trimmed = s.trim();
1145    // Handle empty list
1146    if trimmed == "[]" || trimmed.is_empty() {
1147        return Vec::new();
1148    }
1149    // Strip surrounding brackets
1150    let inner = if trimmed.starts_with('[') && trimmed.ends_with(']') {
1151        &trimmed[1..trimmed.len() - 1]
1152    } else {
1153        trimmed
1154    };
1155    if inner.trim().is_empty() {
1156        return Vec::new();
1157    }
1158    inner
1159        .split(',')
1160        .map(|s| {
1161            let s = s.trim();
1162            // Strip surrounding quotes
1163            if (s.starts_with('\'') && s.ends_with('\''))
1164                || (s.starts_with('"') && s.ends_with('"'))
1165            {
1166                s[1..s.len() - 1].to_string()
1167            } else {
1168                s.to_string()
1169            }
1170        })
1171        .collect()
1172}
1173
1174/// Extract a value from a CQL map string like `{'key': 'value', ...}`.
1175fn extract_map_value(map_str: &str, key: &str) -> Option<String> {
1176    let trimmed = map_str.trim();
1177    // Strip surrounding braces
1178    let inner = if trimmed.starts_with('{') && trimmed.ends_with('}') {
1179        &trimmed[1..trimmed.len() - 1]
1180    } else {
1181        trimmed
1182    };
1183
1184    // Simple parsing: split on commas, then on ':'
1185    for entry in inner.split(',') {
1186        let parts: Vec<&str> = entry.splitn(2, ':').collect();
1187        if parts.len() == 2 {
1188            let k = parts[0].trim().trim_matches('\'').trim_matches('"');
1189            let v = parts[1].trim().trim_matches('\'').trim_matches('"');
1190            if k == key {
1191                return Some(v.to_string());
1192            }
1193        }
1194    }
1195    None
1196}
1197
1198fn format_property_value(name: &str, value: &str) -> String {
1199    match name {
1200        "comment" | "speculative_retry" => format!("'{}'", value.replace('\'', "''")),
1201        "caching" | "compaction" | "compression" => value.to_string(),
1202        _ => value.to_string(),
1203    }
1204}
1205
1206/// Check if a keyspace is a system keyspace.
1207fn is_system_keyspace(name: &str) -> bool {
1208    name.starts_with("system")
1209        || name == "dse_system"
1210        || name == "dse_perf"
1211        || name == "dse_security"
1212        || name == "dse_leases"
1213        || name == "dse_system_local"
1214        || name == "dse_insights"
1215        || name == "solr_admin"
1216}
1217
1218/// Quote an identifier if it needs quoting (contains uppercase, spaces, or reserved words).
1219fn quote_if_needed(name: &str) -> String {
1220    // Simple heuristic: quote if not all lowercase alphanumeric + underscore
1221    if name
1222        .chars()
1223        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
1224        && !name.is_empty()
1225        && !name.starts_with(|c: char| c.is_ascii_digit())
1226    {
1227        name.to_string()
1228    } else {
1229        format!("\"{}\"", name.replace('"', "\"\""))
1230    }
1231}
1232
1233/// Strip surrounding single or double quotes from a string.
1234fn strip_quotes(s: &str) -> &str {
1235    if (s.starts_with('"') && s.ends_with('"')) || (s.starts_with('\'') && s.ends_with('\'')) {
1236        &s[1..s.len() - 1]
1237    } else {
1238        s
1239    }
1240}
1241
1242// --- Pure DDL formatters (testable without a session) ---
1243
1244/// Format a CREATE INDEX DDL string.
1245fn format_index_ddl(keyspace: &str, index_name: &str, table_name: &str, target: &str) -> String {
1246    format!(
1247        "\nCREATE INDEX {} ON {}.{} ({});\n\n",
1248        quote_if_needed(index_name),
1249        quote_if_needed(keyspace),
1250        quote_if_needed(table_name),
1251        target
1252    )
1253}
1254
1255/// Format a CREATE TYPE DDL string.
1256fn format_create_type_ddl(keyspace: &str, type_name: &str, fields: &[(String, String)]) -> String {
1257    let mut out = String::new();
1258    out.push('\n');
1259    out.push_str(&format!(
1260        "CREATE TYPE {}.{} (\n",
1261        quote_if_needed(keyspace),
1262        quote_if_needed(type_name)
1263    ));
1264    let field_count = fields.len();
1265    for (i, (name, typ)) in fields.iter().enumerate() {
1266        let comma = if i < field_count - 1 { "," } else { "" };
1267        out.push_str(&format!("    {} {}{}\n", quote_if_needed(name), typ, comma));
1268    }
1269    out.push_str(");\n\n");
1270    out
1271}
1272
1273/// Format a CREATE FUNCTION DDL string.
1274fn format_create_function_ddl(
1275    keyspace: &str,
1276    func_name: &str,
1277    args_str: &str,
1278    null_handling: &str,
1279    return_type: &str,
1280    language: &str,
1281    body: &str,
1282) -> String {
1283    format!(
1284        "\nCREATE OR REPLACE FUNCTION {}.{} ({})\n    {}\n    RETURNS {}\n    LANGUAGE {}\n    AS $$ {} $$;\n\n",
1285        quote_if_needed(keyspace),
1286        quote_if_needed(func_name),
1287        args_str,
1288        null_handling,
1289        return_type,
1290        language,
1291        body
1292    )
1293}
1294
1295/// Format a CREATE AGGREGATE DDL string.
1296fn format_create_aggregate_ddl(
1297    keyspace: &str,
1298    agg_name: &str,
1299    args_str: &str,
1300    state_func: &str,
1301    state_type: &str,
1302    final_func: Option<&str>,
1303    initcond: Option<&str>,
1304) -> String {
1305    let mut out = format!(
1306        "\nCREATE OR REPLACE AGGREGATE {}.{} ({})\n    SFUNC {}\n    STYPE {}",
1307        quote_if_needed(keyspace),
1308        quote_if_needed(agg_name),
1309        args_str,
1310        state_func,
1311        state_type
1312    );
1313    if let Some(ff) = final_func {
1314        if !ff.is_empty() && ff != "null" {
1315            out.push_str(&format!("\n    FINALFUNC {ff}"));
1316        }
1317    }
1318    if let Some(ic) = initcond {
1319        if !ic.is_empty() && ic != "null" {
1320            out.push_str(&format!("\n    INITCOND {ic}"));
1321        }
1322    }
1323    out.push_str("\n;\n\n");
1324    out
1325}
1326
1327/// Parts needed to format a CREATE MATERIALIZED VIEW DDL string.
1328struct MvDdlParts<'a> {
1329    keyspace: &'a str,
1330    view_name: &'a str,
1331    base_table: &'a str,
1332    include_all: bool,
1333    select_columns: &'a [String],
1334    where_clause: &'a str,
1335    partition_keys: &'a [String],            // sorted by position
1336    clustering_keys: &'a [(String, String)], // (name, order), sorted by position
1337    properties: &'a std::collections::BTreeMap<String, String>,
1338}
1339
1340/// Format a CREATE MATERIALIZED VIEW DDL string.
1341fn format_create_mv_ddl(parts: &MvDdlParts<'_>) -> String {
1342    let mut out = String::new();
1343    out.push('\n');
1344    out.push_str(&format!(
1345        "CREATE MATERIALIZED VIEW {}.{} AS\n",
1346        quote_if_needed(parts.keyspace),
1347        quote_if_needed(parts.view_name)
1348    ));
1349
1350    let columns_str = if parts.include_all {
1351        "*".to_string()
1352    } else {
1353        parts
1354            .select_columns
1355            .iter()
1356            .map(|c| quote_if_needed(c))
1357            .collect::<Vec<_>>()
1358            .join(", ")
1359    };
1360    out.push_str(&format!("    SELECT {columns_str}\n"));
1361    out.push_str(&format!(
1362        "    FROM {}.{}\n",
1363        quote_if_needed(parts.keyspace),
1364        quote_if_needed(parts.base_table)
1365    ));
1366    out.push_str(&format!("    WHERE {}\n", parts.where_clause));
1367
1368    let pk_str = if parts.partition_keys.len() == 1 {
1369        quote_if_needed(&parts.partition_keys[0])
1370    } else {
1371        format!(
1372            "({})",
1373            parts
1374                .partition_keys
1375                .iter()
1376                .map(|k| quote_if_needed(k))
1377                .collect::<Vec<_>>()
1378                .join(", ")
1379        )
1380    };
1381
1382    if parts.clustering_keys.is_empty() {
1383        out.push_str(&format!("    PRIMARY KEY ({pk_str})\n"));
1384    } else {
1385        let ck_str = parts
1386            .clustering_keys
1387            .iter()
1388            .map(|(name, _)| quote_if_needed(name))
1389            .collect::<Vec<_>>()
1390            .join(", ");
1391        out.push_str(&format!("    PRIMARY KEY ({pk_str}, {ck_str})\n"));
1392    }
1393
1394    let mut first_with = true;
1395
1396    if !parts.clustering_keys.is_empty() {
1397        let order_str = parts
1398            .clustering_keys
1399            .iter()
1400            .map(|(name, order)| {
1401                let o = if order.to_uppercase() == "DESC" {
1402                    order.to_uppercase()
1403                } else {
1404                    "ASC".to_string()
1405                };
1406                format!("{} {o}", quote_if_needed(name))
1407            })
1408            .collect::<Vec<_>>()
1409            .join(", ");
1410        out.push_str(&format!(" WITH CLUSTERING ORDER BY ({order_str})"));
1411        first_with = false;
1412    }
1413
1414    let prop_order = [
1415        "bloom_filter_fp_chance",
1416        "caching",
1417        "comment",
1418        "compaction",
1419        "compression",
1420        "crc_check_chance",
1421        "default_time_to_live",
1422        "gc_grace_seconds",
1423        "max_index_interval",
1424        "memtable_flush_period_in_ms",
1425        "min_index_interval",
1426        "speculative_retry",
1427    ];
1428
1429    for prop_name in &prop_order {
1430        if let Some(value) = parts.properties.get(*prop_name) {
1431            let formatted_value = format_property_value(prop_name, value);
1432            if first_with {
1433                out.push_str(&format!(" WITH {} = {}", prop_name, formatted_value));
1434                first_with = false;
1435            } else {
1436                out.push_str(&format!("\n    AND {} = {}", prop_name, formatted_value));
1437            }
1438        }
1439    }
1440
1441    out.push_str(";\n");
1442    out.push('\n');
1443    out
1444}
1445
1446#[cfg(test)]
1447mod tests {
1448    use super::*;
1449
1450    #[test]
1451    fn system_keyspace_detection() {
1452        assert!(is_system_keyspace("system"));
1453        assert!(is_system_keyspace("system_schema"));
1454        assert!(is_system_keyspace("system_auth"));
1455        assert!(is_system_keyspace("system_traces"));
1456        assert!(!is_system_keyspace("my_keyspace"));
1457        assert!(!is_system_keyspace("users"));
1458    }
1459
1460    #[test]
1461    fn quote_simple_identifier() {
1462        assert_eq!(quote_if_needed("users"), "users");
1463        assert_eq!(quote_if_needed("my_table"), "my_table");
1464    }
1465
1466    #[test]
1467    fn quote_mixed_case_identifier() {
1468        assert_eq!(quote_if_needed("MyTable"), "\"MyTable\"");
1469    }
1470
1471    #[test]
1472    fn quote_identifier_with_spaces() {
1473        assert_eq!(quote_if_needed("my table"), "\"my table\"");
1474    }
1475
1476    #[test]
1477    fn quote_identifier_starting_with_digit() {
1478        assert_eq!(quote_if_needed("1table"), "\"1table\"");
1479    }
1480
1481    #[test]
1482    fn strip_quotes_test() {
1483        assert_eq!(strip_quotes("\"hello\""), "hello");
1484        assert_eq!(strip_quotes("'hello'"), "hello");
1485        assert_eq!(strip_quotes("hello"), "hello");
1486    }
1487
1488    #[test]
1489    fn parse_list_value_test() {
1490        assert_eq!(parse_list_value("[]"), Vec::<String>::new());
1491        assert_eq!(parse_list_value(""), Vec::<String>::new());
1492        assert_eq!(parse_list_value("['a', 'b', 'c']"), vec!["a", "b", "c"]);
1493        assert_eq!(
1494            parse_list_value("[int, text, uuid]"),
1495            vec!["int", "text", "uuid"]
1496        );
1497    }
1498
1499    #[test]
1500    fn extract_map_value_test() {
1501        assert_eq!(
1502            extract_map_value("{'target': 'email', 'class_name': 'foo'}", "target"),
1503            Some("email".to_string())
1504        );
1505        assert_eq!(extract_map_value("{'target': 'email'}", "missing"), None);
1506    }
1507
1508    #[test]
1509    fn write_create_table_simple() {
1510        use crate::driver::{ColumnMetadata, TableMetadata};
1511
1512        let meta = TableMetadata {
1513            keyspace: "test_ks".to_string(),
1514            name: "users".to_string(),
1515            columns: vec![
1516                ColumnMetadata {
1517                    name: "id".to_string(),
1518                    type_name: "uuid".to_string(),
1519                },
1520                ColumnMetadata {
1521                    name: "name".to_string(),
1522                    type_name: "text".to_string(),
1523                },
1524                ColumnMetadata {
1525                    name: "age".to_string(),
1526                    type_name: "int".to_string(),
1527                },
1528            ],
1529            partition_key: vec!["id".to_string()],
1530            clustering_key: vec![],
1531            clustering_order: vec![],
1532            properties: std::collections::BTreeMap::new(),
1533        };
1534
1535        let mut buf = Vec::new();
1536        write_create_table(&mut buf, &meta).unwrap();
1537        let output = String::from_utf8(buf).unwrap();
1538        assert!(output.contains("CREATE TABLE test_ks.users"));
1539        assert!(output.contains("id uuid"));
1540        assert!(output.contains("name text"));
1541        assert!(output.contains("PRIMARY KEY (id)"));
1542    }
1543
1544    #[test]
1545    fn write_create_table_composite_key() {
1546        use crate::driver::{ColumnMetadata, TableMetadata};
1547
1548        let meta = TableMetadata {
1549            keyspace: "ks".to_string(),
1550            name: "events".to_string(),
1551            columns: vec![
1552                ColumnMetadata {
1553                    name: "user_id".to_string(),
1554                    type_name: "uuid".to_string(),
1555                },
1556                ColumnMetadata {
1557                    name: "event_time".to_string(),
1558                    type_name: "timestamp".to_string(),
1559                },
1560                ColumnMetadata {
1561                    name: "data".to_string(),
1562                    type_name: "text".to_string(),
1563                },
1564            ],
1565            partition_key: vec!["user_id".to_string()],
1566            clustering_key: vec!["event_time".to_string()],
1567            clustering_order: vec!["ASC".to_string()],
1568            properties: std::collections::BTreeMap::new(),
1569        };
1570
1571        let mut buf = Vec::new();
1572        write_create_table(&mut buf, &meta).unwrap();
1573        let output = String::from_utf8(buf).unwrap();
1574        assert!(output.contains("PRIMARY KEY (user_id, event_time)"));
1575        assert!(
1576            output.contains("WITH CLUSTERING ORDER BY (event_time ASC)"),
1577            "expected CLUSTERING ORDER BY: {output}"
1578        );
1579    }
1580
1581    #[test]
1582    fn write_create_table_compound_partition_key() {
1583        use crate::driver::{ColumnMetadata, TableMetadata};
1584
1585        let meta = TableMetadata {
1586            keyspace: "ks".to_string(),
1587            name: "metrics".to_string(),
1588            columns: vec![
1589                ColumnMetadata {
1590                    name: "host".to_string(),
1591                    type_name: "text".to_string(),
1592                },
1593                ColumnMetadata {
1594                    name: "metric".to_string(),
1595                    type_name: "text".to_string(),
1596                },
1597                ColumnMetadata {
1598                    name: "ts".to_string(),
1599                    type_name: "timestamp".to_string(),
1600                },
1601                ColumnMetadata {
1602                    name: "value".to_string(),
1603                    type_name: "double".to_string(),
1604                },
1605            ],
1606            partition_key: vec!["host".to_string(), "metric".to_string()],
1607            clustering_key: vec!["ts".to_string()],
1608            clustering_order: vec!["ASC".to_string()],
1609            properties: std::collections::BTreeMap::new(),
1610        };
1611
1612        let mut buf = Vec::new();
1613        write_create_table(&mut buf, &meta).unwrap();
1614        let output = String::from_utf8(buf).unwrap();
1615        assert!(output.contains("PRIMARY KEY ((host, metric), ts)"));
1616    }
1617
1618    // --- DDL formatter tests ---
1619
1620    #[test]
1621    fn format_index_ddl_simple() {
1622        let ddl = format_index_ddl("my_ks", "email_idx", "users", "email");
1623        assert!(ddl.contains("CREATE INDEX email_idx ON my_ks.users (email);"));
1624    }
1625
1626    #[test]
1627    fn format_index_ddl_quoted_names() {
1628        let ddl = format_index_ddl("MyKs", "MyIdx", "MyTable", "email");
1629        assert!(ddl.contains("\"MyKs\""));
1630        assert!(ddl.contains("\"MyIdx\""));
1631        assert!(ddl.contains("\"MyTable\""));
1632    }
1633
1634    #[test]
1635    fn format_create_type_ddl_single_field() {
1636        let fields = vec![("street".to_string(), "text".to_string())];
1637        let ddl = format_create_type_ddl("ks1", "address", &fields);
1638        assert!(ddl.contains("CREATE TYPE ks1.address ("));
1639        assert!(ddl.contains("street text"));
1640        assert!(ddl.contains(");"));
1641    }
1642
1643    #[test]
1644    fn format_create_type_ddl_multiple_fields() {
1645        let fields = vec![
1646            ("street".to_string(), "text".to_string()),
1647            ("city".to_string(), "text".to_string()),
1648            ("zip".to_string(), "int".to_string()),
1649        ];
1650        let ddl = format_create_type_ddl("ks1", "address", &fields);
1651        assert!(
1652            ddl.contains("street text,"),
1653            "expected trailing comma: {ddl}"
1654        );
1655        assert!(ddl.contains("city text,"), "expected trailing comma: {ddl}");
1656        // last field has no trailing comma
1657        assert!(
1658            !ddl.contains("int,"),
1659            "last field should not have comma: {ddl}"
1660        );
1661    }
1662
1663    #[test]
1664    fn format_create_function_ddl_called_on_null() {
1665        let ddl = format_create_function_ddl(
1666            "ks1",
1667            "add_one",
1668            "val int",
1669            "CALLED ON NULL INPUT",
1670            "int",
1671            "java",
1672            "return val + 1;",
1673        );
1674        assert!(ddl.contains("CREATE OR REPLACE FUNCTION ks1.add_one (val int)"));
1675        assert!(ddl.contains("CALLED ON NULL INPUT"));
1676        assert!(ddl.contains("RETURNS int"));
1677        assert!(ddl.contains("LANGUAGE java"));
1678        assert!(ddl.contains("AS $$ return val + 1; $$;"));
1679    }
1680
1681    #[test]
1682    fn format_create_function_ddl_returns_null() {
1683        let ddl = format_create_function_ddl(
1684            "ks1",
1685            "my_func",
1686            "x text",
1687            "RETURNS NULL ON NULL INPUT",
1688            "text",
1689            "lua",
1690            "return x",
1691        );
1692        assert!(ddl.contains("RETURNS NULL ON NULL INPUT"));
1693        assert!(!ddl.contains("CALLED ON NULL INPUT"));
1694    }
1695
1696    #[test]
1697    fn format_create_aggregate_ddl_minimal() {
1698        let ddl =
1699            format_create_aggregate_ddl("ks1", "my_sum", "int", "state_add", "int", None, None);
1700        assert!(ddl.contains("CREATE OR REPLACE AGGREGATE ks1.my_sum (int)"));
1701        assert!(ddl.contains("SFUNC state_add"));
1702        assert!(ddl.contains("STYPE int"));
1703        assert!(!ddl.contains("FINALFUNC"));
1704        assert!(!ddl.contains("INITCOND"));
1705        assert!(ddl.contains(';'));
1706    }
1707
1708    #[test]
1709    fn format_create_aggregate_ddl_with_optional() {
1710        let ddl = format_create_aggregate_ddl(
1711            "ks1",
1712            "my_avg",
1713            "int",
1714            "state_avg",
1715            "tuple<int,int>",
1716            Some("final_avg"),
1717            Some("0"),
1718        );
1719        assert!(ddl.contains("FINALFUNC final_avg"));
1720        assert!(ddl.contains("INITCOND 0"));
1721    }
1722
1723    #[test]
1724    fn format_create_aggregate_ddl_empty_optional_skipped() {
1725        let ddl = format_create_aggregate_ddl(
1726            "ks1",
1727            "my_agg",
1728            "int",
1729            "sf",
1730            "int",
1731            Some(""),
1732            Some("null"),
1733        );
1734        assert!(
1735            !ddl.contains("FINALFUNC"),
1736            "empty FINALFUNC should be omitted: {ddl}"
1737        );
1738        assert!(
1739            !ddl.contains("INITCOND"),
1740            "'null' INITCOND should be omitted: {ddl}"
1741        );
1742    }
1743
1744    #[test]
1745    fn format_create_mv_ddl_simple() {
1746        let cols = vec!["id".to_string(), "email".to_string()];
1747        let props = std::collections::BTreeMap::new();
1748        let parts = MvDdlParts {
1749            keyspace: "ks1",
1750            view_name: "user_by_email",
1751            base_table: "users",
1752            include_all: false,
1753            select_columns: &cols,
1754            where_clause: "email IS NOT NULL",
1755            partition_keys: &["email".to_string()],
1756            clustering_keys: &[],
1757            properties: &props,
1758        };
1759        let ddl = format_create_mv_ddl(&parts);
1760        assert!(ddl.contains("CREATE MATERIALIZED VIEW ks1.user_by_email AS"));
1761        assert!(ddl.contains("SELECT id, email"));
1762        assert!(ddl.contains("FROM ks1.users"));
1763        assert!(ddl.contains("WHERE email IS NOT NULL"));
1764        assert!(ddl.contains("PRIMARY KEY (email)"));
1765    }
1766
1767    #[test]
1768    fn format_create_mv_ddl_include_all() {
1769        let props = std::collections::BTreeMap::new();
1770        let parts = MvDdlParts {
1771            keyspace: "ks1",
1772            view_name: "mv_all",
1773            base_table: "base",
1774            include_all: true,
1775            select_columns: &["id".to_string()],
1776            where_clause: "id IS NOT NULL",
1777            partition_keys: &["id".to_string()],
1778            clustering_keys: &[],
1779            properties: &props,
1780        };
1781        let ddl = format_create_mv_ddl(&parts);
1782        assert!(
1783            ddl.contains("SELECT *"),
1784            "include_all should emit SELECT *: {ddl}"
1785        );
1786    }
1787
1788    #[test]
1789    fn format_create_mv_ddl_with_clustering_desc() {
1790        let cols = vec!["user_id".to_string(), "ts".to_string()];
1791        let ck = vec![("ts".to_string(), "DESC".to_string())];
1792        let props = std::collections::BTreeMap::new();
1793        let parts = MvDdlParts {
1794            keyspace: "ks1",
1795            view_name: "mv_ordered",
1796            base_table: "events",
1797            include_all: false,
1798            select_columns: &cols,
1799            where_clause: "ts IS NOT NULL",
1800            partition_keys: &["user_id".to_string()],
1801            clustering_keys: &ck,
1802            properties: &props,
1803        };
1804        let ddl = format_create_mv_ddl(&parts);
1805        assert!(ddl.contains("PRIMARY KEY (user_id, ts)"));
1806        assert!(ddl.contains("WITH CLUSTERING ORDER BY (ts DESC)"));
1807    }
1808
1809    #[test]
1810    fn format_create_mv_ddl_with_properties() {
1811        let cols = vec!["id".to_string(), "val".to_string()];
1812        let ck = vec![("val".to_string(), "ASC".to_string())];
1813        let mut props = std::collections::BTreeMap::new();
1814        props.insert("gc_grace_seconds".to_string(), "864000".to_string());
1815        props.insert("comment".to_string(), "test view".to_string());
1816        let parts = MvDdlParts {
1817            keyspace: "ks1",
1818            view_name: "mv_props",
1819            base_table: "t",
1820            include_all: false,
1821            select_columns: &cols,
1822            where_clause: "val IS NOT NULL",
1823            partition_keys: &["id".to_string()],
1824            clustering_keys: &ck,
1825            properties: &props,
1826        };
1827        let ddl = format_create_mv_ddl(&parts);
1828        assert!(
1829            ddl.contains("WITH CLUSTERING ORDER BY (val ASC)"),
1830            "should always show CLUSTERING ORDER BY: {ddl}"
1831        );
1832        assert!(
1833            ddl.contains("AND comment = 'test view'"),
1834            "should include comment property: {ddl}"
1835        );
1836        assert!(
1837            ddl.contains("AND gc_grace_seconds = 864000"),
1838            "should include gc_grace_seconds: {ddl}"
1839        );
1840    }
1841
1842    #[test]
1843    fn format_property_value_comment_quoted() {
1844        let result = format_property_value("comment", "hello world");
1845        assert_eq!(result, "'hello world'");
1846    }
1847
1848    #[test]
1849    fn format_property_value_comment_with_single_quote() {
1850        let result = format_property_value("comment", "it's a test");
1851        assert_eq!(result, "'it''s a test'");
1852    }
1853
1854    #[test]
1855    fn format_property_value_speculative_retry_quoted() {
1856        let result = format_property_value("speculative_retry", "99PERCENTILE");
1857        assert_eq!(result, "'99PERCENTILE'");
1858    }
1859
1860    #[test]
1861    fn format_property_value_caching_passthrough() {
1862        let val = "{'keys': 'ALL', 'rows_per_partition': 'NONE'}";
1863        let result = format_property_value("caching", val);
1864        assert_eq!(result, val);
1865    }
1866
1867    #[test]
1868    fn format_property_value_compaction_passthrough() {
1869        let val = "{'class': 'SizeTieredCompactionStrategy'}";
1870        let result = format_property_value("compaction", val);
1871        assert_eq!(result, val);
1872    }
1873
1874    #[test]
1875    fn format_property_value_compression_passthrough() {
1876        let val = "{'sstable_compression': 'LZ4Compressor'}";
1877        let result = format_property_value("compression", val);
1878        assert_eq!(result, val);
1879    }
1880
1881    #[test]
1882    fn format_property_value_other_passthrough() {
1883        let result = format_property_value("gc_grace_seconds", "864000");
1884        assert_eq!(result, "864000");
1885    }
1886}