Skip to main content

cqlsh_rs/
copy.rs

1//! COPY TO and COPY FROM implementation — exports/imports CSV data.
2//!
3//! Supports:
4//!   `COPY [ks.]table [(col1, col2, ...)] TO 'filename'|STDOUT [WITH options...]`
5//!   `COPY [ks.]table [(col1, col2, ...)] FROM 'filename'|STDIN [WITH options...]`
6
7use std::io::Write;
8use std::net::IpAddr;
9use std::path::PathBuf;
10use std::time::{Duration, Instant};
11
12use anyhow::{bail, Context, Result};
13use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
14use futures::StreamExt;
15
16use crate::driver::types::CqlValue;
17use crate::driver::PreparedId;
18use crate::session::CqlSession;
19
/// Endpoint of a COPY operation: the place CSV data is written to or read from.
#[derive(Debug, Clone, PartialEq)]
pub enum CopyTarget {
    /// A file at the given path (destination of `COPY TO`, source of `COPY FROM`).
    File(PathBuf),
    /// Standard output; only meaningful as a `COPY TO` destination.
    Stdout,
    /// Standard input; only meaningful as a `COPY FROM` source.
    Stdin,
}
30
/// Settings that control how `COPY TO` renders rows as CSV.
///
/// Every field corresponds to one keyword of the `WITH` clause; see
/// `parse_options` for the keyword-to-field mapping.
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// Field separator handed to the CSV writer (`DELIMITER`).
    pub delimiter: char,
    /// Quote character handed to the CSV writer (`QUOTE`).
    pub quote: char,
    /// Escape character handed to the CSV writer (`ESCAPE`).
    pub escape: char,
    /// When `true`, the column names are written as the first record (`HEADER`).
    pub header: bool,
    /// Text emitted for null and unset values (`NULL` / `NULLVAL`).
    pub null_val: String,
    /// Format string for timestamps (`DATETIMEFORMAT`); `None` selects the built-in format.
    pub datetime_format: Option<String>,
    /// Value of the `ENCODING` option, stored as given.
    pub encoding: String,
    /// Digits after the decimal point for `float` values (`FLOATPRECISION`).
    pub float_precision: usize,
    /// Digits after the decimal point for `double` values (`DOUBLEPRECISION`).
    pub double_precision: usize,
    /// Character that replaces `.` in floats and decimals (`DECIMALSEP`).
    pub decimal_sep: char,
    /// Value of the `THOUSANDSSEP` option, if one was given.
    pub thousands_sep: Option<char>,
    /// Text emitted for `(true, false)` boolean values (`BOOLSTYLE`).
    pub bool_style: (String, String),
    /// Value of the `PAGESIZE` option.
    pub page_size: usize,
    /// Upper bound on the number of exported rows (`MAXOUTPUTSIZE`); `None` means no bound.
    pub max_output_size: Option<usize>,
    /// Emit a progress line every this many rows (`REPORTFREQUENCY`); `None` disables it.
    pub report_frequency: Option<usize>,
}
50
51impl Default for CopyOptions {
52    fn default() -> Self {
53        Self {
54            delimiter: ',',
55            quote: '"',
56            escape: '\\',
57            header: false,
58            null_val: String::new(),
59            datetime_format: None,
60            encoding: "utf-8".to_string(),
61            float_precision: 5,
62            double_precision: 12,
63            decimal_sep: '.',
64            thousands_sep: None,
65            bool_style: ("True".to_string(), "False".to_string()),
66            page_size: 1000,
67            max_output_size: None,
68            report_frequency: None,
69        }
70    }
71}
72
73/// A parsed COPY TO command.
74#[derive(Debug, Clone)]
75pub struct CopyToCommand {
76    pub keyspace: Option<String>,
77    pub table: String,
78    pub columns: Option<Vec<String>>,
79    pub filename: CopyTarget,
80    pub options: CopyOptions,
81}
82
83/// Parse a `COPY ... TO ...` statement.
84///
85/// Format: `COPY [ks.]table [(col1, col2)] TO 'filename'|STDOUT [WITH opt=val AND ...]`
86pub fn parse_copy_to(input: &str) -> Result<CopyToCommand> {
87    let trimmed = input.trim().trim_end_matches(';').trim();
88
89    // Must start with COPY (case-insensitive)
90    let upper = trimmed.to_uppercase();
91    if !upper.starts_with("COPY ") {
92        bail!("not a COPY statement");
93    }
94
95    // Find the TO keyword (case-insensitive), but not inside parentheses
96    let to_pos =
97        find_keyword_outside_parens(trimmed, "TO").context("COPY statement missing TO keyword")?;
98
99    let before_to = trimmed[4..to_pos].trim(); // skip "COPY"
100    let after_to = trimmed[to_pos + 2..].trim(); // skip "TO"
101
102    // Parse table spec (before TO): [ks.]table [(col1, col2)]
103    let (keyspace, table, columns) = parse_table_spec(before_to)?;
104
105    // Parse target and options (after TO): 'filename'|STDOUT [WITH ...]
106    let (filename, options_str) = parse_target_and_options(after_to)?;
107
108    let options = if let Some(opts) = options_str {
109        parse_options(&opts)?
110    } else {
111        CopyOptions::default()
112    };
113
114    Ok(CopyToCommand {
115        keyspace,
116        table,
117        columns,
118        filename,
119        options,
120    })
121}
122
123/// Execute a COPY TO command, writing results as CSV.
124pub async fn execute_copy_to(
125    session: &CqlSession,
126    cmd: &CopyToCommand,
127    current_keyspace: Option<&str>,
128) -> Result<()> {
129    // Build SELECT query
130    let col_spec = match &cmd.columns {
131        Some(cols) => cols.join(", "),
132        None => "*".to_string(),
133    };
134
135    let table_spec = match (&cmd.keyspace, current_keyspace) {
136        (Some(ks), _) => format!("{}.{}", ks, cmd.table),
137        (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
138        (None, None) => cmd.table.clone(),
139    };
140
141    let query = format!("SELECT {} FROM {}", col_spec, table_spec);
142
143    let result = session.execute_query(&query).await?;
144
145    // Set up CSV writer
146    let mut row_count: usize = 0;
147
148    match &cmd.filename {
149        CopyTarget::File(path) => {
150            let file = std::fs::File::create(path)
151                .with_context(|| format!("failed to create file: {}", path.display()))?;
152            let buf = std::io::BufWriter::new(file);
153            let mut wtr = build_csv_writer(&cmd.options, buf);
154
155            if cmd.options.header {
156                let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
157                wtr.write_record(&headers)?;
158            }
159
160            for row in &result.rows {
161                if let Some(max) = cmd.options.max_output_size {
162                    if row_count >= max {
163                        break;
164                    }
165                }
166                let fields: Vec<String> = row
167                    .values
168                    .iter()
169                    .map(|v| format_value_for_csv(v, &cmd.options))
170                    .collect();
171                wtr.write_record(&fields)?;
172                row_count += 1;
173
174                if let Some(freq) = cmd.options.report_frequency {
175                    if freq > 0 && row_count.is_multiple_of(freq) {
176                        eprintln!("Processed {} rows...", row_count);
177                    }
178                }
179            }
180
181            wtr.flush()?;
182            println!("{} rows exported to '{}'.", row_count, path.display());
183        }
184        CopyTarget::Stdout => {
185            let stdout = std::io::stdout();
186            let handle = stdout.lock();
187            let mut wtr = build_csv_writer(&cmd.options, handle);
188
189            if cmd.options.header {
190                let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
191                wtr.write_record(&headers)?;
192            }
193
194            for row in &result.rows {
195                if let Some(max) = cmd.options.max_output_size {
196                    if row_count >= max {
197                        break;
198                    }
199                }
200                let fields: Vec<String> = row
201                    .values
202                    .iter()
203                    .map(|v| format_value_for_csv(v, &cmd.options))
204                    .collect();
205                wtr.write_record(&fields)?;
206                row_count += 1;
207
208                if let Some(freq) = cmd.options.report_frequency {
209                    if freq > 0 && row_count.is_multiple_of(freq) {
210                        eprintln!("Processed {} rows...", row_count);
211                    }
212                }
213            }
214
215            wtr.flush()?;
216            eprintln!("{} rows exported to STDOUT.", row_count);
217        }
218        CopyTarget::Stdin => {
219            bail!("COPY TO cannot write to STDIN");
220        }
221    }
222
223    Ok(())
224}
225
226/// Format a single CQL value for CSV output according to the given options.
227pub fn format_value_for_csv(value: &CqlValue, options: &CopyOptions) -> String {
228    match value {
229        CqlValue::Null | CqlValue::Unset => options.null_val.clone(),
230        CqlValue::Text(s) | CqlValue::Ascii(s) => s.clone(),
231        CqlValue::Boolean(b) => {
232            if *b {
233                options.bool_style.0.clone()
234            } else {
235                options.bool_style.1.clone()
236            }
237        }
238        CqlValue::Int(v) => v.to_string(),
239        CqlValue::BigInt(v) => v.to_string(),
240        CqlValue::SmallInt(v) => v.to_string(),
241        CqlValue::TinyInt(v) => v.to_string(),
242        CqlValue::Counter(v) => v.to_string(),
243        CqlValue::Varint(v) => v.to_string(),
244        CqlValue::Float(v) => format_float(*v as f64, options.float_precision, options),
245        CqlValue::Double(v) => format_float(*v, options.double_precision, options),
246        CqlValue::Decimal(v) => {
247            let s = v.to_string();
248            if options.decimal_sep != '.' {
249                s.replace('.', &options.decimal_sep.to_string())
250            } else {
251                s
252            }
253        }
254        CqlValue::Timestamp(millis) => format_timestamp(*millis, options),
255        CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
256        CqlValue::Blob(bytes) => {
257            let mut s = String::with_capacity(2 + bytes.len() * 2);
258            s.push_str("0x");
259            for b in bytes {
260                s.push_str(&format!("{b:02x}"));
261            }
262            s
263        }
264        CqlValue::Date(d) => d.to_string(),
265        CqlValue::Time(t) => t.to_string(),
266        CqlValue::Duration {
267            months,
268            days,
269            nanoseconds,
270        } => format!("{months}mo{days}d{nanoseconds}ns"),
271        CqlValue::Inet(addr) => addr.to_string(),
272        // Collection types: use CQL literal format via Display
273        CqlValue::List(_)
274        | CqlValue::Set(_)
275        | CqlValue::Map(_)
276        | CqlValue::Tuple(_)
277        | CqlValue::UserDefinedType { .. } => value.to_string(),
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Internal helpers
283// ---------------------------------------------------------------------------
284
285/// Build a csv::Writer with the configured options.
286fn build_csv_writer<W: Write>(options: &CopyOptions, writer: W) -> csv::Writer<W> {
287    csv::WriterBuilder::new()
288        .delimiter(options.delimiter as u8)
289        .quote(options.quote as u8)
290        .escape(options.escape as u8)
291        .double_quote(false)
292        .from_writer(writer)
293}
294
295/// Format a floating-point number with given precision and decimal separator.
296fn format_float(v: f64, precision: usize, options: &CopyOptions) -> String {
297    if v.is_nan() {
298        return "NaN".to_string();
299    }
300    if v.is_infinite() {
301        return if v.is_sign_positive() {
302            "Infinity".to_string()
303        } else {
304            "-Infinity".to_string()
305        };
306    }
307    let s = format!("{v:.prec$}", prec = precision);
308    if options.decimal_sep != '.' {
309        s.replace('.', &options.decimal_sep.to_string())
310    } else {
311        s
312    }
313}
314
315/// Format a timestamp (millis since epoch) using the configured format.
316fn format_timestamp(millis: i64, options: &CopyOptions) -> String {
317    match DateTime::from_timestamp_millis(millis) {
318        Some(dt) => {
319            let utc: DateTime<Utc> = dt;
320            match &options.datetime_format {
321                Some(fmt) => utc.format(fmt).to_string(),
322                None => utc.format("%Y-%m-%d %H:%M:%S%.6f%z").to_string(),
323            }
324        }
325        None => format!("<invalid timestamp: {millis}>"),
326    }
327}
328
/// Find the first occurrence of `keyword` in `s` that is outside parentheses
/// and outside single- or double-quoted text.
///
/// The match is ASCII case-insensitive and must sit on word boundaries: the
/// bytes immediately before and after it may not be ASCII alphanumerics or `_`.
/// Returns the byte offset in `s` where the keyword starts, or `None`.
///
/// The comparison is done on the bytes of `s` itself. Slicing an uppercased
/// copy with offsets taken from `s` panics as soon as `s` contains a
/// multi-byte character, because the offsets no longer land on character
/// boundaries.
fn find_keyword_outside_parens(s: &str, keyword: &str) -> Option<usize> {
    let bytes = s.as_bytes();
    let kw = keyword.as_bytes();
    let is_word_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';

    let mut depth: i32 = 0;
    let mut open_quote: Option<char> = None;

    for (i, ch) in s.char_indices() {
        // Inside a quoted section only the matching closing quote matters.
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            continue;
        }

        match ch {
            '\'' | '"' => {
                open_quote = Some(ch);
                continue;
            }
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }
        if depth != 0 {
            continue;
        }

        // Byte-wise comparison: never slices `s` in the middle of a character.
        let end = i + kw.len();
        let matches_here = bytes
            .get(i..end)
            .is_some_and(|window| window.eq_ignore_ascii_case(kw));
        if !matches_here {
            continue;
        }

        let before_ok = i == 0 || !is_word_byte(bytes[i - 1]);
        let after_ok = bytes.get(end).map_or(true, |&b| !is_word_byte(b));
        if before_ok && after_ok {
            return Some(i);
        }
    }
    None
}
372
373/// Parse the table spec: `[ks.]table [(col1, col2, ...)]`
374fn parse_table_spec(spec: &str) -> Result<(Option<String>, String, Option<Vec<String>>)> {
375    let spec = spec.trim();
376
377    // Split off column list if present
378    let (table_part, columns) = if let Some(paren_start) = spec.find('(') {
379        let paren_end = spec
380            .rfind(')')
381            .context("unmatched parenthesis in column list")?;
382        let cols_str = &spec[paren_start + 1..paren_end];
383        let cols: Vec<String> = cols_str
384            .split(',')
385            .map(|c| c.trim().to_string())
386            .filter(|c| !c.is_empty())
387            .collect();
388        (spec[..paren_start].trim(), Some(cols))
389    } else {
390        (spec, None)
391    };
392
393    // Split keyspace.table
394    let (keyspace, table) = if let Some(dot_pos) = table_part.find('.') {
395        let ks = table_part[..dot_pos].trim().to_string();
396        let tbl = table_part[dot_pos + 1..].trim().to_string();
397        (Some(ks), tbl)
398    } else {
399        (None, table_part.trim().to_string())
400    };
401
402    if table.is_empty() {
403        bail!("missing table name in COPY statement");
404    }
405
406    Ok((keyspace, table, columns))
407}
408
409/// Parse the target and WITH options after the TO keyword.
410/// Returns `(CopyTarget, Option<options_string>)`.
411fn parse_target_and_options(after_to: &str) -> Result<(CopyTarget, Option<String>)> {
412    let after_to = after_to.trim();
413
414    // Find WITH keyword (case-insensitive) outside of quotes
415    let with_pos = find_keyword_outside_parens(after_to, "WITH");
416
417    let (target_str, options_str) = match with_pos {
418        Some(pos) => {
419            let target = after_to[..pos].trim();
420            let opts = after_to[pos + 4..].trim(); // skip "WITH"
421            (target, Some(opts.to_string()))
422        }
423        None => (after_to, None),
424    };
425
426    let target_str = target_str.trim();
427
428    let target = if target_str.eq_ignore_ascii_case("STDOUT") {
429        CopyTarget::Stdout
430    } else {
431        // Strip surrounding quotes (single quotes)
432        let path_str = if (target_str.starts_with('\'') && target_str.ends_with('\''))
433            || (target_str.starts_with('"') && target_str.ends_with('"'))
434        {
435            &target_str[1..target_str.len() - 1]
436        } else {
437            target_str
438        };
439        CopyTarget::File(PathBuf::from(path_str))
440    };
441
442    Ok((target, options_str))
443}
444
445/// Parse `opt1=val1 AND opt2=val2 ...` pairs into `CopyOptions`.
446fn parse_options(options_str: &str) -> Result<CopyOptions> {
447    let mut opts = CopyOptions::default();
448
449    // Split on AND (case-insensitive)
450    let parts = split_on_and(options_str);
451
452    for part in parts {
453        let part = part.trim();
454        if part.is_empty() {
455            continue;
456        }
457
458        let eq_pos = part
459            .find('=')
460            .with_context(|| format!("invalid option (missing '='): {part}"))?;
461        let key = part[..eq_pos].trim().to_uppercase();
462        let val = unquote(part[eq_pos + 1..].trim());
463
464        match key.as_str() {
465            "DELIMITER" => {
466                opts.delimiter = val
467                    .chars()
468                    .next()
469                    .context("DELIMITER must be a single character")?;
470            }
471            "QUOTE" => {
472                opts.quote = val
473                    .chars()
474                    .next()
475                    .context("QUOTE must be a single character")?;
476            }
477            "ESCAPE" => {
478                opts.escape = val
479                    .chars()
480                    .next()
481                    .context("ESCAPE must be a single character")?;
482            }
483            "HEADER" => {
484                opts.header = parse_bool_option(&val)?;
485            }
486            "NULL" | "NULLVAL" => {
487                opts.null_val = val;
488            }
489            "DATETIMEFORMAT" => {
490                opts.datetime_format = if val.is_empty() { None } else { Some(val) };
491            }
492            "ENCODING" => {
493                opts.encoding = val;
494            }
495            "FLOATPRECISION" => {
496                opts.float_precision = val.parse().context("FLOATPRECISION must be an integer")?;
497            }
498            "DOUBLEPRECISION" => {
499                opts.double_precision =
500                    val.parse().context("DOUBLEPRECISION must be an integer")?;
501            }
502            "DECIMALSEP" => {
503                opts.decimal_sep = val
504                    .chars()
505                    .next()
506                    .context("DECIMALSEP must be a single character")?;
507            }
508            "THOUSANDSSEP" => {
509                opts.thousands_sep = val.chars().next();
510            }
511            "BOOLSTYLE" => {
512                // Format: "True:False"
513                let parts: Vec<&str> = val.splitn(2, ':').collect();
514                if parts.len() == 2 {
515                    opts.bool_style = (parts[0].to_string(), parts[1].to_string());
516                } else {
517                    bail!("BOOLSTYLE must be in format 'TrueVal:FalseVal'");
518                }
519            }
520            "PAGESIZE" => {
521                opts.page_size = val.parse().context("PAGESIZE must be an integer")?;
522            }
523            "MAXOUTPUTSIZE" => {
524                let n: usize = val.parse().context("MAXOUTPUTSIZE must be an integer")?;
525                opts.max_output_size = Some(n);
526            }
527            "REPORTFREQUENCY" => {
528                let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
529                opts.report_frequency = if n == 0 { None } else { Some(n) };
530            }
531            other => {
532                bail!("unknown COPY option: {other}");
533            }
534        }
535    }
536
537    Ok(opts)
538}
539
/// Split a string on `AND` keywords (ASCII case-insensitive) that are not
/// inside single- or double-quoted text.
///
/// A separator is `AND` with one whitespace character on each side; those five
/// characters are removed and everything else is kept verbatim (callers trim
/// the parts). Any whitespace counts, so options spread over several lines or
/// separated by tabs split correctly. A trailing empty part is not returned.
///
/// The scan works on the characters of `s` directly. Indexing into an
/// uppercased copy goes wrong when uppercasing changes the character count
/// (e.g. `ß` becomes `SS`), and re-collecting the remainder at every
/// whitespace position makes the scan quadratic.
fn split_on_and(s: &str) -> Vec<String> {
    let chars: Vec<char> = s.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut open_quote: Option<char> = None;
    let mut i = 0;

    while i < chars.len() {
        let ch = chars[i];

        // Quoted text is copied through untouched until the closing quote.
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            current.push(ch);
            i += 1;
            continue;
        }
        if ch == '\'' || ch == '"' {
            open_quote = Some(ch);
            current.push(ch);
            i += 1;
            continue;
        }

        // Look at a fixed five-character window: <ws> A N D <ws>.
        let is_separator = matches!(
            chars.get(i..i + 5),
            Some([lead, a, n, d, trail])
                if lead.is_whitespace()
                    && trail.is_whitespace()
                    && a.eq_ignore_ascii_case(&'A')
                    && n.eq_ignore_ascii_case(&'N')
                    && d.eq_ignore_ascii_case(&'D')
        );
        if is_separator {
            parts.push(std::mem::take(&mut current));
            i += 5;
            continue;
        }

        current.push(ch);
        i += 1;
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
601
/// Trim `s` and strip one pair of matching surrounding quotes (`'...'` or
/// `"..."`), if present.
///
/// The opening and closing quote must be the same character and must be two
/// distinct characters of the input; otherwise the trimmed text is returned
/// unchanged.
fn unquote(s: &str) -> String {
    let trimmed = s.trim();
    // `strip_suffix` runs on what is left after the prefix, so a lone quote
    // character can never serve as both the opening and the closing quote.
    let strip_pair = |q: char| trimmed.strip_prefix(q).and_then(|rest| rest.strip_suffix(q));

    strip_pair('\'')
        .or_else(|| strip_pair('"'))
        .unwrap_or(trimmed)
        .to_string()
}
612
613/// Parse a boolean option value (true/false, yes/no, 1/0).
614fn parse_bool_option(val: &str) -> Result<bool> {
615    match val.to_lowercase().as_str() {
616        "true" | "yes" | "1" => Ok(true),
617        "false" | "no" | "0" => Ok(false),
618        _ => bail!("invalid boolean value: {val}"),
619    }
620}
621
622// ===========================================================================
623// COPY FROM implementation — imports CSV data into a table.
624// ===========================================================================
625
/// Settings for `COPY FROM` (CSV import).
///
/// Each field is filled from the `WITH` option named in its comment; see
/// `parse_copy_from_options` for the exact conversions.
#[derive(Debug, Clone)]
pub struct CopyFromOptions {
    // --- CSV format options shared with COPY TO ---
    /// `DELIMITER`: field separator character.
    pub delimiter: char,
    /// `QUOTE`: quote character.
    pub quote: char,
    /// `ESCAPE`: escape character.
    pub escape: char,
    /// `HEADER`: boolean flag.
    pub header: bool,
    /// `NULL` / `NULLVAL`: text that stands for a null value.
    pub null_val: String,
    /// `DATETIMEFORMAT`: timestamp format string; `None` when empty or not given.
    pub datetime_format: Option<String>,
    /// `ENCODING`: stored as given.
    pub encoding: String,

    // --- COPY FROM specific options ---
    /// `CHUNKSIZE`.
    pub chunk_size: usize,
    /// `MAXBATCHSIZE`.
    pub max_batch_size: usize,
    /// `MINBATCHSIZE`.
    pub min_batch_size: usize,
    /// `PREPAREDSTATEMENTS`: boolean flag.
    pub prepared_statements: bool,
    /// `TTL`; `None` when not given.
    pub ttl: Option<u64>,
    /// `MAXATTEMPTS`.
    pub max_attempts: usize,
    /// `MAXPARSEERRORS`; `-1` is stored as `None`.
    pub max_parse_errors: Option<usize>,
    /// `MAXINSERTERRORS`; `-1` is stored as `None`.
    pub max_insert_errors: Option<usize>,
    /// `ERRFILE` / `ERRORSFILE`; `None` when empty or not given.
    pub err_file: Option<PathBuf>,
    /// `REPORTFREQUENCY`; `0` is stored as `None`.
    pub report_frequency: Option<usize>,
    /// `INGESTRATE`; `-1` and `0` are stored as `None`.
    pub ingest_rate: Option<usize>,
    /// `NUMPROCESSES`; the parser clamps it to at least 1.
    pub num_processes: usize,
}
651
652impl Default for CopyFromOptions {
653    fn default() -> Self {
654        Self {
655            delimiter: ',',
656            quote: '"',
657            escape: '\\',
658            header: false,
659            null_val: String::new(),
660            datetime_format: None,
661            encoding: "utf-8".to_string(),
662            chunk_size: 5000,
663            max_batch_size: 20,
664            min_batch_size: 2,
665            prepared_statements: true,
666            ttl: None,
667            max_attempts: 5,
668            max_parse_errors: None,
669            max_insert_errors: None,
670            err_file: None,
671            report_frequency: None,
672            ingest_rate: None,
673            num_processes: 1,
674        }
675    }
676}
677
678/// A parsed COPY FROM command.
679#[derive(Debug, Clone)]
680pub struct CopyFromCommand {
681    pub keyspace: Option<String>,
682    pub table: String,
683    pub columns: Option<Vec<String>>,
684    pub source: CopyTarget,
685    pub options: CopyFromOptions,
686}
687
688/// Parse a `COPY ... FROM ...` statement.
689///
690/// Format: `COPY [ks.]table [(col1, col2)] FROM 'filename'|STDIN [WITH opt=val AND ...]`
691pub fn parse_copy_from(input: &str) -> Result<CopyFromCommand> {
692    let trimmed = input.trim().trim_end_matches(';').trim();
693
694    let upper = trimmed.to_uppercase();
695    if !upper.starts_with("COPY ") {
696        bail!("not a COPY statement");
697    }
698
699    // Find the FROM keyword (case-insensitive), but not inside parentheses
700    let from_pos = find_keyword_outside_parens(trimmed, "FROM")
701        .context("COPY statement missing FROM keyword")?;
702
703    let before_from = trimmed[4..from_pos].trim(); // skip "COPY"
704    let after_from = trimmed[from_pos + 4..].trim(); // skip "FROM"
705
706    // Parse table spec (before FROM): [ks.]table [(col1, col2)]
707    let (keyspace, table, columns) = parse_table_spec(before_from)?;
708
709    // Parse source and options (after FROM): 'filename'|STDIN [WITH ...]
710    let (source, options_str) = parse_source_and_options(after_from)?;
711
712    let options = if let Some(opts) = options_str {
713        parse_copy_from_options(&opts)?
714    } else {
715        CopyFromOptions::default()
716    };
717
718    Ok(CopyFromCommand {
719        keyspace,
720        table,
721        columns,
722        source,
723        options,
724    })
725}
726
727/// Parse the source and WITH options after the FROM keyword.
728/// Returns `(CopyTarget, Option<options_string>)`.
729fn parse_source_and_options(after_from: &str) -> Result<(CopyTarget, Option<String>)> {
730    let after_from = after_from.trim();
731
732    let with_pos = find_keyword_outside_parens(after_from, "WITH");
733
734    let (source_str, options_str) = match with_pos {
735        Some(pos) => {
736            let source = after_from[..pos].trim();
737            let opts = after_from[pos + 4..].trim();
738            (source, Some(opts.to_string()))
739        }
740        None => (after_from, None),
741    };
742
743    let source_str = source_str.trim();
744
745    let source = if source_str.eq_ignore_ascii_case("STDIN") {
746        CopyTarget::Stdin
747    } else {
748        let path_str = if (source_str.starts_with('\'') && source_str.ends_with('\''))
749            || (source_str.starts_with('"') && source_str.ends_with('"'))
750        {
751            &source_str[1..source_str.len() - 1]
752        } else {
753            source_str
754        };
755        CopyTarget::File(PathBuf::from(path_str))
756    };
757
758    Ok((source, options_str))
759}
760
761/// Parse `opt1=val1 AND opt2=val2 ...` pairs into `CopyFromOptions`.
762fn parse_copy_from_options(options_str: &str) -> Result<CopyFromOptions> {
763    let mut opts = CopyFromOptions::default();
764
765    let parts = split_on_and(options_str);
766
767    for part in parts {
768        let part = part.trim();
769        if part.is_empty() {
770            continue;
771        }
772
773        let eq_pos = part
774            .find('=')
775            .with_context(|| format!("invalid option (missing '='): {part}"))?;
776        let key = part[..eq_pos].trim().to_uppercase();
777        let val = unquote(part[eq_pos + 1..].trim());
778
779        match key.as_str() {
780            "DELIMITER" => {
781                opts.delimiter = val
782                    .chars()
783                    .next()
784                    .context("DELIMITER must be a single character")?;
785            }
786            "QUOTE" => {
787                opts.quote = val
788                    .chars()
789                    .next()
790                    .context("QUOTE must be a single character")?;
791            }
792            "ESCAPE" => {
793                opts.escape = val
794                    .chars()
795                    .next()
796                    .context("ESCAPE must be a single character")?;
797            }
798            "HEADER" => {
799                opts.header = parse_bool_option(&val)?;
800            }
801            "NULL" | "NULLVAL" => {
802                opts.null_val = val;
803            }
804            "DATETIMEFORMAT" => {
805                opts.datetime_format = if val.is_empty() { None } else { Some(val) };
806            }
807            "ENCODING" => {
808                opts.encoding = val;
809            }
810            "CHUNKSIZE" => {
811                opts.chunk_size = val.parse().context("CHUNKSIZE must be an integer")?;
812            }
813            "MAXBATCHSIZE" => {
814                opts.max_batch_size = val.parse().context("MAXBATCHSIZE must be an integer")?;
815            }
816            "MINBATCHSIZE" => {
817                opts.min_batch_size = val.parse().context("MINBATCHSIZE must be an integer")?;
818            }
819            "PREPAREDSTATEMENTS" => {
820                opts.prepared_statements = parse_bool_option(&val)?;
821            }
822            "TTL" => {
823                let n: u64 = val.parse().context("TTL must be a positive integer")?;
824                opts.ttl = Some(n);
825            }
826            "MAXATTEMPTS" => {
827                opts.max_attempts = val.parse().context("MAXATTEMPTS must be an integer")?;
828            }
829            "MAXPARSEERRORS" => {
830                if val == "-1" {
831                    opts.max_parse_errors = None;
832                } else {
833                    let n: usize = val.parse().context("MAXPARSEERRORS must be an integer")?;
834                    opts.max_parse_errors = Some(n);
835                }
836            }
837            "MAXINSERTERRORS" => {
838                if val == "-1" {
839                    opts.max_insert_errors = None;
840                } else {
841                    let n: usize = val.parse().context("MAXINSERTERRORS must be an integer")?;
842                    opts.max_insert_errors = Some(n);
843                }
844            }
845            "ERRFILE" | "ERRORSFILE" => {
846                opts.err_file = if val.is_empty() {
847                    None
848                } else {
849                    Some(PathBuf::from(val))
850                };
851            }
852            "REPORTFREQUENCY" => {
853                let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
854                opts.report_frequency = if n == 0 { None } else { Some(n) };
855            }
856            "INGESTRATE" => {
857                if val == "-1" || val == "0" {
858                    opts.ingest_rate = None;
859                } else {
860                    let n: usize = val.parse().context("INGESTRATE must be an integer")?;
861                    opts.ingest_rate = Some(n);
862                }
863            }
864            "NUMPROCESSES" => {
865                let n: usize = val.parse().context("NUMPROCESSES must be an integer")?;
866                opts.num_processes = n.max(1);
867            }
868            other => {
869                bail!("unknown COPY FROM option: {other}");
870            }
871        }
872    }
873
874    Ok(opts)
875}
876
877// ---------------------------------------------------------------------------
878// Type-aware CSV ↔ CQL conversion
879// ---------------------------------------------------------------------------
880
881/// Convert a CSV string field to a typed `CqlValue` based on the CQL column type.
882///
883/// `type_name` is the raw CQL type string from `system_schema.columns.type`
884/// (e.g. `"int"`, `"text"`, `"list<int>"`, `"frozen<set<uuid>>"`).
885/// Complex nested types (list, set, map, tuple, frozen, udt) are preserved as
886/// `CqlValue::Text` literals — the database will parse them via the unprepared path.
887pub fn csv_str_to_cql_value(field: &str, type_name: &str, null_val: &str) -> Result<CqlValue> {
888    // Null check (exact match with configured null_val, or empty string when null_val is empty)
889    if field == null_val || (null_val.is_empty() && field.is_empty()) {
890        return Ok(CqlValue::Null);
891    }
892
893    let base_type = strip_frozen(type_name).to_lowercase();
894    let base_type = base_type.as_str();
895
896    match base_type {
897        "ascii" => Ok(CqlValue::Ascii(field.to_string())),
898        "text" | "varchar" => Ok(CqlValue::Text(field.to_string())),
899        "boolean" => {
900            let b = match field.to_lowercase().as_str() {
901                "true" | "yes" | "on" | "1" => true,
902                "false" | "no" | "off" | "0" => false,
903                _ => bail!("invalid boolean value: {field:?}"),
904            };
905            Ok(CqlValue::Boolean(b))
906        }
907        "int" => Ok(CqlValue::Int(
908            field
909                .parse::<i32>()
910                .with_context(|| format!("invalid int: {field:?}"))?,
911        )),
912        "bigint" | "counter" => Ok(CqlValue::BigInt(
913            field
914                .parse::<i64>()
915                .with_context(|| format!("invalid bigint: {field:?}"))?,
916        )),
917        "smallint" => Ok(CqlValue::SmallInt(
918            field
919                .parse::<i16>()
920                .with_context(|| format!("invalid smallint: {field:?}"))?,
921        )),
922        "tinyint" => Ok(CqlValue::TinyInt(
923            field
924                .parse::<i8>()
925                .with_context(|| format!("invalid tinyint: {field:?}"))?,
926        )),
927        "float" => Ok(CqlValue::Float(
928            field
929                .parse::<f32>()
930                .with_context(|| format!("invalid float: {field:?}"))?,
931        )),
932        "double" => Ok(CqlValue::Double(
933            field
934                .parse::<f64>()
935                .with_context(|| format!("invalid double: {field:?}"))?,
936        )),
937        "uuid" => {
938            let u =
939                uuid::Uuid::parse_str(field).with_context(|| format!("invalid uuid: {field:?}"))?;
940            Ok(CqlValue::Uuid(u))
941        }
942        "timeuuid" => {
943            let u = uuid::Uuid::parse_str(field)
944                .with_context(|| format!("invalid timeuuid: {field:?}"))?;
945            Ok(CqlValue::TimeUuid(u))
946        }
947        "timestamp" => {
948            // Try RFC 3339 first (handles 'Z' and offset formats)
949            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(field) {
950                return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
951            }
952            // Try space-separated ISO-8601 with numeric offset
953            let formats = [
954                "%Y-%m-%d %H:%M:%S%.f%z",
955                "%Y-%m-%dT%H:%M:%S%.f%z",
956                "%Y-%m-%dT%H:%M:%S%z",
957                "%Y-%m-%d %H:%M:%S%z",
958                "%Y-%m-%d %H:%M:%S%.3f+0000",
959            ];
960            for fmt in &formats {
961                if let Ok(dt) = DateTime::parse_from_str(field, fmt) {
962                    return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
963                }
964            }
965            // Try plain date (midnight UTC)
966            if let Ok(d) = NaiveDate::parse_from_str(field, "%Y-%m-%d") {
967                let dt = d.and_hms_opt(0, 0, 0).unwrap();
968                return Ok(CqlValue::Timestamp(dt.and_utc().timestamp_millis()));
969            }
970            // Try milliseconds since epoch as a number
971            if let Ok(ms) = field.parse::<i64>() {
972                return Ok(CqlValue::Timestamp(ms));
973            }
974            bail!("invalid timestamp: {field:?}")
975        }
976        "date" => {
977            let d = NaiveDate::parse_from_str(field, "%Y-%m-%d")
978                .with_context(|| format!("invalid date (expected YYYY-MM-DD): {field:?}"))?;
979            Ok(CqlValue::Date(d))
980        }
981        "time" => {
982            // Accept HH:MM:SS, HH:MM:SS.nnn, HH:MM:SS.nnnnnnnnn
983            let formats = ["%H:%M:%S%.f", "%H:%M:%S"];
984            for fmt in &formats {
985                if let Ok(t) = NaiveTime::parse_from_str(field, fmt) {
986                    return Ok(CqlValue::Time(t));
987                }
988            }
989            bail!("invalid time (expected HH:MM:SS[.nnn]): {field:?}")
990        }
991        "inet" => {
992            let addr = field
993                .parse::<IpAddr>()
994                .with_context(|| format!("invalid inet: {field:?}"))?;
995            Ok(CqlValue::Inet(addr))
996        }
997        "blob" => {
998            // Accept "0x..." hex or plain hex
999            let hex = field.strip_prefix("0x").unwrap_or(field);
1000            if !hex.len().is_multiple_of(2) {
1001                bail!("invalid blob (odd number of hex digits): {field:?}");
1002            }
1003            let bytes = (0..hex.len())
1004                .step_by(2)
1005                .map(|i| {
1006                    u8::from_str_radix(&hex[i..i + 2], 16)
1007                        .with_context(|| format!("invalid hex byte at offset {i}: {field:?}"))
1008                })
1009                .collect::<Result<Vec<u8>>>()?;
1010            Ok(CqlValue::Blob(bytes))
1011        }
1012        "varint" => {
1013            let n = field
1014                .parse::<num_bigint::BigInt>()
1015                .with_context(|| format!("invalid varint: {field:?}"))?;
1016            Ok(CqlValue::Varint(n))
1017        }
1018        "decimal" => {
1019            let d = field
1020                .parse::<bigdecimal::BigDecimal>()
1021                .with_context(|| format!("invalid decimal: {field:?}"))?;
1022            Ok(CqlValue::Decimal(d))
1023        }
1024        // duration, list<*>, set<*>, map<*>, tuple<*>, and unknown types:
1025        // pass through as Text; the database parses the CQL literal.
1026        _ => Ok(CqlValue::Text(field.to_string())),
1027    }
1028}
1029
/// Return the inner type of a `frozen<...>` CQL type name.
///
/// The `frozen<` prefix is recognised case-insensitively and must be paired
/// with a trailing `>`; any other input is returned untouched.
fn strip_frozen(type_name: &str) -> &str {
    const WRAPPER: &str = "frozen<";

    let wrapped = type_name
        .get(..WRAPPER.len())
        .is_some_and(|head| head.eq_ignore_ascii_case(WRAPPER));

    match type_name.strip_suffix('>') {
        Some(unclosed) if wrapped => &unclosed[WRAPPER.len()..],
        _ => type_name,
    }
}
1039
1040/// Convert a `CqlValue` to a CQL insert literal string.
1041///
1042/// Used in the unprepared (string-based) INSERT path. Produces values that can
1043/// be embedded directly into a CQL statement without further quoting by the
1044/// caller.
1045fn cql_value_to_insert_literal(v: &CqlValue) -> String {
1046    match v {
1047        CqlValue::Null | CqlValue::Unset => "null".to_string(),
1048        CqlValue::Text(s) | CqlValue::Ascii(s) => {
1049            format!("'{}'", s.replace('\'', "''"))
1050        }
1051        CqlValue::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
1052        CqlValue::Int(n) => n.to_string(),
1053        CqlValue::BigInt(n) | CqlValue::Counter(n) => n.to_string(),
1054        CqlValue::SmallInt(n) => n.to_string(),
1055        CqlValue::TinyInt(n) => n.to_string(),
1056        CqlValue::Float(f) => {
1057            if f.is_nan() {
1058                "NaN".to_string()
1059            } else if f.is_infinite() {
1060                if f.is_sign_positive() {
1061                    "Infinity".to_string()
1062                } else {
1063                    "-Infinity".to_string()
1064                }
1065            } else {
1066                f.to_string()
1067            }
1068        }
1069        CqlValue::Double(d) => {
1070            if d.is_nan() {
1071                "NaN".to_string()
1072            } else if d.is_infinite() {
1073                if d.is_sign_positive() {
1074                    "Infinity".to_string()
1075                } else {
1076                    "-Infinity".to_string()
1077                }
1078            } else {
1079                d.to_string()
1080            }
1081        }
1082        CqlValue::Varint(n) => n.to_string(),
1083        CqlValue::Decimal(d) => d.to_string(),
1084        CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
1085        CqlValue::Timestamp(ms) => {
1086            // Format as ISO-8601 string, quoted
1087            match DateTime::from_timestamp_millis(*ms) {
1088                Some(dt) => {
1089                    let utc: DateTime<Utc> = dt;
1090                    format!("'{}'", utc.format("%Y-%m-%d %H:%M:%S%.6f+0000"))
1091                }
1092                None => format!("{ms}"),
1093            }
1094        }
1095        CqlValue::Date(d) => format!("'{d}'"),
1096        CqlValue::Time(t) => format!("'{t}'"),
1097        CqlValue::Inet(addr) => format!("'{addr}'"),
1098        CqlValue::Blob(bytes) => {
1099            let mut s = String::with_capacity(2 + bytes.len() * 2);
1100            s.push_str("0x");
1101            for b in bytes {
1102                s.push_str(&format!("{b:02x}"));
1103            }
1104            s
1105        }
1106        CqlValue::Duration {
1107            months,
1108            days,
1109            nanoseconds,
1110        } => {
1111            format!("{months}mo{days}d{nanoseconds}ns")
1112        }
1113        // Collections and UDTs: use Display which outputs CQL literal format
1114        CqlValue::List(_)
1115        | CqlValue::Set(_)
1116        | CqlValue::Map(_)
1117        | CqlValue::Tuple(_)
1118        | CqlValue::UserDefinedType { .. } => v.to_string(),
1119    }
1120}
1121
1122// ---------------------------------------------------------------------------
1123// Token bucket rate limiter for INGESTRATE
1124// ---------------------------------------------------------------------------
1125
1126/// Simple token bucket for rate limiting row inserts.
1127struct TokenBucket {
1128    rate: f64,
1129    tokens: f64,
1130    last: Instant,
1131}
1132
1133impl TokenBucket {
1134    fn new(rows_per_second: usize) -> Self {
1135        Self {
1136            rate: rows_per_second as f64,
1137            tokens: rows_per_second as f64,
1138            last: Instant::now(),
1139        }
1140    }
1141
1142    async fn acquire(&mut self) {
1143        let now = Instant::now();
1144        let elapsed = now.duration_since(self.last).as_secs_f64();
1145        self.tokens = (self.tokens + elapsed * self.rate).min(self.rate);
1146        self.last = now;
1147
1148        if self.tokens < 1.0 {
1149            let wait_secs = (1.0 - self.tokens) / self.rate;
1150            tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
1151            self.tokens = 0.0;
1152        } else {
1153            self.tokens -= 1.0;
1154        }
1155    }
1156}
1157
1158/// Execute a COPY FROM command, importing CSV data into a table.
1159pub async fn execute_copy_from(
1160    session: &CqlSession,
1161    cmd: &CopyFromCommand,
1162    current_keyspace: Option<&str>,
1163) -> Result<()> {
1164    let start = Instant::now();
1165
1166    // Resolve keyspace and table spec
1167    let table_spec = match (&cmd.keyspace, current_keyspace) {
1168        (Some(ks), _) => format!("{}.{}", ks, cmd.table),
1169        (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
1170        (None, None) => cmd.table.clone(),
1171    };
1172
1173    let source_name = match &cmd.source {
1174        CopyTarget::File(path) => format!("'{}'", path.display()),
1175        CopyTarget::Stdin => "STDIN".to_string(),
1176        CopyTarget::Stdout => unreachable!("COPY FROM cannot use STDOUT"),
1177    };
1178
1179    let ttl_clause = match cmd.options.ttl {
1180        Some(ttl) => format!(" USING TTL {ttl}"),
1181        None => String::new(),
1182    };
1183
1184    // Query schema for column metadata: (name, kind, position, type_name)
1185    let ks_for_schema = cmd
1186        .keyspace
1187        .as_deref()
1188        .or(current_keyspace)
1189        .context("no keyspace specified and no current keyspace set")?;
1190    let schema_query = format!(
1191        "SELECT column_name, kind, position, type FROM system_schema.columns \
1192         WHERE keyspace_name = '{}' AND table_name = '{}'",
1193        ks_for_schema, cmd.table
1194    );
1195    let schema_result = session.execute_query(&schema_query).await?;
1196
1197    // Collect and sort into CREATE TABLE order
1198    let mut schema_cols: Vec<(String, String, i32, String)> = Vec::new();
1199    for row in &schema_result.rows {
1200        let name = match row.values.first() {
1201            Some(CqlValue::Text(n)) => n.clone(),
1202            _ => continue,
1203        };
1204        let kind = match row.values.get(1) {
1205            Some(CqlValue::Text(k)) => k.clone(),
1206            _ => "regular".to_string(),
1207        };
1208        let position = match row.values.get(2) {
1209            Some(CqlValue::Int(p)) => *p,
1210            _ => -1,
1211        };
1212        let type_name = match row.values.get(3) {
1213            Some(CqlValue::Text(t)) => t.clone(),
1214            _ => "text".to_string(),
1215        };
1216        schema_cols.push((name, kind, position, type_name));
1217    }
1218    if schema_cols.is_empty() {
1219        bail!(
1220            "could not determine columns for table '{}' — table may not exist",
1221            table_spec
1222        );
1223    }
1224    schema_cols.sort_by(|a, b| {
1225        let kind_order = |k: &str| -> i32 {
1226            match k {
1227                "partition_key" => 0,
1228                "clustering" => 1,
1229                "static" => 2,
1230                _ => 3,
1231            }
1232        };
1233        kind_order(&a.1)
1234            .cmp(&kind_order(&b.1))
1235            .then(a.2.cmp(&b.2))
1236            .then(a.0.cmp(&b.0))
1237    });
1238
1239    // type lookup: column_name → type_name (for header-derived ordering)
1240    let type_map: std::collections::HashMap<String, String> = schema_cols
1241        .iter()
1242        .map(|(n, _, _, t)| (n.clone(), t.clone()))
1243        .collect();
1244
1245    // Preliminary column list: explicit columns or schema order
1246    let prelim_columns: Vec<(String, String)> = match &cmd.columns {
1247        Some(explicit) => explicit
1248            .iter()
1249            .map(|n| {
1250                let t = type_map
1251                    .get(n)
1252                    .cloned()
1253                    .unwrap_or_else(|| "text".to_string());
1254                (n.clone(), t)
1255            })
1256            .collect(),
1257        None => schema_cols.into_iter().map(|(n, _, _, t)| (n, t)).collect(),
1258    };
1259
1260    // Open CSV reader
1261    let reader: Box<dyn std::io::Read> = match &cmd.source {
1262        CopyTarget::File(path) => {
1263            let file = std::fs::File::open(path)
1264                .with_context(|| format!("failed to open file: {}", path.display()))?;
1265            Box::new(std::io::BufReader::new(file))
1266        }
1267        CopyTarget::Stdin => Box::new(std::io::stdin().lock()),
1268        CopyTarget::Stdout => bail!("COPY FROM cannot read from STDOUT"),
1269    };
1270    let mut csv_reader = csv::ReaderBuilder::new()
1271        .delimiter(cmd.options.delimiter as u8)
1272        .quote(cmd.options.quote as u8)
1273        .escape(Some(cmd.options.escape as u8))
1274        .has_headers(cmd.options.header)
1275        .flexible(true)
1276        .from_reader(reader);
1277
1278    // When HEADER=true and no explicit columns, column order comes from CSV header
1279    let columns: Vec<(String, String)> = if cmd.options.header && cmd.columns.is_none() {
1280        let headers = csv_reader
1281            .headers()
1282            .context("failed to read CSV header row")?;
1283        headers
1284            .iter()
1285            .map(|h| {
1286                let name = h.trim().to_string();
1287                let t = type_map
1288                    .get(&name)
1289                    .cloned()
1290                    .unwrap_or_else(|| "text".to_string());
1291                (name, t)
1292            })
1293            .collect()
1294    } else {
1295        prelim_columns
1296    };
1297
1298    let col_list: String = columns
1299        .iter()
1300        .map(|(n, _)| n.as_str())
1301        .collect::<Vec<_>>()
1302        .join(", ");
1303    let col_type_names: Vec<String> = columns.iter().map(|(_, t)| t.clone()).collect();
1304
1305    // Prepare INSERT statement (done after finalizing column list)
1306    let prepared_id = if cmd.options.prepared_statements {
1307        let placeholders = vec!["?"; columns.len()].join(", ");
1308        let insert_template =
1309            format!("INSERT INTO {table_spec} ({col_list}) VALUES ({placeholders}){ttl_clause}");
1310        Some(
1311            session
1312                .prepare(&insert_template)
1313                .await
1314                .with_context(|| format!("failed to prepare: {insert_template}"))?,
1315        )
1316    } else {
1317        None
1318    };
1319
1320    // Open error file
1321    let mut err_writer: Option<std::io::BufWriter<std::fs::File>> = match &cmd.options.err_file {
1322        Some(path) => {
1323            let file = std::fs::File::create(path)
1324                .with_context(|| format!("failed to create error file: {}", path.display()))?;
1325            Some(std::io::BufWriter::new(file))
1326        }
1327        None => None,
1328    };
1329
1330    let mut row_count: usize = 0;
1331    let mut parse_errors: usize = 0;
1332    let mut insert_errors: usize = 0;
1333    let mut rate_limiter = cmd.options.ingest_rate.map(TokenBucket::new);
1334    let num_processes = cmd.options.num_processes.max(1);
1335    let chunk_size = cmd.options.chunk_size.max(1);
1336
1337    let max_attempts = cmd.options.max_attempts;
1338    let max_parse_errors = cmd.options.max_parse_errors;
1339    let max_insert_errors = cmd.options.max_insert_errors;
1340    let report_frequency = cmd.options.report_frequency;
1341    let null_val = &cmd.options.null_val;
1342
1343    let mut csv_records = csv_reader.records();
1344
1345    'outer: loop {
1346        // --- Parse phase: fill a chunk of CHUNKSIZE typed rows ---
1347        let mut chunk: Vec<Vec<CqlValue>> = Vec::with_capacity(chunk_size);
1348
1349        'fill: loop {
1350            if chunk.len() >= chunk_size {
1351                break 'fill;
1352            }
1353            let record = match csv_records.next() {
1354                None => break 'fill,
1355                Some(Err(e)) => {
1356                    parse_errors += 1;
1357                    let msg = format!("CSV parse error on row {}: {e}", row_count + parse_errors);
1358                    eprintln!("{msg}");
1359                    if let Some(ref mut w) = err_writer {
1360                        let _ = writeln!(w, "{msg}");
1361                    }
1362                    if let Some(max) = max_parse_errors {
1363                        if parse_errors > max {
1364                            bail!("Exceeded maximum parse errors ({max}). Aborting.");
1365                        }
1366                    }
1367                    continue 'fill;
1368                }
1369                Some(Ok(r)) => r,
1370            };
1371
1372            if record.len() != col_type_names.len() {
1373                parse_errors += 1;
1374                let msg = format!(
1375                    "Row {}: expected {} columns but got {}",
1376                    row_count + parse_errors,
1377                    col_type_names.len(),
1378                    record.len()
1379                );
1380                eprintln!("{msg}");
1381                if let Some(ref mut w) = err_writer {
1382                    let _ = writeln!(w, "{msg}");
1383                }
1384                if let Some(max) = max_parse_errors {
1385                    if parse_errors > max {
1386                        bail!("Exceeded maximum number of parse errors ({max}). Aborting import.");
1387                    }
1388                }
1389                continue 'fill;
1390            }
1391
1392            let mut row_values: Vec<CqlValue> = Vec::with_capacity(col_type_names.len());
1393            let mut row_ok = true;
1394            for (field, type_name) in record.iter().zip(col_type_names.iter()) {
1395                match csv_str_to_cql_value(field, type_name, null_val) {
1396                    Ok(v) => row_values.push(v),
1397                    Err(e) => {
1398                        parse_errors += 1;
1399                        let msg = format!(
1400                            "Row {}: type error for '{}': {e}",
1401                            row_count + parse_errors,
1402                            type_name
1403                        );
1404                        eprintln!("{msg}");
1405                        if let Some(ref mut w) = err_writer {
1406                            let _ = writeln!(w, "{msg}");
1407                        }
1408                        if let Some(max) = max_parse_errors {
1409                            if parse_errors > max {
1410                                bail!("Exceeded maximum parse errors ({max}). Aborting.");
1411                            }
1412                        }
1413                        row_ok = false;
1414                        break;
1415                    }
1416                }
1417            }
1418            if row_ok {
1419                chunk.push(row_values);
1420            }
1421        }
1422
1423        if chunk.is_empty() {
1424            break 'outer;
1425        }
1426
1427        // --- Rate limiting ---
1428        if let Some(ref mut bucket) = rate_limiter {
1429            for _ in 0..chunk.len() {
1430                bucket.acquire().await;
1431            }
1432        }
1433
1434        // --- Insert phase: execute chunk concurrently ---
1435        let insert_results: Vec<Result<()>> = futures::stream::iter(chunk)
1436            .map(|values| {
1437                let ts = table_spec.as_str();
1438                let cl = col_list.as_str();
1439                let ttl = ttl_clause.as_str();
1440                let pid = prepared_id.as_ref();
1441                async move {
1442                    insert_row_with_retry(session, pid, ts, cl, ttl, &values, max_attempts).await
1443                }
1444            })
1445            .buffer_unordered(num_processes)
1446            .collect()
1447            .await;
1448
1449        for result in insert_results {
1450            match result {
1451                Ok(()) => row_count += 1,
1452                Err(e) => {
1453                    insert_errors += 1;
1454                    let msg = format!("Insert error on row {}: {e}", row_count + insert_errors);
1455                    eprintln!("{msg}");
1456                    if let Some(ref mut w) = err_writer {
1457                        let _ = writeln!(w, "{msg}");
1458                    }
1459                    if let Some(max) = max_insert_errors {
1460                        if insert_errors > max {
1461                            bail!("Exceeded maximum number of insert errors ({max}). Aborting import.");
1462                        }
1463                    }
1464                }
1465            }
1466        }
1467
1468        // --- Progress report ---
1469        if let Some(freq) = report_frequency {
1470            let total = row_count + insert_errors + parse_errors;
1471            if freq > 0 && total > 0 && total.is_multiple_of(freq) {
1472                eprintln!("Processed {} rows...", row_count);
1473            }
1474        }
1475    }
1476
1477    if let Some(ref mut w) = err_writer {
1478        w.flush()?;
1479    }
1480
1481    let elapsed = start.elapsed().as_secs_f64();
1482    println!("{row_count} rows imported from {source_name} in {elapsed:.3}s.");
1483    if parse_errors > 0 {
1484        eprintln!("{parse_errors} parse error(s) encountered.");
1485    }
1486    if insert_errors > 0 {
1487        eprintln!("{insert_errors} insert error(s) encountered.");
1488    }
1489
1490    Ok(())
1491}
1492
1493/// Execute a single row INSERT with retry on failure.
1494///
1495/// When `prepared_id` is `Some`, uses the prepared statement path with typed
1496/// bound values. Otherwise builds a literal-value INSERT string.
1497async fn insert_row_with_retry(
1498    session: &CqlSession,
1499    prepared_id: Option<&PreparedId>,
1500    table_spec: &str,
1501    col_list: &str,
1502    ttl_clause: &str,
1503    values: &[CqlValue],
1504    max_attempts: usize,
1505) -> Result<()> {
1506    let max = max_attempts.max(1);
1507    let mut last_err = anyhow::anyhow!("no attempts made");
1508
1509    for attempt in 1..=max {
1510        let result = if let Some(id) = prepared_id {
1511            session.execute_prepared(id, values).await
1512        } else {
1513            let literals: Vec<String> = values.iter().map(cql_value_to_insert_literal).collect();
1514            let insert = format!(
1515                "INSERT INTO {} ({}) VALUES ({}){};",
1516                table_spec,
1517                col_list,
1518                literals.join(", "),
1519                ttl_clause
1520            );
1521            session.execute_query(&insert).await
1522        };
1523
1524        match result {
1525            Ok(_) => return Ok(()),
1526            Err(e) => {
1527                last_err = e;
1528                if attempt < max {
1529                    // Exponential backoff: 100ms * 2^(attempt-1), capped at 2s
1530                    let wait_ms = (100u64 * (1u64 << (attempt - 1).min(4))).min(2000);
1531                    tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1532                }
1533            }
1534        }
1535    }
1536
1537    Err(last_err)
1538}
1539
1540#[cfg(test)]
1541mod tests {
1542    use super::*;
1543
1544    #[test]
1545    fn parse_copy_to_basic() {
1546        let cmd = parse_copy_to("COPY ks.table TO '/tmp/out.csv'").unwrap();
1547        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1548        assert_eq!(cmd.table, "table");
1549        assert_eq!(cmd.columns, None);
1550        assert_eq!(
1551            cmd.filename,
1552            CopyTarget::File(PathBuf::from("/tmp/out.csv"))
1553        );
1554    }
1555
1556    #[test]
1557    fn parse_copy_to_with_columns() {
1558        let cmd = parse_copy_to("COPY ks.table (col1, col2) TO '/tmp/out.csv'").unwrap();
1559        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1560        assert_eq!(cmd.table, "table");
1561        assert_eq!(
1562            cmd.columns,
1563            Some(vec!["col1".to_string(), "col2".to_string()])
1564        );
1565        assert_eq!(
1566            cmd.filename,
1567            CopyTarget::File(PathBuf::from("/tmp/out.csv"))
1568        );
1569    }
1570
1571    #[test]
1572    fn parse_copy_to_stdout() {
1573        let cmd = parse_copy_to("COPY ks.table TO STDOUT").unwrap();
1574        assert_eq!(cmd.filename, CopyTarget::Stdout);
1575    }
1576
1577    #[test]
1578    fn parse_copy_to_with_options() {
1579        let cmd =
1580            parse_copy_to("COPY ks.table TO '/tmp/out.csv' WITH DELIMITER='|' AND HEADER=true")
1581                .unwrap();
1582        assert_eq!(cmd.options.delimiter, '|');
1583        assert!(cmd.options.header);
1584    }
1585
1586    #[test]
1587    fn format_value_null() {
1588        let opts = CopyOptions::default();
1589        assert_eq!(format_value_for_csv(&CqlValue::Null, &opts), "");
1590    }
1591
1592    #[test]
1593    fn format_value_text() {
1594        let opts = CopyOptions::default();
1595        assert_eq!(
1596            format_value_for_csv(&CqlValue::Text("hello".to_string()), &opts),
1597            "hello"
1598        );
1599    }
1600
1601    #[test]
1602    fn format_value_boolean() {
1603        let opts = CopyOptions::default();
1604        assert_eq!(
1605            format_value_for_csv(&CqlValue::Boolean(true), &opts),
1606            "True"
1607        );
1608        assert_eq!(
1609            format_value_for_csv(&CqlValue::Boolean(false), &opts),
1610            "False"
1611        );
1612    }
1613
1614    #[test]
1615    fn format_value_float_precision() {
1616        let opts = CopyOptions {
1617            float_precision: 3,
1618            ..Default::default()
1619        };
1620        assert_eq!(
1621            format_value_for_csv(&CqlValue::Float(1.23456), &opts),
1622            "1.235"
1623        );
1624    }
1625
1626    #[test]
1627    fn default_options() {
1628        let opts = CopyOptions::default();
1629        assert_eq!(opts.delimiter, ',');
1630        assert_eq!(opts.quote, '"');
1631        assert_eq!(opts.escape, '\\');
1632        assert!(!opts.header);
1633        assert_eq!(opts.null_val, "");
1634        assert_eq!(opts.datetime_format, None);
1635        assert_eq!(opts.encoding, "utf-8");
1636        assert_eq!(opts.float_precision, 5);
1637        assert_eq!(opts.double_precision, 12);
1638        assert_eq!(opts.decimal_sep, '.');
1639        assert_eq!(opts.thousands_sep, None);
1640        assert_eq!(opts.bool_style, ("True".to_string(), "False".to_string()));
1641        assert_eq!(opts.page_size, 1000);
1642        assert_eq!(opts.max_output_size, None);
1643        assert_eq!(opts.report_frequency, None);
1644    }
1645
1646    // -----------------------------------------------------------------------
1647    // COPY FROM tests
1648    // -----------------------------------------------------------------------
1649
1650    #[test]
1651    fn parse_copy_from_basic() {
1652        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv'").unwrap();
1653        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1654        assert_eq!(cmd.table, "table");
1655        assert_eq!(cmd.columns, None);
1656        assert_eq!(cmd.source, CopyTarget::File(PathBuf::from("/tmp/in.csv")));
1657    }
1658
1659    #[test]
1660    fn parse_copy_from_with_columns() {
1661        let cmd = parse_copy_from("COPY ks.table (col1, col2) FROM '/tmp/in.csv'").unwrap();
1662        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1663        assert_eq!(cmd.table, "table");
1664        assert_eq!(
1665            cmd.columns,
1666            Some(vec!["col1".to_string(), "col2".to_string()])
1667        );
1668    }
1669
1670    #[test]
1671    fn parse_copy_from_stdin() {
1672        let cmd = parse_copy_from("COPY ks.table FROM STDIN").unwrap();
1673        assert_eq!(cmd.source, CopyTarget::Stdin);
1674    }
1675
1676    #[test]
1677    fn parse_copy_from_stdin_case_insensitive() {
1678        let cmd = parse_copy_from("COPY ks.table FROM stdin").unwrap();
1679        assert_eq!(cmd.source, CopyTarget::Stdin);
1680    }
1681
1682    #[test]
1683    fn parse_copy_from_no_keyspace() {
1684        let cmd = parse_copy_from("COPY mytable FROM '/data/file.csv'").unwrap();
1685        assert_eq!(cmd.keyspace, None);
1686        assert_eq!(cmd.table, "mytable");
1687    }
1688
1689    #[test]
1690    fn parse_copy_from_with_options() {
1691        let cmd = parse_copy_from(
1692            "COPY ks.table FROM '/tmp/in.csv' WITH TTL=3600 AND HEADER=true AND CHUNKSIZE=1000 AND DELIMITER='|'",
1693        )
1694        .unwrap();
1695        assert_eq!(cmd.options.ttl, Some(3600));
1696        assert!(cmd.options.header);
1697        assert_eq!(cmd.options.chunk_size, 1000);
1698        assert_eq!(cmd.options.delimiter, '|');
1699    }
1700
1701    #[test]
1702    fn parse_copy_from_with_error_options() {
1703        let cmd = parse_copy_from(
1704            "COPY ks.table FROM '/tmp/in.csv' WITH MAXPARSEERRORS=100 AND MAXINSERTERRORS=50 AND ERRFILE='/tmp/err.log'",
1705        )
1706        .unwrap();
1707        assert_eq!(cmd.options.max_parse_errors, Some(100));
1708        assert_eq!(cmd.options.max_insert_errors, Some(50));
1709        assert_eq!(cmd.options.err_file, Some(PathBuf::from("/tmp/err.log")));
1710    }
1711
1712    #[test]
1713    fn parse_copy_from_with_batch_options() {
1714        let cmd = parse_copy_from(
1715            "COPY ks.table FROM '/tmp/in.csv' WITH MAXBATCHSIZE=50 AND MINBATCHSIZE=5 AND MAXATTEMPTS=10",
1716        )
1717        .unwrap();
1718        assert_eq!(cmd.options.max_batch_size, 50);
1719        assert_eq!(cmd.options.min_batch_size, 5);
1720        assert_eq!(cmd.options.max_attempts, 10);
1721    }
1722
1723    #[test]
1724    fn parse_copy_from_semicolon() {
1725        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv';").unwrap();
1726        assert_eq!(cmd.source, CopyTarget::File(PathBuf::from("/tmp/in.csv")));
1727    }
1728
1729    #[test]
1730    fn default_copy_from_options() {
1731        let opts = CopyFromOptions::default();
1732        assert_eq!(opts.delimiter, ',');
1733        assert_eq!(opts.quote, '"');
1734        assert_eq!(opts.escape, '\\');
1735        assert!(!opts.header);
1736        assert_eq!(opts.null_val, "");
1737        assert_eq!(opts.datetime_format, None);
1738        assert_eq!(opts.encoding, "utf-8");
1739        assert_eq!(opts.chunk_size, 5000);
1740        assert_eq!(opts.max_batch_size, 20);
1741        assert_eq!(opts.min_batch_size, 2);
1742        assert!(opts.prepared_statements);
1743        assert_eq!(opts.ttl, None);
1744        assert_eq!(opts.max_attempts, 5);
1745        assert_eq!(opts.max_parse_errors, None);
1746        assert_eq!(opts.max_insert_errors, None);
1747        assert_eq!(opts.err_file, None);
1748        assert_eq!(opts.report_frequency, None);
1749        assert_eq!(opts.ingest_rate, None);
1750        assert_eq!(opts.num_processes, 1);
1751    }
1752
1753    // -----------------------------------------------------------------------
1754    // csv_str_to_cql_value unit tests
1755    // -----------------------------------------------------------------------
1756
1757    #[test]
1758    fn csv_to_cql_text_types() {
1759        let v = csv_str_to_cql_value("hello", "text", "").unwrap();
1760        assert_eq!(v, CqlValue::Text("hello".to_string()));
1761
1762        let v = csv_str_to_cql_value("hi", "ascii", "").unwrap();
1763        assert_eq!(v, CqlValue::Ascii("hi".to_string()));
1764
1765        let v = csv_str_to_cql_value("world", "varchar", "").unwrap();
1766        assert_eq!(v, CqlValue::Text("world".to_string())); // varchar → Text
1767    }
1768
1769    #[test]
1770    fn csv_to_cql_int_types() {
1771        assert_eq!(
1772            csv_str_to_cql_value("42", "int", "").unwrap(),
1773            CqlValue::Int(42)
1774        );
1775        assert_eq!(
1776            csv_str_to_cql_value("-100", "bigint", "").unwrap(),
1777            CqlValue::BigInt(-100)
1778        );
1779        assert_eq!(
1780            csv_str_to_cql_value("1000", "counter", "").unwrap(),
1781            CqlValue::BigInt(1000)
1782        );
1783        assert_eq!(
1784            csv_str_to_cql_value("32767", "smallint", "").unwrap(),
1785            CqlValue::SmallInt(32767)
1786        );
1787        assert_eq!(
1788            csv_str_to_cql_value("127", "tinyint", "").unwrap(),
1789            CqlValue::TinyInt(127)
1790        );
1791    }
1792
1793    #[test]
1794    fn csv_to_cql_float_types() {
1795        match csv_str_to_cql_value("1.5", "float", "").unwrap() {
1796            CqlValue::Float(f) => assert!((f - 1.5f32).abs() < 1e-5),
1797            other => panic!("expected Float, got {other:?}"),
1798        }
1799        match csv_str_to_cql_value("1.5", "double", "").unwrap() {
1800            CqlValue::Double(d) => assert!((d - 1.5f64).abs() < 1e-9),
1801            other => panic!("expected Double, got {other:?}"),
1802        }
1803        // Scientific notation
1804        assert!(matches!(
1805            csv_str_to_cql_value("1e10", "double", "").unwrap(),
1806            CqlValue::Double(_)
1807        ));
1808    }
1809
1810    #[test]
1811    fn csv_to_cql_boolean() {
1812        for t in &["true", "True", "TRUE", "yes", "YES", "on", "ON", "1"] {
1813            assert_eq!(
1814                csv_str_to_cql_value(t, "boolean", "").unwrap(),
1815                CqlValue::Boolean(true),
1816                "expected true for {t:?}"
1817            );
1818        }
1819        for f in &["false", "False", "FALSE", "no", "NO", "off", "OFF", "0"] {
1820            assert_eq!(
1821                csv_str_to_cql_value(f, "boolean", "").unwrap(),
1822                CqlValue::Boolean(false),
1823                "expected false for {f:?}"
1824            );
1825        }
1826    }
1827
1828    #[test]
1829    fn csv_to_cql_uuid() {
1830        let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
1831        assert!(matches!(
1832            csv_str_to_cql_value(uuid_str, "uuid", "").unwrap(),
1833            CqlValue::Uuid(_)
1834        ));
1835        assert!(matches!(
1836            csv_str_to_cql_value(uuid_str, "timeuuid", "").unwrap(),
1837            CqlValue::TimeUuid(_)
1838        ));
1839        // Invalid UUID
1840        assert!(csv_str_to_cql_value("not-a-uuid", "uuid", "").is_err());
1841    }
1842
1843    #[test]
1844    fn csv_to_cql_timestamp() {
1845        // ISO-8601 with timezone
1846        let v = csv_str_to_cql_value("2024-01-15T12:34:56Z", "timestamp", "").unwrap();
1847        assert!(matches!(v, CqlValue::Timestamp(_)));
1848
1849        // Milliseconds as integer
1850        let v = csv_str_to_cql_value("1705318496000", "timestamp", "").unwrap();
1851        assert_eq!(v, CqlValue::Timestamp(1705318496000));
1852    }
1853
1854    #[test]
1855    fn csv_to_cql_date() {
1856        use chrono::NaiveDate;
1857        let v = csv_str_to_cql_value("2024-01-15", "date", "").unwrap();
1858        assert_eq!(
1859            v,
1860            CqlValue::Date(NaiveDate::from_ymd_opt(2024, 1, 15).unwrap())
1861        );
1862
1863        assert!(csv_str_to_cql_value("not-a-date", "date", "").is_err());
1864    }
1865
1866    #[test]
1867    fn csv_to_cql_time() {
1868        let v = csv_str_to_cql_value("12:34:56", "time", "").unwrap();
1869        assert!(matches!(v, CqlValue::Time(_)));
1870
1871        let v = csv_str_to_cql_value("12:34:56.789", "time", "").unwrap();
1872        assert!(matches!(v, CqlValue::Time(_)));
1873
1874        assert!(csv_str_to_cql_value("not-a-time", "time", "").is_err());
1875    }
1876
1877    #[test]
1878    fn csv_to_cql_inet() {
1879        let v = csv_str_to_cql_value("127.0.0.1", "inet", "").unwrap();
1880        assert!(matches!(v, CqlValue::Inet(_)));
1881
1882        let v = csv_str_to_cql_value("::1", "inet", "").unwrap();
1883        assert!(matches!(v, CqlValue::Inet(_)));
1884
1885        assert!(csv_str_to_cql_value("not.an.ip", "inet", "").is_err());
1886    }
1887
1888    #[test]
1889    fn csv_to_cql_blob() {
1890        let v = csv_str_to_cql_value("0xdeadbeef", "blob", "").unwrap();
1891        assert_eq!(v, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
1892
1893        // Without 0x prefix
1894        let v = csv_str_to_cql_value("deadbeef", "blob", "").unwrap();
1895        assert_eq!(v, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
1896
1897        // Invalid hex
1898        assert!(csv_str_to_cql_value("0xgg", "blob", "").is_err());
1899        // Odd number of digits
1900        assert!(csv_str_to_cql_value("0xabc", "blob", "").is_err());
1901    }
1902
1903    #[test]
1904    fn csv_to_cql_null_handling() {
1905        // Empty field with empty null_val → Null
1906        assert_eq!(csv_str_to_cql_value("", "int", "").unwrap(), CqlValue::Null);
1907        assert_eq!(
1908            csv_str_to_cql_value("", "text", "").unwrap(),
1909            CqlValue::Null
1910        );
1911    }
1912
1913    #[test]
1914    fn csv_to_cql_null_custom() {
1915        // Custom null_val
1916        assert_eq!(
1917            csv_str_to_cql_value("NULL", "int", "NULL").unwrap(),
1918            CqlValue::Null
1919        );
1920        assert_eq!(
1921            csv_str_to_cql_value("N/A", "text", "N/A").unwrap(),
1922            CqlValue::Null
1923        );
1924        // Non-null value with custom null_val
1925        assert!(matches!(
1926            csv_str_to_cql_value("42", "int", "NULL").unwrap(),
1927            CqlValue::Int(42)
1928        ));
1929    }
1930
1931    #[test]
1932    fn csv_to_cql_unknown_type_fallback() {
1933        // Unknown types fall back to Text
1934        let v = csv_str_to_cql_value("hello", "customtype", "").unwrap();
1935        assert_eq!(v, CqlValue::Text("hello".to_string()));
1936
1937        // Collection types also fall through to Text
1938        let v = csv_str_to_cql_value("[1, 2, 3]", "list<int>", "").unwrap();
1939        assert_eq!(v, CqlValue::Text("[1, 2, 3]".to_string()));
1940    }
1941
1942    #[test]
1943    fn csv_to_cql_parse_error_int() {
1944        // Non-numeric for int type → error
1945        assert!(csv_str_to_cql_value("notanint", "int", "").is_err());
1946        assert!(csv_str_to_cql_value("3.14", "int", "").is_err());
1947        assert!(csv_str_to_cql_value("notanint", "bigint", "").is_err());
1948    }
1949
1950    #[test]
1951    fn csv_to_cql_varint_and_decimal() {
1952        let v = csv_str_to_cql_value("123456789012345678901234567890", "varint", "").unwrap();
1953        assert!(matches!(v, CqlValue::Varint(_)));
1954
1955        let v = csv_str_to_cql_value("3.141592653589793", "decimal", "").unwrap();
1956        assert!(matches!(v, CqlValue::Decimal(_)));
1957    }
1958
1959    #[test]
1960    fn csv_to_cql_frozen_stripped() {
1961        // frozen<set<uuid>> should strip frozen wrapper → set<uuid> → Text fallback
1962        let v = csv_str_to_cql_value("{uuid1, uuid2}", "frozen<set<uuid>>", "").unwrap();
1963        assert!(matches!(v, CqlValue::Text(_)));
1964    }
1965
1966    #[test]
1967    fn parse_copy_from_numprocesses() {
1968        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv' WITH NUMPROCESSES=4").unwrap();
1969        assert_eq!(cmd.options.num_processes, 4);
1970    }
1971
1972    #[test]
1973    fn format_value_integers() {
1974        let opts = CopyOptions::default();
1975        assert_eq!(format_value_for_csv(&CqlValue::Int(42), &opts), "42");
1976        assert_eq!(format_value_for_csv(&CqlValue::BigInt(-100), &opts), "-100");
1977        assert_eq!(format_value_for_csv(&CqlValue::SmallInt(7), &opts), "7");
1978        assert_eq!(format_value_for_csv(&CqlValue::TinyInt(-1), &opts), "-1");
1979        assert_eq!(format_value_for_csv(&CqlValue::Counter(99), &opts), "99");
1980    }
1981
1982    #[test]
1983    fn format_value_blob() {
1984        let opts = CopyOptions::default();
1985        assert_eq!(
1986            format_value_for_csv(&CqlValue::Blob(vec![0xca, 0xfe]), &opts),
1987            "0xcafe"
1988        );
1989    }
1990
1991    #[test]
1992    fn format_value_uuid() {
1993        let opts = CopyOptions::default();
1994        let output = format_value_for_csv(&CqlValue::Uuid(uuid::Uuid::nil()), &opts);
1995        assert_eq!(output, "00000000-0000-0000-0000-000000000000");
1996    }
1997
1998    #[test]
1999    fn format_value_inet() {
2000        let opts = CopyOptions::default();
2001        let output = format_value_for_csv(&CqlValue::Inet("127.0.0.1".parse().unwrap()), &opts);
2002        assert_eq!(output, "127.0.0.1");
2003    }
2004
2005    #[test]
2006    fn format_value_duration() {
2007        let opts = CopyOptions::default();
2008        let dur = CqlValue::Duration {
2009            months: 1,
2010            days: 2,
2011            nanoseconds: 3,
2012        };
2013        assert_eq!(format_value_for_csv(&dur, &opts), "1mo2d3ns");
2014    }
2015
2016    #[test]
2017    fn format_value_decimal_custom_sep() {
2018        use bigdecimal::BigDecimal;
2019        use std::str::FromStr;
2020        let opts = CopyOptions {
2021            decimal_sep: ',',
2022            ..Default::default()
2023        };
2024        let dec = CqlValue::Decimal(BigDecimal::from_str("3.14").unwrap());
2025        assert_eq!(format_value_for_csv(&dec, &opts), "3,14");
2026    }
2027
2028    #[test]
2029    fn format_value_varint() {
2030        let opts = CopyOptions::default();
2031        let v = CqlValue::Varint(num_bigint::BigInt::from(12345));
2032        assert_eq!(format_value_for_csv(&v, &opts), "12345");
2033    }
2034
2035    #[test]
2036    fn format_value_float_nan_inf() {
2037        let opts = CopyOptions::default();
2038        assert_eq!(
2039            format_value_for_csv(&CqlValue::Float(f32::NAN), &opts),
2040            "NaN"
2041        );
2042        assert_eq!(
2043            format_value_for_csv(&CqlValue::Float(f32::INFINITY), &opts),
2044            "Infinity"
2045        );
2046        assert_eq!(
2047            format_value_for_csv(&CqlValue::Float(f32::NEG_INFINITY), &opts),
2048            "-Infinity"
2049        );
2050    }
2051
2052    #[test]
2053    fn format_value_double_precision() {
2054        let opts = CopyOptions {
2055            double_precision: 3,
2056            ..Default::default()
2057        };
2058        assert_eq!(
2059            format_value_for_csv(&CqlValue::Double(1.23456), &opts),
2060            "1.235"
2061        );
2062    }
2063
2064    #[test]
2065    fn format_value_float_custom_decimal_sep() {
2066        let opts = CopyOptions {
2067            float_precision: 2,
2068            decimal_sep: ',',
2069            ..Default::default()
2070        };
2071        assert_eq!(format_value_for_csv(&CqlValue::Float(1.5), &opts), "1,50");
2072    }
2073
2074    #[test]
2075    fn format_value_timestamp() {
2076        let opts = CopyOptions::default();
2077        let output = format_value_for_csv(&CqlValue::Timestamp(0), &opts);
2078        assert!(output.contains("1970-01-01"));
2079    }
2080
2081    #[test]
2082    fn format_value_timestamp_custom_format() {
2083        let opts = CopyOptions {
2084            datetime_format: Some("%Y/%m/%d".to_string()),
2085            ..Default::default()
2086        };
2087        let output = format_value_for_csv(&CqlValue::Timestamp(0), &opts);
2088        assert_eq!(output, "1970/01/01");
2089    }
2090
2091    #[test]
2092    fn format_value_collections() {
2093        let opts = CopyOptions::default();
2094        let list = CqlValue::List(vec![CqlValue::Int(1), CqlValue::Int(2)]);
2095        assert_eq!(format_value_for_csv(&list, &opts), "[1, 2]");
2096
2097        let map = CqlValue::Map(vec![(CqlValue::Text("k".to_string()), CqlValue::Int(1))]);
2098        assert_eq!(format_value_for_csv(&map, &opts), "{'k': 1}");
2099    }
2100
2101    #[test]
2102    fn format_value_unset() {
2103        let opts = CopyOptions::default();
2104        assert_eq!(format_value_for_csv(&CqlValue::Unset, &opts), "");
2105    }
2106
2107    #[test]
2108    fn parse_copy_to_no_keyspace() {
2109        let cmd = parse_copy_to("COPY mytable TO '/tmp/out.csv'").unwrap();
2110        assert_eq!(cmd.keyspace, None);
2111        assert_eq!(cmd.table, "mytable");
2112    }
2113
2114    #[test]
2115    fn parse_copy_to_case_insensitive() {
2116        let cmd = parse_copy_to("copy ks.table to STDOUT").unwrap();
2117        assert_eq!(cmd.filename, CopyTarget::Stdout);
2118    }
2119
2120    #[test]
2121    fn parse_copy_to_underscore_in_names() {
2122        let cmd = parse_copy_to("COPY test_copy_to_f49405.copy_test TO '/tmp/out.csv'").unwrap();
2123        assert_eq!(cmd.keyspace.as_deref(), Some("test_copy_to_f49405"));
2124        assert_eq!(cmd.table, "copy_test");
2125    }
2126
2127    #[test]
2128    fn parse_copy_from_underscore_in_names() {
2129        let cmd = parse_copy_from("COPY test_from_data.my_table FROM '/tmp/in.csv'").unwrap();
2130        assert_eq!(cmd.keyspace.as_deref(), Some("test_from_data"));
2131        assert_eq!(cmd.table, "my_table");
2132    }
2133
2134    #[test]
2135    fn parse_copy_to_invalid_not_copy() {
2136        assert!(parse_copy_to("SELECT * FROM foo").is_err());
2137    }
2138
2139    #[test]
2140    fn parse_copy_to_missing_to() {
2141        assert!(parse_copy_to("COPY ks.table '/tmp/out.csv'").is_err());
2142    }
2143
2144    #[test]
2145    fn parse_copy_from_invalid_not_copy() {
2146        assert!(parse_copy_from("SELECT * FROM foo").is_err());
2147    }
2148}