cqlsh_rs/
copy.rs

1//! COPY TO and COPY FROM implementation — exports/imports CSV data.
2//!
3//! Supports:
4//!   `COPY [ks.]table [(col1, col2, ...)] TO 'filename'|STDOUT [WITH options...]`
5//!   `COPY [ks.]table [(col1, col2, ...)] FROM 'filename'|STDIN [WITH options...]`
6
7use std::io::Write;
8use std::net::IpAddr;
9use std::path::PathBuf;
10use std::time::{Duration, Instant};
11
12use anyhow::{bail, Context, Result};
13use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
14use futures::StreamExt;
15
16use crate::driver::types::CqlValue;
17use crate::driver::PreparedId;
18use crate::session::CqlSession;
19
/// Destination of a `COPY TO` or source of a `COPY FROM` CSV stream.
#[derive(Debug, Clone, PartialEq)]
pub enum CopyTarget {
    /// A file on the local filesystem identified by this path.
    File(PathBuf),
    /// The process's standard output (produced for an unquoted `STDOUT` target).
    Stdout,
    /// The process's standard input (produced for an unquoted `STDIN` source).
    Stdin,
}
30
/// Formatting and output settings for `COPY TO` (CSV export).
///
/// Any option not present in the `WITH` clause keeps its [`Default`] value.
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// Field separator (`DELIMITER`, default `,`).
    pub delimiter: char,
    /// Quote character (`QUOTE`, default `"`).
    pub quote: char,
    /// Character used to escape quotes inside a field (`ESCAPE`, default `\`);
    /// the CSV writer is configured not to double quotes.
    pub escape: char,
    /// Write a header row of column names before the data (`HEADER`, default `false`).
    pub header: bool,
    /// Text written for null/unset values (`NULL`/`NULLVAL`, default empty).
    pub null_val: String,
    /// chrono strftime-style pattern for timestamps (`DATETIMEFORMAT`);
    /// `None` selects `%Y-%m-%d %H:%M:%S%.6f%z`.
    pub datetime_format: Option<String>,
    /// `ENCODING` as given (default `utf-8`).
    // NOTE(review): not consulted by the export/formatting code in this module — confirm.
    pub encoding: String,
    /// Digits after the decimal point for `float` values (`FLOATPRECISION`, default 5).
    pub float_precision: usize,
    /// Digits after the decimal point for `double` values (`DOUBLEPRECISION`, default 12).
    pub double_precision: usize,
    /// Replaces `.` in float/double/decimal output (`DECIMALSEP`, default `.`).
    pub decimal_sep: char,
    /// `THOUSANDSSEP` (default none).
    // NOTE(review): not applied by `format_float` / `format_value_for_csv` — confirm.
    pub thousands_sep: Option<char>,
    /// Text for `(true, false)` values (`BOOLSTYLE`, default `("True", "False")`).
    pub bool_style: (String, String),
    /// `PAGESIZE` (default 1000).
    // NOTE(review): `execute_copy_to` does not pass this to its query — confirm.
    pub page_size: usize,
    /// Stop after this many data rows (`MAXOUTPUTSIZE`); `None` means no limit.
    pub max_output_size: Option<usize>,
    /// Print a progress line to stderr every N rows (`REPORTFREQUENCY`); `None` disables it.
    pub report_frequency: Option<usize>,
}

impl Default for CopyOptions {
    fn default() -> Self {
        let (true_text, false_text) = (String::from("True"), String::from("False"));
        CopyOptions {
            // CSV dialect
            delimiter: ',',
            quote: '"',
            escape: '\\',
            header: false,
            // Value rendering
            null_val: String::new(),
            datetime_format: None,
            float_precision: 5,
            double_precision: 12,
            decimal_sep: '.',
            thousands_sep: None,
            bool_style: (true_text, false_text),
            // I/O behaviour
            encoding: String::from("utf-8"),
            page_size: 1000,
            max_output_size: None,
            report_frequency: None,
        }
    }
}
72
73/// A parsed COPY TO command.
74#[derive(Debug, Clone)]
75pub struct CopyToCommand {
76    pub keyspace: Option<String>,
77    pub table: String,
78    pub columns: Option<Vec<String>>,
79    pub filename: CopyTarget,
80    pub options: CopyOptions,
81}
82
83/// Parse a `COPY ... TO ...` statement.
84///
85/// Format: `COPY [ks.]table [(col1, col2)] TO 'filename'|STDOUT [WITH opt=val AND ...]`
86pub fn parse_copy_to(input: &str) -> Result<CopyToCommand> {
87    let trimmed = input.trim().trim_end_matches(';').trim();
88
89    // Must start with COPY (case-insensitive)
90    let upper = trimmed.to_uppercase();
91    if !upper.starts_with("COPY ") {
92        bail!("not a COPY statement");
93    }
94
95    // Find the TO keyword (case-insensitive), but not inside parentheses
96    let to_pos =
97        find_keyword_outside_parens(trimmed, "TO").context("COPY statement missing TO keyword")?;
98
99    let before_to = trimmed[4..to_pos].trim(); // skip "COPY"
100    let after_to = trimmed[to_pos + 2..].trim(); // skip "TO"
101
102    // Parse table spec (before TO): [ks.]table [(col1, col2)]
103    let (keyspace, table, columns) = parse_table_spec(before_to)?;
104
105    // Parse target and options (after TO): 'filename'|STDOUT [WITH ...]
106    let (filename, options_str) = parse_target_and_options(after_to)?;
107
108    let options = if let Some(opts) = options_str {
109        parse_options(&opts)?
110    } else {
111        CopyOptions::default()
112    };
113
114    Ok(CopyToCommand {
115        keyspace,
116        table,
117        columns,
118        filename,
119        options,
120    })
121}
122
123/// Execute a COPY TO command, writing results as CSV.
124pub async fn execute_copy_to(
125    session: &CqlSession,
126    cmd: &CopyToCommand,
127    current_keyspace: Option<&str>,
128) -> Result<()> {
129    // Build SELECT query
130    let col_spec = match &cmd.columns {
131        Some(cols) => cols.join(", "),
132        None => "*".to_string(),
133    };
134
135    let table_spec = match (&cmd.keyspace, current_keyspace) {
136        (Some(ks), _) => format!("{}.{}", ks, cmd.table),
137        (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
138        (None, None) => cmd.table.clone(),
139    };
140
141    let query = format!("SELECT {} FROM {}", col_spec, table_spec);
142
143    let result = session.execute_query(&query).await?;
144
145    // Set up CSV writer
146    let mut row_count: usize = 0;
147
148    match &cmd.filename {
149        CopyTarget::File(path) => {
150            let file = std::fs::File::create(path)
151                .with_context(|| format!("failed to create file: {}", path.display()))?;
152            let buf = std::io::BufWriter::new(file);
153            let mut wtr = build_csv_writer(&cmd.options, buf);
154
155            if cmd.options.header {
156                let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
157                wtr.write_record(&headers)?;
158            }
159
160            for row in &result.rows {
161                if let Some(max) = cmd.options.max_output_size {
162                    if row_count >= max {
163                        break;
164                    }
165                }
166                let fields: Vec<String> = row
167                    .values
168                    .iter()
169                    .map(|v| format_value_for_csv(v, &cmd.options))
170                    .collect();
171                wtr.write_record(&fields)?;
172                row_count += 1;
173
174                if let Some(freq) = cmd.options.report_frequency {
175                    if freq > 0 && row_count.is_multiple_of(freq) {
176                        eprintln!("Processed {} rows...", row_count);
177                    }
178                }
179            }
180
181            wtr.flush()?;
182            println!("{} rows exported to '{}'.", row_count, path.display());
183        }
184        CopyTarget::Stdout => {
185            let stdout = std::io::stdout();
186            let handle = stdout.lock();
187            let mut wtr = build_csv_writer(&cmd.options, handle);
188
189            if cmd.options.header {
190                let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
191                wtr.write_record(&headers)?;
192            }
193
194            for row in &result.rows {
195                if let Some(max) = cmd.options.max_output_size {
196                    if row_count >= max {
197                        break;
198                    }
199                }
200                let fields: Vec<String> = row
201                    .values
202                    .iter()
203                    .map(|v| format_value_for_csv(v, &cmd.options))
204                    .collect();
205                wtr.write_record(&fields)?;
206                row_count += 1;
207
208                if let Some(freq) = cmd.options.report_frequency {
209                    if freq > 0 && row_count.is_multiple_of(freq) {
210                        eprintln!("Processed {} rows...", row_count);
211                    }
212                }
213            }
214
215            wtr.flush()?;
216            eprintln!("{} rows exported to STDOUT.", row_count);
217        }
218        CopyTarget::Stdin => {
219            bail!("COPY TO cannot write to STDIN");
220        }
221    }
222
223    Ok(())
224}
225
226/// Format a single CQL value for CSV output according to the given options.
227pub fn format_value_for_csv(value: &CqlValue, options: &CopyOptions) -> String {
228    match value {
229        CqlValue::Null | CqlValue::Unset => options.null_val.clone(),
230        CqlValue::Text(s) | CqlValue::Ascii(s) => s.clone(),
231        CqlValue::Boolean(b) => {
232            if *b {
233                options.bool_style.0.clone()
234            } else {
235                options.bool_style.1.clone()
236            }
237        }
238        CqlValue::Int(v) => v.to_string(),
239        CqlValue::BigInt(v) => v.to_string(),
240        CqlValue::SmallInt(v) => v.to_string(),
241        CqlValue::TinyInt(v) => v.to_string(),
242        CqlValue::Counter(v) => v.to_string(),
243        CqlValue::Varint(v) => v.to_string(),
244        CqlValue::Float(v) => format_float(*v as f64, options.float_precision, options),
245        CqlValue::Double(v) => format_float(*v, options.double_precision, options),
246        CqlValue::Decimal(v) => {
247            let s = v.to_string();
248            if options.decimal_sep != '.' {
249                s.replace('.', &options.decimal_sep.to_string())
250            } else {
251                s
252            }
253        }
254        CqlValue::Timestamp(millis) => format_timestamp(*millis, options),
255        CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
256        CqlValue::Blob(bytes) => {
257            let mut s = String::with_capacity(2 + bytes.len() * 2);
258            s.push_str("0x");
259            for b in bytes {
260                s.push_str(&format!("{b:02x}"));
261            }
262            s
263        }
264        CqlValue::Date(d) => d.to_string(),
265        CqlValue::Time(t) => t.to_string(),
266        CqlValue::Duration {
267            months,
268            days,
269            nanoseconds,
270        } => format!("{months}mo{days}d{nanoseconds}ns"),
271        CqlValue::Inet(addr) => addr.to_string(),
272        // Collection types: use CQL literal format via Display
273        CqlValue::List(_)
274        | CqlValue::Set(_)
275        | CqlValue::Map(_)
276        | CqlValue::Tuple(_)
277        | CqlValue::UserDefinedType { .. } => value.to_string(),
278    }
279}
280
281// ---------------------------------------------------------------------------
282// Internal helpers
283// ---------------------------------------------------------------------------
284
285/// Build a csv::Writer with the configured options.
286fn build_csv_writer<W: Write>(options: &CopyOptions, writer: W) -> csv::Writer<W> {
287    csv::WriterBuilder::new()
288        .delimiter(options.delimiter as u8)
289        .quote(options.quote as u8)
290        .escape(options.escape as u8)
291        .double_quote(false)
292        .from_writer(writer)
293}
294
295/// Format a floating-point number with given precision and decimal separator.
296fn format_float(v: f64, precision: usize, options: &CopyOptions) -> String {
297    if v.is_nan() {
298        return "NaN".to_string();
299    }
300    if v.is_infinite() {
301        return if v.is_sign_positive() {
302            "Infinity".to_string()
303        } else {
304            "-Infinity".to_string()
305        };
306    }
307    let s = format!("{v:.prec$}", prec = precision);
308    if options.decimal_sep != '.' {
309        s.replace('.', &options.decimal_sep.to_string())
310    } else {
311        s
312    }
313}
314
315/// Format a timestamp (millis since epoch) using the configured format.
316fn format_timestamp(millis: i64, options: &CopyOptions) -> String {
317    match DateTime::from_timestamp_millis(millis) {
318        Some(dt) => {
319            let utc: DateTime<Utc> = dt;
320            match &options.datetime_format {
321                Some(fmt) => utc.format(fmt).to_string(),
322                None => utc.format("%Y-%m-%d %H:%M:%S%.6f%z").to_string(),
323            }
324        }
325        None => format!("<invalid timestamp: {millis}>"),
326    }
327}
328
/// Find `keyword` in `s` at parenthesis depth zero and outside quotes.
///
/// - Matching is ASCII case-insensitive.
/// - A match needs word boundaries on both sides: an adjacent ASCII
///   alphanumeric or `_` byte prevents it, so `TO` does not match inside
///   `TOTAL` or `to_table`.
/// - Text inside `'...'`, `"..."` and parentheses is skipped.
///
/// Returns the byte offset of the keyword's first byte in `s`.
///
/// Bytes are compared in place rather than against an upper-cased copy of `s`.
/// Upper-casing can change byte lengths for some non-ASCII characters
/// (e.g. `ﬁ` → `FI`), which would misalign offsets and could panic on slicing.
fn find_keyword_outside_parens(s: &str, keyword: &str) -> Option<usize> {
    let bytes = s.as_bytes();
    let kw = keyword.as_bytes();
    let is_word_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    let mut depth: i32 = 0;
    let mut open_quote: Option<char> = None;

    for (i, ch) in s.char_indices() {
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            continue;
        }
        match ch {
            '\'' | '"' => {
                open_quote = Some(ch);
                continue;
            }
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }
        if depth != 0 {
            continue;
        }

        let matches_here = bytes
            .get(i..i + kw.len())
            .is_some_and(|candidate| candidate.eq_ignore_ascii_case(kw));
        if !matches_here {
            continue;
        }

        let before_ok = i == 0 || !is_word_byte(bytes[i - 1]);
        let after_ok = !matches!(bytes.get(i + kw.len()), Some(&b) if is_word_byte(b));
        if before_ok && after_ok {
            return Some(i);
        }
    }
    None
}
371
372/// Parse the table spec: `[ks.]table [(col1, col2, ...)]`
373fn parse_table_spec(spec: &str) -> Result<(Option<String>, String, Option<Vec<String>>)> {
374    let spec = spec.trim();
375
376    // Split off column list if present
377    let (table_part, columns) = if let Some(paren_start) = spec.find('(') {
378        let paren_end = spec
379            .rfind(')')
380            .context("unmatched parenthesis in column list")?;
381        let cols_str = &spec[paren_start + 1..paren_end];
382        let cols: Vec<String> = cols_str
383            .split(',')
384            .map(|c| c.trim().to_string())
385            .filter(|c| !c.is_empty())
386            .collect();
387        (spec[..paren_start].trim(), Some(cols))
388    } else {
389        (spec, None)
390    };
391
392    // Split keyspace.table
393    let (keyspace, table) = if let Some(dot_pos) = table_part.find('.') {
394        let ks = table_part[..dot_pos].trim().to_string();
395        let tbl = table_part[dot_pos + 1..].trim().to_string();
396        (Some(ks), tbl)
397    } else {
398        (None, table_part.trim().to_string())
399    };
400
401    if table.is_empty() {
402        bail!("missing table name in COPY statement");
403    }
404
405    Ok((keyspace, table, columns))
406}
407
408/// Parse the target and WITH options after the TO keyword.
409/// Returns `(CopyTarget, Option<options_string>)`.
410fn parse_target_and_options(after_to: &str) -> Result<(CopyTarget, Option<String>)> {
411    let after_to = after_to.trim();
412
413    // Find WITH keyword (case-insensitive) outside of quotes
414    let with_pos = find_keyword_outside_parens(after_to, "WITH");
415
416    let (target_str, options_str) = match with_pos {
417        Some(pos) => {
418            let target = after_to[..pos].trim();
419            let opts = after_to[pos + 4..].trim(); // skip "WITH"
420            (target, Some(opts.to_string()))
421        }
422        None => (after_to, None),
423    };
424
425    let target_str = target_str.trim();
426
427    let target = if target_str.eq_ignore_ascii_case("STDOUT") {
428        CopyTarget::Stdout
429    } else {
430        // Strip surrounding quotes (single quotes)
431        let path_str = if (target_str.starts_with('\'') && target_str.ends_with('\''))
432            || (target_str.starts_with('"') && target_str.ends_with('"'))
433        {
434            &target_str[1..target_str.len() - 1]
435        } else {
436            target_str
437        };
438        CopyTarget::File(PathBuf::from(path_str))
439    };
440
441    Ok((target, options_str))
442}
443
444/// Parse `opt1=val1 AND opt2=val2 ...` pairs into `CopyOptions`.
445fn parse_options(options_str: &str) -> Result<CopyOptions> {
446    let mut opts = CopyOptions::default();
447
448    // Split on AND (case-insensitive)
449    let parts = split_on_and(options_str);
450
451    for part in parts {
452        let part = part.trim();
453        if part.is_empty() {
454            continue;
455        }
456
457        let eq_pos = part
458            .find('=')
459            .with_context(|| format!("invalid option (missing '='): {part}"))?;
460        let key = part[..eq_pos].trim().to_uppercase();
461        let val = unquote(part[eq_pos + 1..].trim());
462
463        match key.as_str() {
464            "DELIMITER" => {
465                opts.delimiter = val
466                    .chars()
467                    .next()
468                    .context("DELIMITER must be a single character")?;
469            }
470            "QUOTE" => {
471                opts.quote = val
472                    .chars()
473                    .next()
474                    .context("QUOTE must be a single character")?;
475            }
476            "ESCAPE" => {
477                opts.escape = val
478                    .chars()
479                    .next()
480                    .context("ESCAPE must be a single character")?;
481            }
482            "HEADER" => {
483                opts.header = parse_bool_option(&val)?;
484            }
485            "NULL" | "NULLVAL" => {
486                opts.null_val = val;
487            }
488            "DATETIMEFORMAT" => {
489                opts.datetime_format = if val.is_empty() { None } else { Some(val) };
490            }
491            "ENCODING" => {
492                opts.encoding = val;
493            }
494            "FLOATPRECISION" => {
495                opts.float_precision = val.parse().context("FLOATPRECISION must be an integer")?;
496            }
497            "DOUBLEPRECISION" => {
498                opts.double_precision =
499                    val.parse().context("DOUBLEPRECISION must be an integer")?;
500            }
501            "DECIMALSEP" => {
502                opts.decimal_sep = val
503                    .chars()
504                    .next()
505                    .context("DECIMALSEP must be a single character")?;
506            }
507            "THOUSANDSSEP" => {
508                opts.thousands_sep = val.chars().next();
509            }
510            "BOOLSTYLE" => {
511                // Format: "True:False"
512                let parts: Vec<&str> = val.splitn(2, ':').collect();
513                if parts.len() == 2 {
514                    opts.bool_style = (parts[0].to_string(), parts[1].to_string());
515                } else {
516                    bail!("BOOLSTYLE must be in format 'TrueVal:FalseVal'");
517                }
518            }
519            "PAGESIZE" => {
520                opts.page_size = val.parse().context("PAGESIZE must be an integer")?;
521            }
522            "MAXOUTPUTSIZE" => {
523                let n: usize = val.parse().context("MAXOUTPUTSIZE must be an integer")?;
524                opts.max_output_size = Some(n);
525            }
526            "REPORTFREQUENCY" => {
527                let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
528                opts.report_frequency = if n == 0 { None } else { Some(n) };
529            }
530            other => {
531                bail!("unknown COPY option: {other}");
532            }
533        }
534    }
535
536    Ok(opts)
537}
538
/// Split a `WITH` option list on the `AND` keyword (ASCII case-insensitive).
///
/// `AND` only separates options when it is a standalone word with whitespace
/// (spaces, tabs or newlines) on both sides, and it is not inside a `'...'` or
/// `"..."` quoted value. The separator — the whitespace character before
/// `AND`, `AND` itself and one whitespace character after it — is dropped.
/// Any other whitespace stays in the parts, which callers trim. A trailing
/// empty part is not returned.
///
/// Runs in a single linear pass over `s`.
fn split_on_and(s: &str) -> Vec<String> {
    /// True when `rest` starts with `AND` followed by a whitespace character.
    fn is_and_separator(rest: &str) -> bool {
        rest.get(..3).is_some_and(|w| w.eq_ignore_ascii_case("AND"))
            && rest[3..].starts_with(char::is_whitespace)
    }

    let mut parts = Vec::new();
    let mut current = String::new();
    let mut open_quote: Option<char> = None;
    let mut chars = s.char_indices();

    while let Some((i, ch)) = chars.next() {
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            current.push(ch);
            continue;
        }
        if ch == '\'' || ch == '"' {
            open_quote = Some(ch);
            current.push(ch);
            continue;
        }
        if ch.is_whitespace() && is_and_separator(&s[i + ch.len_utf8()..]) {
            parts.push(std::mem::take(&mut current));
            // Skip `A`, `N`, `D` and the single whitespace character after them.
            let _ = chars.nth(3);
            continue;
        }
        current.push(ch);
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
600
/// Trim `s` and strip one matching pair of surrounding `'` or `"` quotes.
///
/// Input without a matching pair (including a lone quote character) is
/// returned trimmed but otherwise unchanged.
fn unquote(s: &str) -> String {
    let trimmed = s.trim();
    ['\'', '"']
        .iter()
        .find_map(|&q| trimmed.strip_prefix(q)?.strip_suffix(q))
        .unwrap_or(trimmed)
        .to_string()
}
611
612/// Parse a boolean option value (true/false, yes/no, 1/0).
613fn parse_bool_option(val: &str) -> Result<bool> {
614    match val.to_lowercase().as_str() {
615        "true" | "yes" | "1" => Ok(true),
616        "false" | "no" | "0" => Ok(false),
617        _ => bail!("invalid boolean value: {val}"),
618    }
619}
620
621// ===========================================================================
622// COPY FROM implementation — imports CSV data into a table.
623// ===========================================================================
624
/// Settings for `COPY FROM` (CSV import), filled from the `WITH` clause.
///
/// Any option not present keeps its [`Default`] value.
#[derive(Debug, Clone)]
pub struct CopyFromOptions {
    // ---- CSV dialect (same option names as COPY TO) ----
    /// Field separator (`DELIMITER`, default `,`).
    pub delimiter: char,
    /// Quote character (`QUOTE`, default `"`).
    pub quote: char,
    /// Escape character (`ESCAPE`, default `\`).
    pub escape: char,
    /// `HEADER` flag (default `false`); presumably marks a header line in the input.
    pub header: bool,
    /// Field text treated as NULL (`NULL`/`NULLVAL`, default empty).
    pub null_val: String,
    /// `DATETIMEFORMAT`; `None` when unset or empty.
    pub datetime_format: Option<String>,
    /// `ENCODING` as given (default `utf-8`).
    pub encoding: String,
    // ---- import tuning ----
    /// `CHUNKSIZE` (default 5000).
    pub chunk_size: usize,
    /// `MAXBATCHSIZE` (default 20).
    pub max_batch_size: usize,
    /// `MINBATCHSIZE` (default 2).
    pub min_batch_size: usize,
    /// `PREPAREDSTATEMENTS` (default `true`).
    pub prepared_statements: bool,
    /// `TTL`; `None` when not given.
    pub ttl: Option<u64>,
    /// `MAXATTEMPTS` (default 5).
    pub max_attempts: usize,
    /// `MAXPARSEERRORS`; `-1` and "not given" both produce `None`.
    pub max_parse_errors: Option<usize>,
    /// `MAXINSERTERRORS`; `-1` and "not given" both produce `None`.
    pub max_insert_errors: Option<usize>,
    /// `ERRFILE`/`ERRORSFILE`; `None` when unset or empty.
    pub err_file: Option<PathBuf>,
    /// `REPORTFREQUENCY`; `0` and "not given" both produce `None`.
    pub report_frequency: Option<usize>,
    /// `INGESTRATE`; `0`, `-1` and "not given" produce `None`.
    pub ingest_rate: Option<usize>,
    /// `NUMPROCESSES`, never below 1 (default 1).
    pub num_processes: usize,
}

impl Default for CopyFromOptions {
    fn default() -> Self {
        CopyFromOptions {
            // CSV dialect
            delimiter: ',',
            quote: '"',
            escape: '\\',
            header: false,
            null_val: String::new(),
            datetime_format: None,
            encoding: String::from("utf-8"),
            // Batching and throughput
            chunk_size: 5000,
            max_batch_size: 20,
            min_batch_size: 2,
            prepared_statements: true,
            ingest_rate: None,
            num_processes: 1,
            // Row properties
            ttl: None,
            // Error handling and reporting
            max_attempts: 5,
            max_parse_errors: None,
            max_insert_errors: None,
            err_file: None,
            report_frequency: None,
        }
    }
}
676
677/// A parsed COPY FROM command.
678#[derive(Debug, Clone)]
679pub struct CopyFromCommand {
680    pub keyspace: Option<String>,
681    pub table: String,
682    pub columns: Option<Vec<String>>,
683    pub source: CopyTarget,
684    pub options: CopyFromOptions,
685}
686
687/// Parse a `COPY ... FROM ...` statement.
688///
689/// Format: `COPY [ks.]table [(col1, col2)] FROM 'filename'|STDIN [WITH opt=val AND ...]`
690pub fn parse_copy_from(input: &str) -> Result<CopyFromCommand> {
691    let trimmed = input.trim().trim_end_matches(';').trim();
692
693    let upper = trimmed.to_uppercase();
694    if !upper.starts_with("COPY ") {
695        bail!("not a COPY statement");
696    }
697
698    // Find the FROM keyword (case-insensitive), but not inside parentheses
699    let from_pos = find_keyword_outside_parens(trimmed, "FROM")
700        .context("COPY statement missing FROM keyword")?;
701
702    let before_from = trimmed[4..from_pos].trim(); // skip "COPY"
703    let after_from = trimmed[from_pos + 4..].trim(); // skip "FROM"
704
705    // Parse table spec (before FROM): [ks.]table [(col1, col2)]
706    let (keyspace, table, columns) = parse_table_spec(before_from)?;
707
708    // Parse source and options (after FROM): 'filename'|STDIN [WITH ...]
709    let (source, options_str) = parse_source_and_options(after_from)?;
710
711    let options = if let Some(opts) = options_str {
712        parse_copy_from_options(&opts)?
713    } else {
714        CopyFromOptions::default()
715    };
716
717    Ok(CopyFromCommand {
718        keyspace,
719        table,
720        columns,
721        source,
722        options,
723    })
724}
725
726/// Parse the source and WITH options after the FROM keyword.
727/// Returns `(CopyTarget, Option<options_string>)`.
728fn parse_source_and_options(after_from: &str) -> Result<(CopyTarget, Option<String>)> {
729    let after_from = after_from.trim();
730
731    let with_pos = find_keyword_outside_parens(after_from, "WITH");
732
733    let (source_str, options_str) = match with_pos {
734        Some(pos) => {
735            let source = after_from[..pos].trim();
736            let opts = after_from[pos + 4..].trim();
737            (source, Some(opts.to_string()))
738        }
739        None => (after_from, None),
740    };
741
742    let source_str = source_str.trim();
743
744    let source = if source_str.eq_ignore_ascii_case("STDIN") {
745        CopyTarget::Stdin
746    } else {
747        let path_str = if (source_str.starts_with('\'') && source_str.ends_with('\''))
748            || (source_str.starts_with('"') && source_str.ends_with('"'))
749        {
750            &source_str[1..source_str.len() - 1]
751        } else {
752            source_str
753        };
754        CopyTarget::File(PathBuf::from(path_str))
755    };
756
757    Ok((source, options_str))
758}
759
760/// Parse `opt1=val1 AND opt2=val2 ...` pairs into `CopyFromOptions`.
761fn parse_copy_from_options(options_str: &str) -> Result<CopyFromOptions> {
762    let mut opts = CopyFromOptions::default();
763
764    let parts = split_on_and(options_str);
765
766    for part in parts {
767        let part = part.trim();
768        if part.is_empty() {
769            continue;
770        }
771
772        let eq_pos = part
773            .find('=')
774            .with_context(|| format!("invalid option (missing '='): {part}"))?;
775        let key = part[..eq_pos].trim().to_uppercase();
776        let val = unquote(part[eq_pos + 1..].trim());
777
778        match key.as_str() {
779            "DELIMITER" => {
780                opts.delimiter = val
781                    .chars()
782                    .next()
783                    .context("DELIMITER must be a single character")?;
784            }
785            "QUOTE" => {
786                opts.quote = val
787                    .chars()
788                    .next()
789                    .context("QUOTE must be a single character")?;
790            }
791            "ESCAPE" => {
792                opts.escape = val
793                    .chars()
794                    .next()
795                    .context("ESCAPE must be a single character")?;
796            }
797            "HEADER" => {
798                opts.header = parse_bool_option(&val)?;
799            }
800            "NULL" | "NULLVAL" => {
801                opts.null_val = val;
802            }
803            "DATETIMEFORMAT" => {
804                opts.datetime_format = if val.is_empty() { None } else { Some(val) };
805            }
806            "ENCODING" => {
807                opts.encoding = val;
808            }
809            "CHUNKSIZE" => {
810                opts.chunk_size = val.parse().context("CHUNKSIZE must be an integer")?;
811            }
812            "MAXBATCHSIZE" => {
813                opts.max_batch_size = val.parse().context("MAXBATCHSIZE must be an integer")?;
814            }
815            "MINBATCHSIZE" => {
816                opts.min_batch_size = val.parse().context("MINBATCHSIZE must be an integer")?;
817            }
818            "PREPAREDSTATEMENTS" => {
819                opts.prepared_statements = parse_bool_option(&val)?;
820            }
821            "TTL" => {
822                let n: u64 = val.parse().context("TTL must be a positive integer")?;
823                opts.ttl = Some(n);
824            }
825            "MAXATTEMPTS" => {
826                opts.max_attempts = val.parse().context("MAXATTEMPTS must be an integer")?;
827            }
828            "MAXPARSEERRORS" => {
829                if val == "-1" {
830                    opts.max_parse_errors = None;
831                } else {
832                    let n: usize = val.parse().context("MAXPARSEERRORS must be an integer")?;
833                    opts.max_parse_errors = Some(n);
834                }
835            }
836            "MAXINSERTERRORS" => {
837                if val == "-1" {
838                    opts.max_insert_errors = None;
839                } else {
840                    let n: usize = val.parse().context("MAXINSERTERRORS must be an integer")?;
841                    opts.max_insert_errors = Some(n);
842                }
843            }
844            "ERRFILE" | "ERRORSFILE" => {
845                opts.err_file = if val.is_empty() {
846                    None
847                } else {
848                    Some(PathBuf::from(val))
849                };
850            }
851            "REPORTFREQUENCY" => {
852                let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
853                opts.report_frequency = if n == 0 { None } else { Some(n) };
854            }
855            "INGESTRATE" => {
856                if val == "-1" || val == "0" {
857                    opts.ingest_rate = None;
858                } else {
859                    let n: usize = val.parse().context("INGESTRATE must be an integer")?;
860                    opts.ingest_rate = Some(n);
861                }
862            }
863            "NUMPROCESSES" => {
864                let n: usize = val.parse().context("NUMPROCESSES must be an integer")?;
865                opts.num_processes = n.max(1);
866            }
867            other => {
868                bail!("unknown COPY FROM option: {other}");
869            }
870        }
871    }
872
873    Ok(opts)
874}
875
876// ---------------------------------------------------------------------------
877// Type-aware CSV ↔ CQL conversion
878// ---------------------------------------------------------------------------
879
880/// Convert a CSV string field to a typed `CqlValue` based on the CQL column type.
881///
882/// `type_name` is the raw CQL type string from `system_schema.columns.type`
883/// (e.g. `"int"`, `"text"`, `"list<int>"`, `"frozen<set<uuid>>"`).
884/// Complex nested types (list, set, map, tuple, frozen, udt) are preserved as
885/// `CqlValue::Text` literals — the database will parse them via the unprepared path.
886pub fn csv_str_to_cql_value(field: &str, type_name: &str, null_val: &str) -> Result<CqlValue> {
887    // Null check (exact match with configured null_val, or empty string when null_val is empty)
888    if field == null_val || (null_val.is_empty() && field.is_empty()) {
889        return Ok(CqlValue::Null);
890    }
891
892    let base_type = strip_frozen(type_name).to_lowercase();
893    let base_type = base_type.as_str();
894
895    match base_type {
896        "ascii" => Ok(CqlValue::Ascii(field.to_string())),
897        "text" | "varchar" => Ok(CqlValue::Text(field.to_string())),
898        "boolean" => {
899            let b = match field.to_lowercase().as_str() {
900                "true" | "yes" | "on" | "1" => true,
901                "false" | "no" | "off" | "0" => false,
902                _ => bail!("invalid boolean value: {field:?}"),
903            };
904            Ok(CqlValue::Boolean(b))
905        }
906        "int" => Ok(CqlValue::Int(
907            field
908                .parse::<i32>()
909                .with_context(|| format!("invalid int: {field:?}"))?,
910        )),
911        "bigint" | "counter" => Ok(CqlValue::BigInt(
912            field
913                .parse::<i64>()
914                .with_context(|| format!("invalid bigint: {field:?}"))?,
915        )),
916        "smallint" => Ok(CqlValue::SmallInt(
917            field
918                .parse::<i16>()
919                .with_context(|| format!("invalid smallint: {field:?}"))?,
920        )),
921        "tinyint" => Ok(CqlValue::TinyInt(
922            field
923                .parse::<i8>()
924                .with_context(|| format!("invalid tinyint: {field:?}"))?,
925        )),
926        "float" => Ok(CqlValue::Float(
927            field
928                .parse::<f32>()
929                .with_context(|| format!("invalid float: {field:?}"))?,
930        )),
931        "double" => Ok(CqlValue::Double(
932            field
933                .parse::<f64>()
934                .with_context(|| format!("invalid double: {field:?}"))?,
935        )),
936        "uuid" => {
937            let u =
938                uuid::Uuid::parse_str(field).with_context(|| format!("invalid uuid: {field:?}"))?;
939            Ok(CqlValue::Uuid(u))
940        }
941        "timeuuid" => {
942            let u = uuid::Uuid::parse_str(field)
943                .with_context(|| format!("invalid timeuuid: {field:?}"))?;
944            Ok(CqlValue::TimeUuid(u))
945        }
946        "timestamp" => {
947            // Try RFC 3339 first (handles 'Z' and offset formats)
948            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(field) {
949                return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
950            }
951            // Try space-separated ISO-8601 with numeric offset
952            let formats = [
953                "%Y-%m-%d %H:%M:%S%.f%z",
954                "%Y-%m-%dT%H:%M:%S%.f%z",
955                "%Y-%m-%dT%H:%M:%S%z",
956                "%Y-%m-%d %H:%M:%S%z",
957                "%Y-%m-%d %H:%M:%S%.3f+0000",
958            ];
959            for fmt in &formats {
960                if let Ok(dt) = DateTime::parse_from_str(field, fmt) {
961                    return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
962                }
963            }
964            // Try plain date (midnight UTC)
965            if let Ok(d) = NaiveDate::parse_from_str(field, "%Y-%m-%d") {
966                let dt = d.and_hms_opt(0, 0, 0).unwrap();
967                return Ok(CqlValue::Timestamp(dt.and_utc().timestamp_millis()));
968            }
969            // Try milliseconds since epoch as a number
970            if let Ok(ms) = field.parse::<i64>() {
971                return Ok(CqlValue::Timestamp(ms));
972            }
973            bail!("invalid timestamp: {field:?}")
974        }
975        "date" => {
976            let d = NaiveDate::parse_from_str(field, "%Y-%m-%d")
977                .with_context(|| format!("invalid date (expected YYYY-MM-DD): {field:?}"))?;
978            Ok(CqlValue::Date(d))
979        }
980        "time" => {
981            // Accept HH:MM:SS, HH:MM:SS.nnn, HH:MM:SS.nnnnnnnnn
982            let formats = ["%H:%M:%S%.f", "%H:%M:%S"];
983            for fmt in &formats {
984                if let Ok(t) = NaiveTime::parse_from_str(field, fmt) {
985                    return Ok(CqlValue::Time(t));
986                }
987            }
988            bail!("invalid time (expected HH:MM:SS[.nnn]): {field:?}")
989        }
990        "inet" => {
991            let addr = field
992                .parse::<IpAddr>()
993                .with_context(|| format!("invalid inet: {field:?}"))?;
994            Ok(CqlValue::Inet(addr))
995        }
996        "blob" => {
997            // Accept "0x..." hex or plain hex
998            let hex = field.strip_prefix("0x").unwrap_or(field);
999            if !hex.len().is_multiple_of(2) {
1000                bail!("invalid blob (odd number of hex digits): {field:?}");
1001            }
1002            let bytes = (0..hex.len())
1003                .step_by(2)
1004                .map(|i| {
1005                    u8::from_str_radix(&hex[i..i + 2], 16)
1006                        .with_context(|| format!("invalid hex byte at offset {i}: {field:?}"))
1007                })
1008                .collect::<Result<Vec<u8>>>()?;
1009            Ok(CqlValue::Blob(bytes))
1010        }
1011        "varint" => {
1012            let n = field
1013                .parse::<num_bigint::BigInt>()
1014                .with_context(|| format!("invalid varint: {field:?}"))?;
1015            Ok(CqlValue::Varint(n))
1016        }
1017        "decimal" => {
1018            let d = field
1019                .parse::<bigdecimal::BigDecimal>()
1020                .with_context(|| format!("invalid decimal: {field:?}"))?;
1021            Ok(CqlValue::Decimal(d))
1022        }
1023        // duration, list<*>, set<*>, map<*>, tuple<*>, and unknown types:
1024        // pass through as Text; the database parses the CQL literal.
1025        _ => Ok(CqlValue::Text(field.to_string())),
1026    }
1027}
1028
1029/// Strip the `frozen<...>` wrapper from a CQL type name, if present.
1030fn strip_frozen(type_name: &str) -> &str {
1031    let lower = type_name.to_lowercase();
1032    if lower.starts_with("frozen<") && type_name.ends_with('>') {
1033        &type_name[7..type_name.len() - 1]
1034    } else {
1035        type_name
1036    }
1037}
1038
1039/// Convert a `CqlValue` to a CQL insert literal string.
1040///
1041/// Used in the unprepared (string-based) INSERT path. Produces values that can
1042/// be embedded directly into a CQL statement without further quoting by the
1043/// caller.
1044fn cql_value_to_insert_literal(v: &CqlValue) -> String {
1045    match v {
1046        CqlValue::Null | CqlValue::Unset => "null".to_string(),
1047        CqlValue::Text(s) | CqlValue::Ascii(s) => {
1048            format!("'{}'", s.replace('\'', "''"))
1049        }
1050        CqlValue::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
1051        CqlValue::Int(n) => n.to_string(),
1052        CqlValue::BigInt(n) | CqlValue::Counter(n) => n.to_string(),
1053        CqlValue::SmallInt(n) => n.to_string(),
1054        CqlValue::TinyInt(n) => n.to_string(),
1055        CqlValue::Float(f) => {
1056            if f.is_nan() {
1057                "NaN".to_string()
1058            } else if f.is_infinite() {
1059                if f.is_sign_positive() {
1060                    "Infinity".to_string()
1061                } else {
1062                    "-Infinity".to_string()
1063                }
1064            } else {
1065                f.to_string()
1066            }
1067        }
1068        CqlValue::Double(d) => {
1069            if d.is_nan() {
1070                "NaN".to_string()
1071            } else if d.is_infinite() {
1072                if d.is_sign_positive() {
1073                    "Infinity".to_string()
1074                } else {
1075                    "-Infinity".to_string()
1076                }
1077            } else {
1078                d.to_string()
1079            }
1080        }
1081        CqlValue::Varint(n) => n.to_string(),
1082        CqlValue::Decimal(d) => d.to_string(),
1083        CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
1084        CqlValue::Timestamp(ms) => {
1085            // Format as ISO-8601 string, quoted
1086            match DateTime::from_timestamp_millis(*ms) {
1087                Some(dt) => {
1088                    let utc: DateTime<Utc> = dt;
1089                    format!("'{}'", utc.format("%Y-%m-%d %H:%M:%S%.6f+0000"))
1090                }
1091                None => format!("{ms}"),
1092            }
1093        }
1094        CqlValue::Date(d) => format!("'{d}'"),
1095        CqlValue::Time(t) => format!("'{t}'"),
1096        CqlValue::Inet(addr) => format!("'{addr}'"),
1097        CqlValue::Blob(bytes) => {
1098            let mut s = String::with_capacity(2 + bytes.len() * 2);
1099            s.push_str("0x");
1100            for b in bytes {
1101                s.push_str(&format!("{b:02x}"));
1102            }
1103            s
1104        }
1105        CqlValue::Duration {
1106            months,
1107            days,
1108            nanoseconds,
1109        } => {
1110            format!("{months}mo{days}d{nanoseconds}ns")
1111        }
1112        // Collections and UDTs: use Display which outputs CQL literal format
1113        CqlValue::List(_)
1114        | CqlValue::Set(_)
1115        | CqlValue::Map(_)
1116        | CqlValue::Tuple(_)
1117        | CqlValue::UserDefinedType { .. } => v.to_string(),
1118    }
1119}
1120
1121// ---------------------------------------------------------------------------
1122// Token bucket rate limiter for INGESTRATE
1123// ---------------------------------------------------------------------------
1124
1125/// Simple token bucket for rate limiting row inserts.
1126struct TokenBucket {
1127    rate: f64,
1128    tokens: f64,
1129    last: Instant,
1130}
1131
1132impl TokenBucket {
1133    fn new(rows_per_second: usize) -> Self {
1134        Self {
1135            rate: rows_per_second as f64,
1136            tokens: rows_per_second as f64,
1137            last: Instant::now(),
1138        }
1139    }
1140
1141    async fn acquire(&mut self) {
1142        let now = Instant::now();
1143        let elapsed = now.duration_since(self.last).as_secs_f64();
1144        self.tokens = (self.tokens + elapsed * self.rate).min(self.rate);
1145        self.last = now;
1146
1147        if self.tokens < 1.0 {
1148            let wait_secs = (1.0 - self.tokens) / self.rate;
1149            tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
1150            self.tokens = 0.0;
1151        } else {
1152            self.tokens -= 1.0;
1153        }
1154    }
1155}
1156
1157/// Execute a COPY FROM command, importing CSV data into a table.
1158pub async fn execute_copy_from(
1159    session: &CqlSession,
1160    cmd: &CopyFromCommand,
1161    current_keyspace: Option<&str>,
1162) -> Result<()> {
1163    let start = Instant::now();
1164
1165    // Resolve keyspace and table spec
1166    let table_spec = match (&cmd.keyspace, current_keyspace) {
1167        (Some(ks), _) => format!("{}.{}", ks, cmd.table),
1168        (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
1169        (None, None) => cmd.table.clone(),
1170    };
1171
1172    let source_name = match &cmd.source {
1173        CopyTarget::File(path) => format!("'{}'", path.display()),
1174        CopyTarget::Stdin => "STDIN".to_string(),
1175        CopyTarget::Stdout => unreachable!("COPY FROM cannot use STDOUT"),
1176    };
1177
1178    let ttl_clause = match cmd.options.ttl {
1179        Some(ttl) => format!(" USING TTL {ttl}"),
1180        None => String::new(),
1181    };
1182
1183    // Query schema for column metadata: (name, kind, position, type_name)
1184    let ks_for_schema = cmd
1185        .keyspace
1186        .as_deref()
1187        .or(current_keyspace)
1188        .context("no keyspace specified and no current keyspace set")?;
1189    let schema_query = format!(
1190        "SELECT column_name, kind, position, type FROM system_schema.columns \
1191         WHERE keyspace_name = '{}' AND table_name = '{}'",
1192        ks_for_schema, cmd.table
1193    );
1194    let schema_result = session.execute_query(&schema_query).await?;
1195
1196    // Collect and sort into CREATE TABLE order
1197    let mut schema_cols: Vec<(String, String, i32, String)> = Vec::new();
1198    for row in &schema_result.rows {
1199        let name = match row.values.first() {
1200            Some(CqlValue::Text(n)) => n.clone(),
1201            _ => continue,
1202        };
1203        let kind = match row.values.get(1) {
1204            Some(CqlValue::Text(k)) => k.clone(),
1205            _ => "regular".to_string(),
1206        };
1207        let position = match row.values.get(2) {
1208            Some(CqlValue::Int(p)) => *p,
1209            _ => -1,
1210        };
1211        let type_name = match row.values.get(3) {
1212            Some(CqlValue::Text(t)) => t.clone(),
1213            _ => "text".to_string(),
1214        };
1215        schema_cols.push((name, kind, position, type_name));
1216    }
1217    if schema_cols.is_empty() {
1218        bail!(
1219            "could not determine columns for table '{}' — table may not exist",
1220            table_spec
1221        );
1222    }
1223    schema_cols.sort_by(|a, b| {
1224        let kind_order = |k: &str| -> i32 {
1225            match k {
1226                "partition_key" => 0,
1227                "clustering" => 1,
1228                "static" => 2,
1229                _ => 3,
1230            }
1231        };
1232        kind_order(&a.1)
1233            .cmp(&kind_order(&b.1))
1234            .then(a.2.cmp(&b.2))
1235            .then(a.0.cmp(&b.0))
1236    });
1237
1238    // type lookup: column_name → type_name (for header-derived ordering)
1239    let type_map: std::collections::HashMap<String, String> = schema_cols
1240        .iter()
1241        .map(|(n, _, _, t)| (n.clone(), t.clone()))
1242        .collect();
1243
1244    // Preliminary column list: explicit columns or schema order
1245    let prelim_columns: Vec<(String, String)> = match &cmd.columns {
1246        Some(explicit) => explicit
1247            .iter()
1248            .map(|n| {
1249                let t = type_map
1250                    .get(n)
1251                    .cloned()
1252                    .unwrap_or_else(|| "text".to_string());
1253                (n.clone(), t)
1254            })
1255            .collect(),
1256        None => schema_cols.into_iter().map(|(n, _, _, t)| (n, t)).collect(),
1257    };
1258
1259    // Open CSV reader
1260    let reader: Box<dyn std::io::Read> = match &cmd.source {
1261        CopyTarget::File(path) => {
1262            let file = std::fs::File::open(path)
1263                .with_context(|| format!("failed to open file: {}", path.display()))?;
1264            Box::new(std::io::BufReader::new(file))
1265        }
1266        CopyTarget::Stdin => Box::new(std::io::stdin().lock()),
1267        CopyTarget::Stdout => bail!("COPY FROM cannot read from STDOUT"),
1268    };
1269    let mut csv_reader = csv::ReaderBuilder::new()
1270        .delimiter(cmd.options.delimiter as u8)
1271        .quote(cmd.options.quote as u8)
1272        .escape(Some(cmd.options.escape as u8))
1273        .has_headers(cmd.options.header)
1274        .flexible(true)
1275        .from_reader(reader);
1276
1277    // When HEADER=true and no explicit columns, column order comes from CSV header
1278    let columns: Vec<(String, String)> = if cmd.options.header && cmd.columns.is_none() {
1279        let headers = csv_reader
1280            .headers()
1281            .context("failed to read CSV header row")?;
1282        headers
1283            .iter()
1284            .map(|h| {
1285                let name = h.trim().to_string();
1286                let t = type_map
1287                    .get(&name)
1288                    .cloned()
1289                    .unwrap_or_else(|| "text".to_string());
1290                (name, t)
1291            })
1292            .collect()
1293    } else {
1294        prelim_columns
1295    };
1296
1297    let col_list: String = columns
1298        .iter()
1299        .map(|(n, _)| n.as_str())
1300        .collect::<Vec<_>>()
1301        .join(", ");
1302    let col_type_names: Vec<String> = columns.iter().map(|(_, t)| t.clone()).collect();
1303
1304    // Prepare INSERT statement (done after finalizing column list)
1305    let prepared_id = if cmd.options.prepared_statements {
1306        let placeholders = vec!["?"; columns.len()].join(", ");
1307        let insert_template =
1308            format!("INSERT INTO {table_spec} ({col_list}) VALUES ({placeholders}){ttl_clause}");
1309        Some(
1310            session
1311                .prepare(&insert_template)
1312                .await
1313                .with_context(|| format!("failed to prepare: {insert_template}"))?,
1314        )
1315    } else {
1316        None
1317    };
1318
1319    // Open error file
1320    let mut err_writer: Option<std::io::BufWriter<std::fs::File>> = match &cmd.options.err_file {
1321        Some(path) => {
1322            let file = std::fs::File::create(path)
1323                .with_context(|| format!("failed to create error file: {}", path.display()))?;
1324            Some(std::io::BufWriter::new(file))
1325        }
1326        None => None,
1327    };
1328
1329    let mut row_count: usize = 0;
1330    let mut parse_errors: usize = 0;
1331    let mut insert_errors: usize = 0;
1332    let mut rate_limiter = cmd.options.ingest_rate.map(TokenBucket::new);
1333    let num_processes = cmd.options.num_processes.max(1);
1334    let chunk_size = cmd.options.chunk_size.max(1);
1335
1336    let max_attempts = cmd.options.max_attempts;
1337    let max_parse_errors = cmd.options.max_parse_errors;
1338    let max_insert_errors = cmd.options.max_insert_errors;
1339    let report_frequency = cmd.options.report_frequency;
1340    let null_val = &cmd.options.null_val;
1341
1342    let mut csv_records = csv_reader.records();
1343
1344    'outer: loop {
1345        // --- Parse phase: fill a chunk of CHUNKSIZE typed rows ---
1346        let mut chunk: Vec<Vec<CqlValue>> = Vec::with_capacity(chunk_size);
1347
1348        'fill: loop {
1349            if chunk.len() >= chunk_size {
1350                break 'fill;
1351            }
1352            let record = match csv_records.next() {
1353                None => break 'fill,
1354                Some(Err(e)) => {
1355                    parse_errors += 1;
1356                    let msg = format!("CSV parse error on row {}: {e}", row_count + parse_errors);
1357                    eprintln!("{msg}");
1358                    if let Some(ref mut w) = err_writer {
1359                        let _ = writeln!(w, "{msg}");
1360                    }
1361                    if let Some(max) = max_parse_errors {
1362                        if parse_errors > max {
1363                            bail!("Exceeded maximum parse errors ({max}). Aborting.");
1364                        }
1365                    }
1366                    continue 'fill;
1367                }
1368                Some(Ok(r)) => r,
1369            };
1370
1371            if record.len() != col_type_names.len() {
1372                parse_errors += 1;
1373                let msg = format!(
1374                    "Row {}: expected {} columns but got {}",
1375                    row_count + parse_errors,
1376                    col_type_names.len(),
1377                    record.len()
1378                );
1379                eprintln!("{msg}");
1380                if let Some(ref mut w) = err_writer {
1381                    let _ = writeln!(w, "{msg}");
1382                }
1383                if let Some(max) = max_parse_errors {
1384                    if parse_errors > max {
1385                        bail!("Exceeded maximum number of parse errors ({max}). Aborting import.");
1386                    }
1387                }
1388                continue 'fill;
1389            }
1390
1391            let mut row_values: Vec<CqlValue> = Vec::with_capacity(col_type_names.len());
1392            let mut row_ok = true;
1393            for (field, type_name) in record.iter().zip(col_type_names.iter()) {
1394                match csv_str_to_cql_value(field, type_name, null_val) {
1395                    Ok(v) => row_values.push(v),
1396                    Err(e) => {
1397                        parse_errors += 1;
1398                        let msg = format!(
1399                            "Row {}: type error for '{}': {e}",
1400                            row_count + parse_errors,
1401                            type_name
1402                        );
1403                        eprintln!("{msg}");
1404                        if let Some(ref mut w) = err_writer {
1405                            let _ = writeln!(w, "{msg}");
1406                        }
1407                        if let Some(max) = max_parse_errors {
1408                            if parse_errors > max {
1409                                bail!("Exceeded maximum parse errors ({max}). Aborting.");
1410                            }
1411                        }
1412                        row_ok = false;
1413                        break;
1414                    }
1415                }
1416            }
1417            if row_ok {
1418                chunk.push(row_values);
1419            }
1420        }
1421
1422        if chunk.is_empty() {
1423            break 'outer;
1424        }
1425
1426        // --- Rate limiting ---
1427        if let Some(ref mut bucket) = rate_limiter {
1428            for _ in 0..chunk.len() {
1429                bucket.acquire().await;
1430            }
1431        }
1432
1433        // --- Insert phase: execute chunk concurrently ---
1434        let insert_results: Vec<Result<()>> = futures::stream::iter(chunk)
1435            .map(|values| {
1436                let ts = table_spec.as_str();
1437                let cl = col_list.as_str();
1438                let ttl = ttl_clause.as_str();
1439                let pid = prepared_id.as_ref();
1440                async move {
1441                    insert_row_with_retry(session, pid, ts, cl, ttl, &values, max_attempts).await
1442                }
1443            })
1444            .buffer_unordered(num_processes)
1445            .collect()
1446            .await;
1447
1448        for result in insert_results {
1449            match result {
1450                Ok(()) => row_count += 1,
1451                Err(e) => {
1452                    insert_errors += 1;
1453                    let msg = format!("Insert error on row {}: {e}", row_count + insert_errors);
1454                    eprintln!("{msg}");
1455                    if let Some(ref mut w) = err_writer {
1456                        let _ = writeln!(w, "{msg}");
1457                    }
1458                    if let Some(max) = max_insert_errors {
1459                        if insert_errors > max {
1460                            bail!("Exceeded maximum number of insert errors ({max}). Aborting import.");
1461                        }
1462                    }
1463                }
1464            }
1465        }
1466
1467        // --- Progress report ---
1468        if let Some(freq) = report_frequency {
1469            let total = row_count + insert_errors + parse_errors;
1470            if freq > 0 && total > 0 && total.is_multiple_of(freq) {
1471                eprintln!("Processed {} rows...", row_count);
1472            }
1473        }
1474    }
1475
1476    if let Some(ref mut w) = err_writer {
1477        w.flush()?;
1478    }
1479
1480    let elapsed = start.elapsed().as_secs_f64();
1481    println!("{row_count} rows imported from {source_name} in {elapsed:.3}s.");
1482    if parse_errors > 0 {
1483        eprintln!("{parse_errors} parse error(s) encountered.");
1484    }
1485    if insert_errors > 0 {
1486        eprintln!("{insert_errors} insert error(s) encountered.");
1487    }
1488
1489    Ok(())
1490}
1491
1492/// Execute a single row INSERT with retry on failure.
1493///
1494/// When `prepared_id` is `Some`, uses the prepared statement path with typed
1495/// bound values. Otherwise builds a literal-value INSERT string.
1496async fn insert_row_with_retry(
1497    session: &CqlSession,
1498    prepared_id: Option<&PreparedId>,
1499    table_spec: &str,
1500    col_list: &str,
1501    ttl_clause: &str,
1502    values: &[CqlValue],
1503    max_attempts: usize,
1504) -> Result<()> {
1505    let max = max_attempts.max(1);
1506    let mut last_err = anyhow::anyhow!("no attempts made");
1507
1508    for attempt in 1..=max {
1509        let result = if let Some(id) = prepared_id {
1510            session.execute_prepared(id, values).await
1511        } else {
1512            let literals: Vec<String> = values.iter().map(cql_value_to_insert_literal).collect();
1513            let insert = format!(
1514                "INSERT INTO {} ({}) VALUES ({}){};",
1515                table_spec,
1516                col_list,
1517                literals.join(", "),
1518                ttl_clause
1519            );
1520            session.execute_query(&insert).await
1521        };
1522
1523        match result {
1524            Ok(_) => return Ok(()),
1525            Err(e) => {
1526                last_err = e;
1527                if attempt < max {
1528                    // Exponential backoff: 100ms * 2^(attempt-1), capped at 2s
1529                    let wait_ms = (100u64 * (1u64 << (attempt - 1).min(4))).min(2000);
1530                    tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1531                }
1532            }
1533        }
1534    }
1535
1536    Err(last_err)
1537}
1538
1539#[cfg(test)]
1540mod tests {
1541    use super::*;
1542
1543    #[test]
1544    fn parse_copy_to_basic() {
1545        let cmd = parse_copy_to("COPY ks.table TO '/tmp/out.csv'").unwrap();
1546        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1547        assert_eq!(cmd.table, "table");
1548        assert_eq!(cmd.columns, None);
1549        assert_eq!(
1550            cmd.filename,
1551            CopyTarget::File(PathBuf::from("/tmp/out.csv"))
1552        );
1553    }
1554
1555    #[test]
1556    fn parse_copy_to_with_columns() {
1557        let cmd = parse_copy_to("COPY ks.table (col1, col2) TO '/tmp/out.csv'").unwrap();
1558        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1559        assert_eq!(cmd.table, "table");
1560        assert_eq!(
1561            cmd.columns,
1562            Some(vec!["col1".to_string(), "col2".to_string()])
1563        );
1564        assert_eq!(
1565            cmd.filename,
1566            CopyTarget::File(PathBuf::from("/tmp/out.csv"))
1567        );
1568    }
1569
1570    #[test]
1571    fn parse_copy_to_stdout() {
1572        let cmd = parse_copy_to("COPY ks.table TO STDOUT").unwrap();
1573        assert_eq!(cmd.filename, CopyTarget::Stdout);
1574    }
1575
1576    #[test]
1577    fn parse_copy_to_with_options() {
1578        let cmd =
1579            parse_copy_to("COPY ks.table TO '/tmp/out.csv' WITH DELIMITER='|' AND HEADER=true")
1580                .unwrap();
1581        assert_eq!(cmd.options.delimiter, '|');
1582        assert!(cmd.options.header);
1583    }
1584
1585    #[test]
1586    fn format_value_null() {
1587        let opts = CopyOptions::default();
1588        assert_eq!(format_value_for_csv(&CqlValue::Null, &opts), "");
1589    }
1590
1591    #[test]
1592    fn format_value_text() {
1593        let opts = CopyOptions::default();
1594        assert_eq!(
1595            format_value_for_csv(&CqlValue::Text("hello".to_string()), &opts),
1596            "hello"
1597        );
1598    }
1599
1600    #[test]
1601    fn format_value_boolean() {
1602        let opts = CopyOptions::default();
1603        assert_eq!(
1604            format_value_for_csv(&CqlValue::Boolean(true), &opts),
1605            "True"
1606        );
1607        assert_eq!(
1608            format_value_for_csv(&CqlValue::Boolean(false), &opts),
1609            "False"
1610        );
1611    }
1612
1613    #[test]
1614    fn format_value_float_precision() {
1615        let opts = CopyOptions {
1616            float_precision: 3,
1617            ..Default::default()
1618        };
1619        assert_eq!(
1620            format_value_for_csv(&CqlValue::Float(1.23456), &opts),
1621            "1.235"
1622        );
1623    }
1624
1625    #[test]
1626    fn default_options() {
1627        let opts = CopyOptions::default();
1628        assert_eq!(opts.delimiter, ',');
1629        assert_eq!(opts.quote, '"');
1630        assert_eq!(opts.escape, '\\');
1631        assert!(!opts.header);
1632        assert_eq!(opts.null_val, "");
1633        assert_eq!(opts.datetime_format, None);
1634        assert_eq!(opts.encoding, "utf-8");
1635        assert_eq!(opts.float_precision, 5);
1636        assert_eq!(opts.double_precision, 12);
1637        assert_eq!(opts.decimal_sep, '.');
1638        assert_eq!(opts.thousands_sep, None);
1639        assert_eq!(opts.bool_style, ("True".to_string(), "False".to_string()));
1640        assert_eq!(opts.page_size, 1000);
1641        assert_eq!(opts.max_output_size, None);
1642        assert_eq!(opts.report_frequency, None);
1643    }
1644
1645    // -----------------------------------------------------------------------
1646    // COPY FROM tests
1647    // -----------------------------------------------------------------------
1648
1649    #[test]
1650    fn parse_copy_from_basic() {
1651        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv'").unwrap();
1652        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1653        assert_eq!(cmd.table, "table");
1654        assert_eq!(cmd.columns, None);
1655        assert_eq!(cmd.source, CopyTarget::File(PathBuf::from("/tmp/in.csv")));
1656    }
1657
1658    #[test]
1659    fn parse_copy_from_with_columns() {
1660        let cmd = parse_copy_from("COPY ks.table (col1, col2) FROM '/tmp/in.csv'").unwrap();
1661        assert_eq!(cmd.keyspace, Some("ks".to_string()));
1662        assert_eq!(cmd.table, "table");
1663        assert_eq!(
1664            cmd.columns,
1665            Some(vec!["col1".to_string(), "col2".to_string()])
1666        );
1667    }
1668
1669    #[test]
1670    fn parse_copy_from_stdin() {
1671        let cmd = parse_copy_from("COPY ks.table FROM STDIN").unwrap();
1672        assert_eq!(cmd.source, CopyTarget::Stdin);
1673    }
1674
1675    #[test]
1676    fn parse_copy_from_stdin_case_insensitive() {
1677        let cmd = parse_copy_from("COPY ks.table FROM stdin").unwrap();
1678        assert_eq!(cmd.source, CopyTarget::Stdin);
1679    }
1680
1681    #[test]
1682    fn parse_copy_from_no_keyspace() {
1683        let cmd = parse_copy_from("COPY mytable FROM '/data/file.csv'").unwrap();
1684        assert_eq!(cmd.keyspace, None);
1685        assert_eq!(cmd.table, "mytable");
1686    }
1687
1688    #[test]
1689    fn parse_copy_from_with_options() {
1690        let cmd = parse_copy_from(
1691            "COPY ks.table FROM '/tmp/in.csv' WITH TTL=3600 AND HEADER=true AND CHUNKSIZE=1000 AND DELIMITER='|'",
1692        )
1693        .unwrap();
1694        assert_eq!(cmd.options.ttl, Some(3600));
1695        assert!(cmd.options.header);
1696        assert_eq!(cmd.options.chunk_size, 1000);
1697        assert_eq!(cmd.options.delimiter, '|');
1698    }
1699
1700    #[test]
1701    fn parse_copy_from_with_error_options() {
1702        let cmd = parse_copy_from(
1703            "COPY ks.table FROM '/tmp/in.csv' WITH MAXPARSEERRORS=100 AND MAXINSERTERRORS=50 AND ERRFILE='/tmp/err.log'",
1704        )
1705        .unwrap();
1706        assert_eq!(cmd.options.max_parse_errors, Some(100));
1707        assert_eq!(cmd.options.max_insert_errors, Some(50));
1708        assert_eq!(cmd.options.err_file, Some(PathBuf::from("/tmp/err.log")));
1709    }
1710
1711    #[test]
1712    fn parse_copy_from_with_batch_options() {
1713        let cmd = parse_copy_from(
1714            "COPY ks.table FROM '/tmp/in.csv' WITH MAXBATCHSIZE=50 AND MINBATCHSIZE=5 AND MAXATTEMPTS=10",
1715        )
1716        .unwrap();
1717        assert_eq!(cmd.options.max_batch_size, 50);
1718        assert_eq!(cmd.options.min_batch_size, 5);
1719        assert_eq!(cmd.options.max_attempts, 10);
1720    }
1721
1722    #[test]
1723    fn parse_copy_from_semicolon() {
1724        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv';").unwrap();
1725        assert_eq!(cmd.source, CopyTarget::File(PathBuf::from("/tmp/in.csv")));
1726    }
1727
1728    #[test]
1729    fn default_copy_from_options() {
1730        let opts = CopyFromOptions::default();
1731        assert_eq!(opts.delimiter, ',');
1732        assert_eq!(opts.quote, '"');
1733        assert_eq!(opts.escape, '\\');
1734        assert!(!opts.header);
1735        assert_eq!(opts.null_val, "");
1736        assert_eq!(opts.datetime_format, None);
1737        assert_eq!(opts.encoding, "utf-8");
1738        assert_eq!(opts.chunk_size, 5000);
1739        assert_eq!(opts.max_batch_size, 20);
1740        assert_eq!(opts.min_batch_size, 2);
1741        assert!(opts.prepared_statements);
1742        assert_eq!(opts.ttl, None);
1743        assert_eq!(opts.max_attempts, 5);
1744        assert_eq!(opts.max_parse_errors, None);
1745        assert_eq!(opts.max_insert_errors, None);
1746        assert_eq!(opts.err_file, None);
1747        assert_eq!(opts.report_frequency, None);
1748        assert_eq!(opts.ingest_rate, None);
1749        assert_eq!(opts.num_processes, 1);
1750    }
1751
1752    // -----------------------------------------------------------------------
1753    // csv_str_to_cql_value unit tests
1754    // -----------------------------------------------------------------------
1755
1756    #[test]
1757    fn csv_to_cql_text_types() {
1758        let v = csv_str_to_cql_value("hello", "text", "").unwrap();
1759        assert_eq!(v, CqlValue::Text("hello".to_string()));
1760
1761        let v = csv_str_to_cql_value("hi", "ascii", "").unwrap();
1762        assert_eq!(v, CqlValue::Ascii("hi".to_string()));
1763
1764        let v = csv_str_to_cql_value("world", "varchar", "").unwrap();
1765        assert_eq!(v, CqlValue::Text("world".to_string())); // varchar → Text
1766    }
1767
1768    #[test]
1769    fn csv_to_cql_int_types() {
1770        assert_eq!(
1771            csv_str_to_cql_value("42", "int", "").unwrap(),
1772            CqlValue::Int(42)
1773        );
1774        assert_eq!(
1775            csv_str_to_cql_value("-100", "bigint", "").unwrap(),
1776            CqlValue::BigInt(-100)
1777        );
1778        assert_eq!(
1779            csv_str_to_cql_value("1000", "counter", "").unwrap(),
1780            CqlValue::BigInt(1000)
1781        );
1782        assert_eq!(
1783            csv_str_to_cql_value("32767", "smallint", "").unwrap(),
1784            CqlValue::SmallInt(32767)
1785        );
1786        assert_eq!(
1787            csv_str_to_cql_value("127", "tinyint", "").unwrap(),
1788            CqlValue::TinyInt(127)
1789        );
1790    }
1791
1792    #[test]
1793    fn csv_to_cql_float_types() {
1794        match csv_str_to_cql_value("1.5", "float", "").unwrap() {
1795            CqlValue::Float(f) => assert!((f - 1.5f32).abs() < 1e-5),
1796            other => panic!("expected Float, got {other:?}"),
1797        }
1798        match csv_str_to_cql_value("1.5", "double", "").unwrap() {
1799            CqlValue::Double(d) => assert!((d - 1.5f64).abs() < 1e-9),
1800            other => panic!("expected Double, got {other:?}"),
1801        }
1802        // Scientific notation
1803        assert!(matches!(
1804            csv_str_to_cql_value("1e10", "double", "").unwrap(),
1805            CqlValue::Double(_)
1806        ));
1807    }
1808
1809    #[test]
1810    fn csv_to_cql_boolean() {
1811        for t in &["true", "True", "TRUE", "yes", "YES", "on", "ON", "1"] {
1812            assert_eq!(
1813                csv_str_to_cql_value(t, "boolean", "").unwrap(),
1814                CqlValue::Boolean(true),
1815                "expected true for {t:?}"
1816            );
1817        }
1818        for f in &["false", "False", "FALSE", "no", "NO", "off", "OFF", "0"] {
1819            assert_eq!(
1820                csv_str_to_cql_value(f, "boolean", "").unwrap(),
1821                CqlValue::Boolean(false),
1822                "expected false for {f:?}"
1823            );
1824        }
1825    }
1826
1827    #[test]
1828    fn csv_to_cql_uuid() {
1829        let uuid_str = "550e8400-e29b-41d4-a716-446655440000";
1830        assert!(matches!(
1831            csv_str_to_cql_value(uuid_str, "uuid", "").unwrap(),
1832            CqlValue::Uuid(_)
1833        ));
1834        assert!(matches!(
1835            csv_str_to_cql_value(uuid_str, "timeuuid", "").unwrap(),
1836            CqlValue::TimeUuid(_)
1837        ));
1838        // Invalid UUID
1839        assert!(csv_str_to_cql_value("not-a-uuid", "uuid", "").is_err());
1840    }
1841
1842    #[test]
1843    fn csv_to_cql_timestamp() {
1844        // ISO-8601 with timezone
1845        let v = csv_str_to_cql_value("2024-01-15T12:34:56Z", "timestamp", "").unwrap();
1846        assert!(matches!(v, CqlValue::Timestamp(_)));
1847
1848        // Milliseconds as integer
1849        let v = csv_str_to_cql_value("1705318496000", "timestamp", "").unwrap();
1850        assert_eq!(v, CqlValue::Timestamp(1705318496000));
1851    }
1852
1853    #[test]
1854    fn csv_to_cql_date() {
1855        use chrono::NaiveDate;
1856        let v = csv_str_to_cql_value("2024-01-15", "date", "").unwrap();
1857        assert_eq!(
1858            v,
1859            CqlValue::Date(NaiveDate::from_ymd_opt(2024, 1, 15).unwrap())
1860        );
1861
1862        assert!(csv_str_to_cql_value("not-a-date", "date", "").is_err());
1863    }
1864
1865    #[test]
1866    fn csv_to_cql_time() {
1867        let v = csv_str_to_cql_value("12:34:56", "time", "").unwrap();
1868        assert!(matches!(v, CqlValue::Time(_)));
1869
1870        let v = csv_str_to_cql_value("12:34:56.789", "time", "").unwrap();
1871        assert!(matches!(v, CqlValue::Time(_)));
1872
1873        assert!(csv_str_to_cql_value("not-a-time", "time", "").is_err());
1874    }
1875
1876    #[test]
1877    fn csv_to_cql_inet() {
1878        let v = csv_str_to_cql_value("127.0.0.1", "inet", "").unwrap();
1879        assert!(matches!(v, CqlValue::Inet(_)));
1880
1881        let v = csv_str_to_cql_value("::1", "inet", "").unwrap();
1882        assert!(matches!(v, CqlValue::Inet(_)));
1883
1884        assert!(csv_str_to_cql_value("not.an.ip", "inet", "").is_err());
1885    }
1886
1887    #[test]
1888    fn csv_to_cql_blob() {
1889        let v = csv_str_to_cql_value("0xdeadbeef", "blob", "").unwrap();
1890        assert_eq!(v, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
1891
1892        // Without 0x prefix
1893        let v = csv_str_to_cql_value("deadbeef", "blob", "").unwrap();
1894        assert_eq!(v, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
1895
1896        // Invalid hex
1897        assert!(csv_str_to_cql_value("0xgg", "blob", "").is_err());
1898        // Odd number of digits
1899        assert!(csv_str_to_cql_value("0xabc", "blob", "").is_err());
1900    }
1901
1902    #[test]
1903    fn csv_to_cql_null_handling() {
1904        // Empty field with empty null_val → Null
1905        assert_eq!(csv_str_to_cql_value("", "int", "").unwrap(), CqlValue::Null);
1906        assert_eq!(
1907            csv_str_to_cql_value("", "text", "").unwrap(),
1908            CqlValue::Null
1909        );
1910    }
1911
1912    #[test]
1913    fn csv_to_cql_null_custom() {
1914        // Custom null_val
1915        assert_eq!(
1916            csv_str_to_cql_value("NULL", "int", "NULL").unwrap(),
1917            CqlValue::Null
1918        );
1919        assert_eq!(
1920            csv_str_to_cql_value("N/A", "text", "N/A").unwrap(),
1921            CqlValue::Null
1922        );
1923        // Non-null value with custom null_val
1924        assert!(matches!(
1925            csv_str_to_cql_value("42", "int", "NULL").unwrap(),
1926            CqlValue::Int(42)
1927        ));
1928    }
1929
1930    #[test]
1931    fn csv_to_cql_unknown_type_fallback() {
1932        // Unknown types fall back to Text
1933        let v = csv_str_to_cql_value("hello", "customtype", "").unwrap();
1934        assert_eq!(v, CqlValue::Text("hello".to_string()));
1935
1936        // Collection types also fall through to Text
1937        let v = csv_str_to_cql_value("[1, 2, 3]", "list<int>", "").unwrap();
1938        assert_eq!(v, CqlValue::Text("[1, 2, 3]".to_string()));
1939    }
1940
1941    #[test]
1942    fn csv_to_cql_parse_error_int() {
1943        // Non-numeric for int type → error
1944        assert!(csv_str_to_cql_value("notanint", "int", "").is_err());
1945        assert!(csv_str_to_cql_value("3.14", "int", "").is_err());
1946        assert!(csv_str_to_cql_value("notanint", "bigint", "").is_err());
1947    }
1948
1949    #[test]
1950    fn csv_to_cql_varint_and_decimal() {
1951        let v = csv_str_to_cql_value("123456789012345678901234567890", "varint", "").unwrap();
1952        assert!(matches!(v, CqlValue::Varint(_)));
1953
1954        let v = csv_str_to_cql_value("3.141592653589793", "decimal", "").unwrap();
1955        assert!(matches!(v, CqlValue::Decimal(_)));
1956    }
1957
1958    #[test]
1959    fn csv_to_cql_frozen_stripped() {
1960        // frozen<set<uuid>> should strip frozen wrapper → set<uuid> → Text fallback
1961        let v = csv_str_to_cql_value("{uuid1, uuid2}", "frozen<set<uuid>>", "").unwrap();
1962        assert!(matches!(v, CqlValue::Text(_)));
1963    }
1964
1965    #[test]
1966    fn parse_copy_from_numprocesses() {
1967        let cmd = parse_copy_from("COPY ks.table FROM '/tmp/in.csv' WITH NUMPROCESSES=4").unwrap();
1968        assert_eq!(cmd.options.num_processes, 4);
1969    }
1970}