1use std::io::Write;
8use std::net::IpAddr;
9use std::path::PathBuf;
10use std::time::{Duration, Instant};
11
12use anyhow::{bail, Context, Result};
13use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
14use futures::StreamExt;
15
16use crate::driver::types::CqlValue;
17use crate::driver::PreparedId;
18use crate::session::CqlSession;
19
/// Endpoint of a COPY statement: where rows are written (COPY TO) or read
/// from (COPY FROM).
#[derive(Debug, Clone, PartialEq)]
pub enum CopyTarget {
    /// A file identified by its path.
    File(PathBuf),
    /// Standard output; produced when a COPY TO target is the word `STDOUT`.
    Stdout,
    /// Standard input; produced when a COPY FROM source is the word `STDIN`.
    Stdin,
}
30
/// Options accepted in the `WITH` clause of a `COPY ... TO` statement.
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// Field separator handed to the CSV writer (`DELIMITER`).
    pub delimiter: char,
    /// Quote character handed to the CSV writer (`QUOTE`).
    pub quote: char,
    /// Escape character handed to the CSV writer (`ESCAPE`).
    pub escape: char,
    /// When true, a row of column names is written first (`HEADER`).
    pub header: bool,
    /// Text emitted for null or unset values (`NULL` / `NULLVAL`).
    pub null_val: String,
    /// Format string used for timestamps; `None` selects the built-in
    /// default (`DATETIMEFORMAT`).
    pub datetime_format: Option<String>,
    /// Value of the `ENCODING` option. Stored as given; the code in this
    /// part of the file does not consult it.
    pub encoding: String,
    /// Digits after the decimal point for `float` values (`FLOATPRECISION`).
    pub float_precision: usize,
    /// Digits after the decimal point for `double` values (`DOUBLEPRECISION`).
    pub double_precision: usize,
    /// Character substituted for `.` in float, double and decimal output
    /// (`DECIMALSEP`).
    pub decimal_sep: char,
    /// Value of the `THOUSANDSSEP` option. Stored as given; the formatting
    /// helpers in this part of the file do not apply it.
    pub thousands_sep: Option<char>,
    /// Text emitted for booleans, as `(true_text, false_text)` (`BOOLSTYLE`).
    pub bool_style: (String, String),
    /// Value of the `PAGESIZE` option. Stored as given; not consulted by the
    /// export code in this part of the file.
    pub page_size: usize,
    /// Upper bound on the number of rows written; `None` means no bound
    /// (`MAXOUTPUTSIZE`).
    pub max_output_size: Option<usize>,
    /// Emit a progress line every this many rows; `None` disables progress
    /// output (`REPORTFREQUENCY`).
    pub report_frequency: Option<usize>,
}

impl Default for CopyOptions {
    /// Returns the option values used when a statement has no `WITH` clause.
    fn default() -> Self {
        let bool_style = (String::from("True"), String::from("False"));
        Self {
            // CSV dialect.
            delimiter: ',',
            quote: '"',
            escape: '\\',
            header: false,
            // Value rendering.
            null_val: String::new(),
            datetime_format: None,
            encoding: String::from("utf-8"),
            float_precision: 5,
            double_precision: 12,
            decimal_sep: '.',
            thousands_sep: None,
            bool_style,
            // Paging, limits and progress reporting.
            page_size: 1000,
            max_output_size: None,
            report_frequency: None,
        }
    }
}
72
73#[derive(Debug, Clone)]
75pub struct CopyToCommand {
76 pub keyspace: Option<String>,
77 pub table: String,
78 pub columns: Option<Vec<String>>,
79 pub filename: CopyTarget,
80 pub options: CopyOptions,
81}
82
83pub fn parse_copy_to(input: &str) -> Result<CopyToCommand> {
87 let trimmed = input.trim().trim_end_matches(';').trim();
88
89 let upper = trimmed.to_uppercase();
91 if !upper.starts_with("COPY ") {
92 bail!("not a COPY statement");
93 }
94
95 let to_pos =
97 find_keyword_outside_parens(trimmed, "TO").context("COPY statement missing TO keyword")?;
98
99 let before_to = trimmed[4..to_pos].trim(); let after_to = trimmed[to_pos + 2..].trim(); let (keyspace, table, columns) = parse_table_spec(before_to)?;
104
105 let (filename, options_str) = parse_target_and_options(after_to)?;
107
108 let options = if let Some(opts) = options_str {
109 parse_options(&opts)?
110 } else {
111 CopyOptions::default()
112 };
113
114 Ok(CopyToCommand {
115 keyspace,
116 table,
117 columns,
118 filename,
119 options,
120 })
121}
122
123pub async fn execute_copy_to(
125 session: &CqlSession,
126 cmd: &CopyToCommand,
127 current_keyspace: Option<&str>,
128) -> Result<()> {
129 let col_spec = match &cmd.columns {
131 Some(cols) => cols.join(", "),
132 None => "*".to_string(),
133 };
134
135 let table_spec = match (&cmd.keyspace, current_keyspace) {
136 (Some(ks), _) => format!("{}.{}", ks, cmd.table),
137 (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
138 (None, None) => cmd.table.clone(),
139 };
140
141 let query = format!("SELECT {} FROM {}", col_spec, table_spec);
142
143 let result = session.execute_query(&query).await?;
144
145 let mut row_count: usize = 0;
147
148 match &cmd.filename {
149 CopyTarget::File(path) => {
150 let file = std::fs::File::create(path)
151 .with_context(|| format!("failed to create file: {}", path.display()))?;
152 let buf = std::io::BufWriter::new(file);
153 let mut wtr = build_csv_writer(&cmd.options, buf);
154
155 if cmd.options.header {
156 let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
157 wtr.write_record(&headers)?;
158 }
159
160 for row in &result.rows {
161 if let Some(max) = cmd.options.max_output_size {
162 if row_count >= max {
163 break;
164 }
165 }
166 let fields: Vec<String> = row
167 .values
168 .iter()
169 .map(|v| format_value_for_csv(v, &cmd.options))
170 .collect();
171 wtr.write_record(&fields)?;
172 row_count += 1;
173
174 if let Some(freq) = cmd.options.report_frequency {
175 if freq > 0 && row_count.is_multiple_of(freq) {
176 eprintln!("Processed {} rows...", row_count);
177 }
178 }
179 }
180
181 wtr.flush()?;
182 println!("{} rows exported to '{}'.", row_count, path.display());
183 }
184 CopyTarget::Stdout => {
185 let stdout = std::io::stdout();
186 let handle = stdout.lock();
187 let mut wtr = build_csv_writer(&cmd.options, handle);
188
189 if cmd.options.header {
190 let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
191 wtr.write_record(&headers)?;
192 }
193
194 for row in &result.rows {
195 if let Some(max) = cmd.options.max_output_size {
196 if row_count >= max {
197 break;
198 }
199 }
200 let fields: Vec<String> = row
201 .values
202 .iter()
203 .map(|v| format_value_for_csv(v, &cmd.options))
204 .collect();
205 wtr.write_record(&fields)?;
206 row_count += 1;
207
208 if let Some(freq) = cmd.options.report_frequency {
209 if freq > 0 && row_count.is_multiple_of(freq) {
210 eprintln!("Processed {} rows...", row_count);
211 }
212 }
213 }
214
215 wtr.flush()?;
216 eprintln!("{} rows exported to STDOUT.", row_count);
217 }
218 CopyTarget::Stdin => {
219 bail!("COPY TO cannot write to STDIN");
220 }
221 }
222
223 Ok(())
224}
225
226pub fn format_value_for_csv(value: &CqlValue, options: &CopyOptions) -> String {
228 match value {
229 CqlValue::Null | CqlValue::Unset => options.null_val.clone(),
230 CqlValue::Text(s) | CqlValue::Ascii(s) => s.clone(),
231 CqlValue::Boolean(b) => {
232 if *b {
233 options.bool_style.0.clone()
234 } else {
235 options.bool_style.1.clone()
236 }
237 }
238 CqlValue::Int(v) => v.to_string(),
239 CqlValue::BigInt(v) => v.to_string(),
240 CqlValue::SmallInt(v) => v.to_string(),
241 CqlValue::TinyInt(v) => v.to_string(),
242 CqlValue::Counter(v) => v.to_string(),
243 CqlValue::Varint(v) => v.to_string(),
244 CqlValue::Float(v) => format_float(*v as f64, options.float_precision, options),
245 CqlValue::Double(v) => format_float(*v, options.double_precision, options),
246 CqlValue::Decimal(v) => {
247 let s = v.to_string();
248 if options.decimal_sep != '.' {
249 s.replace('.', &options.decimal_sep.to_string())
250 } else {
251 s
252 }
253 }
254 CqlValue::Timestamp(millis) => format_timestamp(*millis, options),
255 CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
256 CqlValue::Blob(bytes) => {
257 let mut s = String::with_capacity(2 + bytes.len() * 2);
258 s.push_str("0x");
259 for b in bytes {
260 s.push_str(&format!("{b:02x}"));
261 }
262 s
263 }
264 CqlValue::Date(d) => d.to_string(),
265 CqlValue::Time(t) => t.to_string(),
266 CqlValue::Duration {
267 months,
268 days,
269 nanoseconds,
270 } => format!("{months}mo{days}d{nanoseconds}ns"),
271 CqlValue::Inet(addr) => addr.to_string(),
272 CqlValue::List(_)
274 | CqlValue::Set(_)
275 | CqlValue::Map(_)
276 | CqlValue::Tuple(_)
277 | CqlValue::UserDefinedType { .. } => value.to_string(),
278 }
279}
280
281fn build_csv_writer<W: Write>(options: &CopyOptions, writer: W) -> csv::Writer<W> {
287 csv::WriterBuilder::new()
288 .delimiter(options.delimiter as u8)
289 .quote(options.quote as u8)
290 .escape(options.escape as u8)
291 .double_quote(false)
292 .from_writer(writer)
293}
294
295fn format_float(v: f64, precision: usize, options: &CopyOptions) -> String {
297 if v.is_nan() {
298 return "NaN".to_string();
299 }
300 if v.is_infinite() {
301 return if v.is_sign_positive() {
302 "Infinity".to_string()
303 } else {
304 "-Infinity".to_string()
305 };
306 }
307 let s = format!("{v:.prec$}", prec = precision);
308 if options.decimal_sep != '.' {
309 s.replace('.', &options.decimal_sep.to_string())
310 } else {
311 s
312 }
313}
314
315fn format_timestamp(millis: i64, options: &CopyOptions) -> String {
317 match DateTime::from_timestamp_millis(millis) {
318 Some(dt) => {
319 let utc: DateTime<Utc> = dt;
320 match &options.datetime_format {
321 Some(fmt) => utc.format(fmt).to_string(),
322 None => utc.format("%Y-%m-%d %H:%M:%S%.6f%z").to_string(),
323 }
324 }
325 None => format!("<invalid timestamp: {millis}>"),
326 }
327}
328
/// Finds the first whole-word, case-insensitive occurrence of `keyword` in
/// `s` that lies outside parentheses and outside single- or double-quoted
/// text.
///
/// Returns the byte offset of the match in `s`, or `None` when there is no
/// match. A match counts as a whole word when the bytes immediately before
/// and after it are not ASCII alphanumerics or `_`.
///
/// The comparison is done on the bytes of `s` itself and ignores ASCII case
/// only. Comparing against an upper-cased copy is unsafe here: upper-casing
/// can change byte lengths, so offsets taken from `s` may fall inside a
/// multi-byte character of the copy (a panic) or point at the wrong place.
fn find_keyword_outside_parens(s: &str, keyword: &str) -> Option<usize> {
    let bytes = s.as_bytes();
    let kw = keyword.as_bytes();
    let is_word_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';

    let mut depth: i32 = 0;
    // The quote character that opened the current quoted run, if any.
    let mut open_quote: Option<char> = None;

    for (i, ch) in s.char_indices() {
        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            continue;
        }
        match ch {
            '\'' | '"' => {
                open_quote = Some(ch);
                continue;
            }
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }
        if depth != 0 {
            continue;
        }

        let end = i + kw.len();
        let matches_here = bytes
            .get(i..end)
            .map_or(false, |window| window.eq_ignore_ascii_case(kw));
        if !matches_here {
            continue;
        }

        let before_ok = i == 0 || !is_word_byte(bytes[i - 1]);
        let after_ok = bytes.get(end).map_or(true, |&b| !is_word_byte(b));
        if before_ok && after_ok {
            return Some(i);
        }
    }
    None
}
371
372fn parse_table_spec(spec: &str) -> Result<(Option<String>, String, Option<Vec<String>>)> {
374 let spec = spec.trim();
375
376 let (table_part, columns) = if let Some(paren_start) = spec.find('(') {
378 let paren_end = spec
379 .rfind(')')
380 .context("unmatched parenthesis in column list")?;
381 let cols_str = &spec[paren_start + 1..paren_end];
382 let cols: Vec<String> = cols_str
383 .split(',')
384 .map(|c| c.trim().to_string())
385 .filter(|c| !c.is_empty())
386 .collect();
387 (spec[..paren_start].trim(), Some(cols))
388 } else {
389 (spec, None)
390 };
391
392 let (keyspace, table) = if let Some(dot_pos) = table_part.find('.') {
394 let ks = table_part[..dot_pos].trim().to_string();
395 let tbl = table_part[dot_pos + 1..].trim().to_string();
396 (Some(ks), tbl)
397 } else {
398 (None, table_part.trim().to_string())
399 };
400
401 if table.is_empty() {
402 bail!("missing table name in COPY statement");
403 }
404
405 Ok((keyspace, table, columns))
406}
407
408fn parse_target_and_options(after_to: &str) -> Result<(CopyTarget, Option<String>)> {
411 let after_to = after_to.trim();
412
413 let with_pos = find_keyword_outside_parens(after_to, "WITH");
415
416 let (target_str, options_str) = match with_pos {
417 Some(pos) => {
418 let target = after_to[..pos].trim();
419 let opts = after_to[pos + 4..].trim(); (target, Some(opts.to_string()))
421 }
422 None => (after_to, None),
423 };
424
425 let target_str = target_str.trim();
426
427 let target = if target_str.eq_ignore_ascii_case("STDOUT") {
428 CopyTarget::Stdout
429 } else {
430 let path_str = if (target_str.starts_with('\'') && target_str.ends_with('\''))
432 || (target_str.starts_with('"') && target_str.ends_with('"'))
433 {
434 &target_str[1..target_str.len() - 1]
435 } else {
436 target_str
437 };
438 CopyTarget::File(PathBuf::from(path_str))
439 };
440
441 Ok((target, options_str))
442}
443
444fn parse_options(options_str: &str) -> Result<CopyOptions> {
446 let mut opts = CopyOptions::default();
447
448 let parts = split_on_and(options_str);
450
451 for part in parts {
452 let part = part.trim();
453 if part.is_empty() {
454 continue;
455 }
456
457 let eq_pos = part
458 .find('=')
459 .with_context(|| format!("invalid option (missing '='): {part}"))?;
460 let key = part[..eq_pos].trim().to_uppercase();
461 let val = unquote(part[eq_pos + 1..].trim());
462
463 match key.as_str() {
464 "DELIMITER" => {
465 opts.delimiter = val
466 .chars()
467 .next()
468 .context("DELIMITER must be a single character")?;
469 }
470 "QUOTE" => {
471 opts.quote = val
472 .chars()
473 .next()
474 .context("QUOTE must be a single character")?;
475 }
476 "ESCAPE" => {
477 opts.escape = val
478 .chars()
479 .next()
480 .context("ESCAPE must be a single character")?;
481 }
482 "HEADER" => {
483 opts.header = parse_bool_option(&val)?;
484 }
485 "NULL" | "NULLVAL" => {
486 opts.null_val = val;
487 }
488 "DATETIMEFORMAT" => {
489 opts.datetime_format = if val.is_empty() { None } else { Some(val) };
490 }
491 "ENCODING" => {
492 opts.encoding = val;
493 }
494 "FLOATPRECISION" => {
495 opts.float_precision = val.parse().context("FLOATPRECISION must be an integer")?;
496 }
497 "DOUBLEPRECISION" => {
498 opts.double_precision =
499 val.parse().context("DOUBLEPRECISION must be an integer")?;
500 }
501 "DECIMALSEP" => {
502 opts.decimal_sep = val
503 .chars()
504 .next()
505 .context("DECIMALSEP must be a single character")?;
506 }
507 "THOUSANDSSEP" => {
508 opts.thousands_sep = val.chars().next();
509 }
510 "BOOLSTYLE" => {
511 let parts: Vec<&str> = val.splitn(2, ':').collect();
513 if parts.len() == 2 {
514 opts.bool_style = (parts[0].to_string(), parts[1].to_string());
515 } else {
516 bail!("BOOLSTYLE must be in format 'TrueVal:FalseVal'");
517 }
518 }
519 "PAGESIZE" => {
520 opts.page_size = val.parse().context("PAGESIZE must be an integer")?;
521 }
522 "MAXOUTPUTSIZE" => {
523 let n: usize = val.parse().context("MAXOUTPUTSIZE must be an integer")?;
524 opts.max_output_size = Some(n);
525 }
526 "REPORTFREQUENCY" => {
527 let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
528 opts.report_frequency = if n == 0 { None } else { Some(n) };
529 }
530 other => {
531 bail!("unknown COPY option: {other}");
532 }
533 }
534 }
535
536 Ok(opts)
537}
538
/// Splits an option list on the separator ` AND ` (a space, the word in any
/// ASCII case, a space), ignoring separators inside single- or double-quoted
/// text.
///
/// Quote characters are kept in the returned pieces. A piece that precedes a
/// separator is pushed even if it is empty; the final piece is pushed only
/// when it is non-empty.
///
/// The separator is matched against a five-character window of the input
/// itself, in linear time. Matching against an upper-cased copy indexed with
/// positions from the original is both quadratic and wrong whenever
/// upper-casing changes the character count (for example `ß` becoming `SS`).
fn split_on_and(s: &str) -> Vec<String> {
    const SEPARATOR: [char; 5] = [' ', 'A', 'N', 'D', ' '];

    let chars: Vec<char> = s.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();
    // The quote character that opened the current quoted run, if any.
    let mut open_quote: Option<char> = None;
    let mut i = 0;

    while i < chars.len() {
        let ch = chars[i];

        if let Some(q) = open_quote {
            if ch == q {
                open_quote = None;
            }
            current.push(ch);
            i += 1;
            continue;
        }

        if ch == '\'' || ch == '"' {
            open_quote = Some(ch);
            current.push(ch);
            i += 1;
            continue;
        }

        let at_separator = chars.get(i..i + SEPARATOR.len()).map_or(false, |window| {
            window
                .iter()
                .zip(SEPARATOR.iter())
                .all(|(c, k)| c.eq_ignore_ascii_case(k))
        });
        if at_separator {
            parts.push(std::mem::take(&mut current));
            i += SEPARATOR.len();
            continue;
        }

        current.push(ch);
        i += 1;
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
600
/// Trims `s` and removes one pair of matching surrounding quotes, if present.
///
/// Both single and double quotes are recognised, but the opening and closing
/// quote must be the same character. Input without such a pair (including a
/// lone quote character) is returned trimmed and otherwise unchanged.
fn unquote(s: &str) -> String {
    let trimmed = s.trim();
    for quote in &['\'', '"'] {
        let inner = trimmed
            .strip_prefix(*quote)
            .and_then(|rest| rest.strip_suffix(*quote));
        if let Some(inner) = inner {
            return inner.to_string();
        }
    }
    trimmed.to_string()
}
611
612fn parse_bool_option(val: &str) -> Result<bool> {
614 match val.to_lowercase().as_str() {
615 "true" | "yes" | "1" => Ok(true),
616 "false" | "no" | "0" => Ok(false),
617 _ => bail!("invalid boolean value: {val}"),
618 }
619}
620
/// Options accepted in the `WITH` clause of a `COPY ... FROM` statement.
///
/// Each field holds the value of the option named in its comment. Several of
/// them are only read by the import routine, parts of which lie outside this
/// section of the file; those comments state the option name only.
#[derive(Debug, Clone)]
pub struct CopyFromOptions {
    /// Field separator handed to the CSV reader (`DELIMITER`).
    pub delimiter: char,
    /// Quote character handed to the CSV reader (`QUOTE`).
    pub quote: char,
    /// Escape character handed to the CSV reader (`ESCAPE`).
    pub escape: char,
    /// Whether the first CSV row is a header row (`HEADER`).
    pub header: bool,
    /// Field text that is read back as null (`NULL` / `NULLVAL`).
    pub null_val: String,
    /// Value of the `DATETIMEFORMAT` option.
    pub datetime_format: Option<String>,
    /// Value of the `ENCODING` option.
    pub encoding: String,
    /// Value of the `CHUNKSIZE` option.
    pub chunk_size: usize,
    /// Value of the `MAXBATCHSIZE` option.
    pub max_batch_size: usize,
    /// Value of the `MINBATCHSIZE` option.
    pub min_batch_size: usize,
    /// When true, the INSERT statement is prepared once up front
    /// (`PREPAREDSTATEMENTS`).
    pub prepared_statements: bool,
    /// When set, inserts carry a `USING TTL` clause with this value (`TTL`).
    pub ttl: Option<u64>,
    /// Value of the `MAXATTEMPTS` option.
    pub max_attempts: usize,
    /// Value of the `MAXPARSEERRORS` option; `None` when given as `-1`.
    pub max_parse_errors: Option<usize>,
    /// Value of the `MAXINSERTERRORS` option; `None` when given as `-1`.
    pub max_insert_errors: Option<usize>,
    /// Path of the error file to create (`ERRFILE` / `ERRORSFILE`).
    pub err_file: Option<PathBuf>,
    /// Value of the `REPORTFREQUENCY` option; `None` when given as `0`.
    pub report_frequency: Option<usize>,
    /// Rows-per-second limit fed to the rate limiter; `None` when given as
    /// `0` or `-1` (`INGESTRATE`).
    pub ingest_rate: Option<usize>,
    /// Value of the `NUMPROCESSES` option, never below 1 after parsing.
    pub num_processes: usize,
}

impl Default for CopyFromOptions {
    /// Returns the option values used when a statement has no `WITH` clause.
    fn default() -> Self {
        Self {
            // CSV dialect.
            delimiter: ',',
            quote: '"',
            escape: '\\',
            header: false,
            null_val: String::new(),
            datetime_format: None,
            encoding: String::from("utf-8"),
            // Batching and statement handling.
            chunk_size: 5000,
            max_batch_size: 20,
            min_batch_size: 2,
            prepared_statements: true,
            ttl: None,
            // Error handling.
            max_attempts: 5,
            max_parse_errors: None,
            max_insert_errors: None,
            err_file: None,
            // Progress and throughput.
            report_frequency: None,
            ingest_rate: None,
            num_processes: 1,
        }
    }
}
676
677#[derive(Debug, Clone)]
679pub struct CopyFromCommand {
680 pub keyspace: Option<String>,
681 pub table: String,
682 pub columns: Option<Vec<String>>,
683 pub source: CopyTarget,
684 pub options: CopyFromOptions,
685}
686
687pub fn parse_copy_from(input: &str) -> Result<CopyFromCommand> {
691 let trimmed = input.trim().trim_end_matches(';').trim();
692
693 let upper = trimmed.to_uppercase();
694 if !upper.starts_with("COPY ") {
695 bail!("not a COPY statement");
696 }
697
698 let from_pos = find_keyword_outside_parens(trimmed, "FROM")
700 .context("COPY statement missing FROM keyword")?;
701
702 let before_from = trimmed[4..from_pos].trim(); let after_from = trimmed[from_pos + 4..].trim(); let (keyspace, table, columns) = parse_table_spec(before_from)?;
707
708 let (source, options_str) = parse_source_and_options(after_from)?;
710
711 let options = if let Some(opts) = options_str {
712 parse_copy_from_options(&opts)?
713 } else {
714 CopyFromOptions::default()
715 };
716
717 Ok(CopyFromCommand {
718 keyspace,
719 table,
720 columns,
721 source,
722 options,
723 })
724}
725
726fn parse_source_and_options(after_from: &str) -> Result<(CopyTarget, Option<String>)> {
729 let after_from = after_from.trim();
730
731 let with_pos = find_keyword_outside_parens(after_from, "WITH");
732
733 let (source_str, options_str) = match with_pos {
734 Some(pos) => {
735 let source = after_from[..pos].trim();
736 let opts = after_from[pos + 4..].trim();
737 (source, Some(opts.to_string()))
738 }
739 None => (after_from, None),
740 };
741
742 let source_str = source_str.trim();
743
744 let source = if source_str.eq_ignore_ascii_case("STDIN") {
745 CopyTarget::Stdin
746 } else {
747 let path_str = if (source_str.starts_with('\'') && source_str.ends_with('\''))
748 || (source_str.starts_with('"') && source_str.ends_with('"'))
749 {
750 &source_str[1..source_str.len() - 1]
751 } else {
752 source_str
753 };
754 CopyTarget::File(PathBuf::from(path_str))
755 };
756
757 Ok((source, options_str))
758}
759
760fn parse_copy_from_options(options_str: &str) -> Result<CopyFromOptions> {
762 let mut opts = CopyFromOptions::default();
763
764 let parts = split_on_and(options_str);
765
766 for part in parts {
767 let part = part.trim();
768 if part.is_empty() {
769 continue;
770 }
771
772 let eq_pos = part
773 .find('=')
774 .with_context(|| format!("invalid option (missing '='): {part}"))?;
775 let key = part[..eq_pos].trim().to_uppercase();
776 let val = unquote(part[eq_pos + 1..].trim());
777
778 match key.as_str() {
779 "DELIMITER" => {
780 opts.delimiter = val
781 .chars()
782 .next()
783 .context("DELIMITER must be a single character")?;
784 }
785 "QUOTE" => {
786 opts.quote = val
787 .chars()
788 .next()
789 .context("QUOTE must be a single character")?;
790 }
791 "ESCAPE" => {
792 opts.escape = val
793 .chars()
794 .next()
795 .context("ESCAPE must be a single character")?;
796 }
797 "HEADER" => {
798 opts.header = parse_bool_option(&val)?;
799 }
800 "NULL" | "NULLVAL" => {
801 opts.null_val = val;
802 }
803 "DATETIMEFORMAT" => {
804 opts.datetime_format = if val.is_empty() { None } else { Some(val) };
805 }
806 "ENCODING" => {
807 opts.encoding = val;
808 }
809 "CHUNKSIZE" => {
810 opts.chunk_size = val.parse().context("CHUNKSIZE must be an integer")?;
811 }
812 "MAXBATCHSIZE" => {
813 opts.max_batch_size = val.parse().context("MAXBATCHSIZE must be an integer")?;
814 }
815 "MINBATCHSIZE" => {
816 opts.min_batch_size = val.parse().context("MINBATCHSIZE must be an integer")?;
817 }
818 "PREPAREDSTATEMENTS" => {
819 opts.prepared_statements = parse_bool_option(&val)?;
820 }
821 "TTL" => {
822 let n: u64 = val.parse().context("TTL must be a positive integer")?;
823 opts.ttl = Some(n);
824 }
825 "MAXATTEMPTS" => {
826 opts.max_attempts = val.parse().context("MAXATTEMPTS must be an integer")?;
827 }
828 "MAXPARSEERRORS" => {
829 if val == "-1" {
830 opts.max_parse_errors = None;
831 } else {
832 let n: usize = val.parse().context("MAXPARSEERRORS must be an integer")?;
833 opts.max_parse_errors = Some(n);
834 }
835 }
836 "MAXINSERTERRORS" => {
837 if val == "-1" {
838 opts.max_insert_errors = None;
839 } else {
840 let n: usize = val.parse().context("MAXINSERTERRORS must be an integer")?;
841 opts.max_insert_errors = Some(n);
842 }
843 }
844 "ERRFILE" | "ERRORSFILE" => {
845 opts.err_file = if val.is_empty() {
846 None
847 } else {
848 Some(PathBuf::from(val))
849 };
850 }
851 "REPORTFREQUENCY" => {
852 let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
853 opts.report_frequency = if n == 0 { None } else { Some(n) };
854 }
855 "INGESTRATE" => {
856 if val == "-1" || val == "0" {
857 opts.ingest_rate = None;
858 } else {
859 let n: usize = val.parse().context("INGESTRATE must be an integer")?;
860 opts.ingest_rate = Some(n);
861 }
862 }
863 "NUMPROCESSES" => {
864 let n: usize = val.parse().context("NUMPROCESSES must be an integer")?;
865 opts.num_processes = n.max(1);
866 }
867 other => {
868 bail!("unknown COPY FROM option: {other}");
869 }
870 }
871 }
872
873 Ok(opts)
874}
875
876pub fn csv_str_to_cql_value(field: &str, type_name: &str, null_val: &str) -> Result<CqlValue> {
887 if field == null_val || (null_val.is_empty() && field.is_empty()) {
889 return Ok(CqlValue::Null);
890 }
891
892 let base_type = strip_frozen(type_name).to_lowercase();
893 let base_type = base_type.as_str();
894
895 match base_type {
896 "ascii" => Ok(CqlValue::Ascii(field.to_string())),
897 "text" | "varchar" => Ok(CqlValue::Text(field.to_string())),
898 "boolean" => {
899 let b = match field.to_lowercase().as_str() {
900 "true" | "yes" | "on" | "1" => true,
901 "false" | "no" | "off" | "0" => false,
902 _ => bail!("invalid boolean value: {field:?}"),
903 };
904 Ok(CqlValue::Boolean(b))
905 }
906 "int" => Ok(CqlValue::Int(
907 field
908 .parse::<i32>()
909 .with_context(|| format!("invalid int: {field:?}"))?,
910 )),
911 "bigint" | "counter" => Ok(CqlValue::BigInt(
912 field
913 .parse::<i64>()
914 .with_context(|| format!("invalid bigint: {field:?}"))?,
915 )),
916 "smallint" => Ok(CqlValue::SmallInt(
917 field
918 .parse::<i16>()
919 .with_context(|| format!("invalid smallint: {field:?}"))?,
920 )),
921 "tinyint" => Ok(CqlValue::TinyInt(
922 field
923 .parse::<i8>()
924 .with_context(|| format!("invalid tinyint: {field:?}"))?,
925 )),
926 "float" => Ok(CqlValue::Float(
927 field
928 .parse::<f32>()
929 .with_context(|| format!("invalid float: {field:?}"))?,
930 )),
931 "double" => Ok(CqlValue::Double(
932 field
933 .parse::<f64>()
934 .with_context(|| format!("invalid double: {field:?}"))?,
935 )),
936 "uuid" => {
937 let u =
938 uuid::Uuid::parse_str(field).with_context(|| format!("invalid uuid: {field:?}"))?;
939 Ok(CqlValue::Uuid(u))
940 }
941 "timeuuid" => {
942 let u = uuid::Uuid::parse_str(field)
943 .with_context(|| format!("invalid timeuuid: {field:?}"))?;
944 Ok(CqlValue::TimeUuid(u))
945 }
946 "timestamp" => {
947 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(field) {
949 return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
950 }
951 let formats = [
953 "%Y-%m-%d %H:%M:%S%.f%z",
954 "%Y-%m-%dT%H:%M:%S%.f%z",
955 "%Y-%m-%dT%H:%M:%S%z",
956 "%Y-%m-%d %H:%M:%S%z",
957 "%Y-%m-%d %H:%M:%S%.3f+0000",
958 ];
959 for fmt in &formats {
960 if let Ok(dt) = DateTime::parse_from_str(field, fmt) {
961 return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
962 }
963 }
964 if let Ok(d) = NaiveDate::parse_from_str(field, "%Y-%m-%d") {
966 let dt = d.and_hms_opt(0, 0, 0).unwrap();
967 return Ok(CqlValue::Timestamp(dt.and_utc().timestamp_millis()));
968 }
969 if let Ok(ms) = field.parse::<i64>() {
971 return Ok(CqlValue::Timestamp(ms));
972 }
973 bail!("invalid timestamp: {field:?}")
974 }
975 "date" => {
976 let d = NaiveDate::parse_from_str(field, "%Y-%m-%d")
977 .with_context(|| format!("invalid date (expected YYYY-MM-DD): {field:?}"))?;
978 Ok(CqlValue::Date(d))
979 }
980 "time" => {
981 let formats = ["%H:%M:%S%.f", "%H:%M:%S"];
983 for fmt in &formats {
984 if let Ok(t) = NaiveTime::parse_from_str(field, fmt) {
985 return Ok(CqlValue::Time(t));
986 }
987 }
988 bail!("invalid time (expected HH:MM:SS[.nnn]): {field:?}")
989 }
990 "inet" => {
991 let addr = field
992 .parse::<IpAddr>()
993 .with_context(|| format!("invalid inet: {field:?}"))?;
994 Ok(CqlValue::Inet(addr))
995 }
996 "blob" => {
997 let hex = field.strip_prefix("0x").unwrap_or(field);
999 if !hex.len().is_multiple_of(2) {
1000 bail!("invalid blob (odd number of hex digits): {field:?}");
1001 }
1002 let bytes = (0..hex.len())
1003 .step_by(2)
1004 .map(|i| {
1005 u8::from_str_radix(&hex[i..i + 2], 16)
1006 .with_context(|| format!("invalid hex byte at offset {i}: {field:?}"))
1007 })
1008 .collect::<Result<Vec<u8>>>()?;
1009 Ok(CqlValue::Blob(bytes))
1010 }
1011 "varint" => {
1012 let n = field
1013 .parse::<num_bigint::BigInt>()
1014 .with_context(|| format!("invalid varint: {field:?}"))?;
1015 Ok(CqlValue::Varint(n))
1016 }
1017 "decimal" => {
1018 let d = field
1019 .parse::<bigdecimal::BigDecimal>()
1020 .with_context(|| format!("invalid decimal: {field:?}"))?;
1021 Ok(CqlValue::Decimal(d))
1022 }
1023 _ => Ok(CqlValue::Text(field.to_string())),
1026 }
1027}
1028
/// Removes an outer `frozen<...>` wrapper from a CQL type name.
///
/// The prefix is matched case-insensitively and the name must end with `>`.
/// Names without such a wrapper are returned unchanged; nested `frozen`
/// wrappers inside the name are left alone.
fn strip_frozen(type_name: &str) -> &str {
    const PREFIX: &str = "frozen<";

    let has_prefix = type_name
        .get(..PREFIX.len())
        .map_or(false, |head| head.eq_ignore_ascii_case(PREFIX));

    match type_name.strip_suffix('>') {
        Some(without_close) if has_prefix => &without_close[PREFIX.len()..],
        _ => type_name,
    }
}
1038
1039fn cql_value_to_insert_literal(v: &CqlValue) -> String {
1045 match v {
1046 CqlValue::Null | CqlValue::Unset => "null".to_string(),
1047 CqlValue::Text(s) | CqlValue::Ascii(s) => {
1048 format!("'{}'", s.replace('\'', "''"))
1049 }
1050 CqlValue::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
1051 CqlValue::Int(n) => n.to_string(),
1052 CqlValue::BigInt(n) | CqlValue::Counter(n) => n.to_string(),
1053 CqlValue::SmallInt(n) => n.to_string(),
1054 CqlValue::TinyInt(n) => n.to_string(),
1055 CqlValue::Float(f) => {
1056 if f.is_nan() {
1057 "NaN".to_string()
1058 } else if f.is_infinite() {
1059 if f.is_sign_positive() {
1060 "Infinity".to_string()
1061 } else {
1062 "-Infinity".to_string()
1063 }
1064 } else {
1065 f.to_string()
1066 }
1067 }
1068 CqlValue::Double(d) => {
1069 if d.is_nan() {
1070 "NaN".to_string()
1071 } else if d.is_infinite() {
1072 if d.is_sign_positive() {
1073 "Infinity".to_string()
1074 } else {
1075 "-Infinity".to_string()
1076 }
1077 } else {
1078 d.to_string()
1079 }
1080 }
1081 CqlValue::Varint(n) => n.to_string(),
1082 CqlValue::Decimal(d) => d.to_string(),
1083 CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
1084 CqlValue::Timestamp(ms) => {
1085 match DateTime::from_timestamp_millis(*ms) {
1087 Some(dt) => {
1088 let utc: DateTime<Utc> = dt;
1089 format!("'{}'", utc.format("%Y-%m-%d %H:%M:%S%.6f+0000"))
1090 }
1091 None => format!("{ms}"),
1092 }
1093 }
1094 CqlValue::Date(d) => format!("'{d}'"),
1095 CqlValue::Time(t) => format!("'{t}'"),
1096 CqlValue::Inet(addr) => format!("'{addr}'"),
1097 CqlValue::Blob(bytes) => {
1098 let mut s = String::with_capacity(2 + bytes.len() * 2);
1099 s.push_str("0x");
1100 for b in bytes {
1101 s.push_str(&format!("{b:02x}"));
1102 }
1103 s
1104 }
1105 CqlValue::Duration {
1106 months,
1107 days,
1108 nanoseconds,
1109 } => {
1110 format!("{months}mo{days}d{nanoseconds}ns")
1111 }
1112 CqlValue::List(_)
1114 | CqlValue::Set(_)
1115 | CqlValue::Map(_)
1116 | CqlValue::Tuple(_)
1117 | CqlValue::UserDefinedType { .. } => v.to_string(),
1118 }
1119}
1120
1121struct TokenBucket {
1127 rate: f64,
1128 tokens: f64,
1129 last: Instant,
1130}
1131
1132impl TokenBucket {
1133 fn new(rows_per_second: usize) -> Self {
1134 Self {
1135 rate: rows_per_second as f64,
1136 tokens: rows_per_second as f64,
1137 last: Instant::now(),
1138 }
1139 }
1140
1141 async fn acquire(&mut self) {
1142 let now = Instant::now();
1143 let elapsed = now.duration_since(self.last).as_secs_f64();
1144 self.tokens = (self.tokens + elapsed * self.rate).min(self.rate);
1145 self.last = now;
1146
1147 if self.tokens < 1.0 {
1148 let wait_secs = (1.0 - self.tokens) / self.rate;
1149 tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
1150 self.tokens = 0.0;
1151 } else {
1152 self.tokens -= 1.0;
1153 }
1154 }
1155}
1156
1157pub async fn execute_copy_from(
1159 session: &CqlSession,
1160 cmd: &CopyFromCommand,
1161 current_keyspace: Option<&str>,
1162) -> Result<()> {
1163 let start = Instant::now();
1164
1165 let table_spec = match (&cmd.keyspace, current_keyspace) {
1167 (Some(ks), _) => format!("{}.{}", ks, cmd.table),
1168 (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
1169 (None, None) => cmd.table.clone(),
1170 };
1171
1172 let source_name = match &cmd.source {
1173 CopyTarget::File(path) => format!("'{}'", path.display()),
1174 CopyTarget::Stdin => "STDIN".to_string(),
1175 CopyTarget::Stdout => unreachable!("COPY FROM cannot use STDOUT"),
1176 };
1177
1178 let ttl_clause = match cmd.options.ttl {
1179 Some(ttl) => format!(" USING TTL {ttl}"),
1180 None => String::new(),
1181 };
1182
1183 let ks_for_schema = cmd
1185 .keyspace
1186 .as_deref()
1187 .or(current_keyspace)
1188 .context("no keyspace specified and no current keyspace set")?;
1189 let schema_query = format!(
1190 "SELECT column_name, kind, position, type FROM system_schema.columns \
1191 WHERE keyspace_name = '{}' AND table_name = '{}'",
1192 ks_for_schema, cmd.table
1193 );
1194 let schema_result = session.execute_query(&schema_query).await?;
1195
1196 let mut schema_cols: Vec<(String, String, i32, String)> = Vec::new();
1198 for row in &schema_result.rows {
1199 let name = match row.values.first() {
1200 Some(CqlValue::Text(n)) => n.clone(),
1201 _ => continue,
1202 };
1203 let kind = match row.values.get(1) {
1204 Some(CqlValue::Text(k)) => k.clone(),
1205 _ => "regular".to_string(),
1206 };
1207 let position = match row.values.get(2) {
1208 Some(CqlValue::Int(p)) => *p,
1209 _ => -1,
1210 };
1211 let type_name = match row.values.get(3) {
1212 Some(CqlValue::Text(t)) => t.clone(),
1213 _ => "text".to_string(),
1214 };
1215 schema_cols.push((name, kind, position, type_name));
1216 }
1217 if schema_cols.is_empty() {
1218 bail!(
1219 "could not determine columns for table '{}' — table may not exist",
1220 table_spec
1221 );
1222 }
1223 schema_cols.sort_by(|a, b| {
1224 let kind_order = |k: &str| -> i32 {
1225 match k {
1226 "partition_key" => 0,
1227 "clustering" => 1,
1228 "static" => 2,
1229 _ => 3,
1230 }
1231 };
1232 kind_order(&a.1)
1233 .cmp(&kind_order(&b.1))
1234 .then(a.2.cmp(&b.2))
1235 .then(a.0.cmp(&b.0))
1236 });
1237
1238 let type_map: std::collections::HashMap<String, String> = schema_cols
1240 .iter()
1241 .map(|(n, _, _, t)| (n.clone(), t.clone()))
1242 .collect();
1243
1244 let prelim_columns: Vec<(String, String)> = match &cmd.columns {
1246 Some(explicit) => explicit
1247 .iter()
1248 .map(|n| {
1249 let t = type_map
1250 .get(n)
1251 .cloned()
1252 .unwrap_or_else(|| "text".to_string());
1253 (n.clone(), t)
1254 })
1255 .collect(),
1256 None => schema_cols.into_iter().map(|(n, _, _, t)| (n, t)).collect(),
1257 };
1258
1259 let reader: Box<dyn std::io::Read> = match &cmd.source {
1261 CopyTarget::File(path) => {
1262 let file = std::fs::File::open(path)
1263 .with_context(|| format!("failed to open file: {}", path.display()))?;
1264 Box::new(std::io::BufReader::new(file))
1265 }
1266 CopyTarget::Stdin => Box::new(std::io::stdin().lock()),
1267 CopyTarget::Stdout => bail!("COPY FROM cannot read from STDOUT"),
1268 };
1269 let mut csv_reader = csv::ReaderBuilder::new()
1270 .delimiter(cmd.options.delimiter as u8)
1271 .quote(cmd.options.quote as u8)
1272 .escape(Some(cmd.options.escape as u8))
1273 .has_headers(cmd.options.header)
1274 .flexible(true)
1275 .from_reader(reader);
1276
1277 let columns: Vec<(String, String)> = if cmd.options.header && cmd.columns.is_none() {
1279 let headers = csv_reader
1280 .headers()
1281 .context("failed to read CSV header row")?;
1282 headers
1283 .iter()
1284 .map(|h| {
1285 let name = h.trim().to_string();
1286 let t = type_map
1287 .get(&name)
1288 .cloned()
1289 .unwrap_or_else(|| "text".to_string());
1290 (name, t)
1291 })
1292 .collect()
1293 } else {
1294 prelim_columns
1295 };
1296
1297 let col_list: String = columns
1298 .iter()
1299 .map(|(n, _)| n.as_str())
1300 .collect::<Vec<_>>()
1301 .join(", ");
1302 let col_type_names: Vec<String> = columns.iter().map(|(_, t)| t.clone()).collect();
1303
1304 let prepared_id = if cmd.options.prepared_statements {
1306 let placeholders = vec!["?"; columns.len()].join(", ");
1307 let insert_template =
1308 format!("INSERT INTO {table_spec} ({col_list}) VALUES ({placeholders}){ttl_clause}");
1309 Some(
1310 session
1311 .prepare(&insert_template)
1312 .await
1313 .with_context(|| format!("failed to prepare: {insert_template}"))?,
1314 )
1315 } else {
1316 None
1317 };
1318
1319 let mut err_writer: Option<std::io::BufWriter<std::fs::File>> = match &cmd.options.err_file {
1321 Some(path) => {
1322 let file = std::fs::File::create(path)
1323 .with_context(|| format!("failed to create error file: {}", path.display()))?;
1324 Some(std::io::BufWriter::new(file))
1325 }
1326 None => None,
1327 };
1328
1329 let mut row_count: usize = 0;
1330 let mut parse_errors: usize = 0;
1331 let mut insert_errors: usize = 0;
1332 let mut rate_limiter = cmd.options.ingest_rate.map(TokenBucket::new);
1333 let num_processes = cmd.options.num_processes.max(1);
1334 let chunk_size = cmd.options.chunk_size.max(1);
1335
1336 let max_attempts = cmd.options.max_attempts;
1337 let max_parse_errors = cmd.options.max_parse_errors;
1338 let max_insert_errors = cmd.options.max_insert_errors;
1339 let report_frequency = cmd.options.report_frequency;
1340 let null_val = &cmd.options.null_val;
1341
1342 let mut csv_records = csv_reader.records();
1343
1344 'outer: loop {
1345 let mut chunk: Vec<Vec<CqlValue>> = Vec::with_capacity(chunk_size);
1347
1348 'fill: loop {
1349 if chunk.len() >= chunk_size {
1350 break 'fill;
1351 }
1352 let record = match csv_records.next() {
1353 None => break 'fill,
1354 Some(Err(e)) => {
1355 parse_errors += 1;
1356 let msg = format!("CSV parse error on row {}: {e}", row_count + parse_errors);
1357 eprintln!("{msg}");
1358 if let Some(ref mut w) = err_writer {
1359 let _ = writeln!(w, "{msg}");
1360 }
1361 if let Some(max) = max_parse_errors {
1362 if parse_errors > max {
1363 bail!("Exceeded maximum parse errors ({max}). Aborting.");
1364 }
1365 }
1366 continue 'fill;
1367 }
1368 Some(Ok(r)) => r,
1369 };
1370
1371 if record.len() != col_type_names.len() {
1372 parse_errors += 1;
1373 let msg = format!(
1374 "Row {}: expected {} columns but got {}",
1375 row_count + parse_errors,
1376 col_type_names.len(),
1377 record.len()
1378 );
1379 eprintln!("{msg}");
1380 if let Some(ref mut w) = err_writer {
1381 let _ = writeln!(w, "{msg}");
1382 }
1383 if let Some(max) = max_parse_errors {
1384 if parse_errors > max {
1385 bail!("Exceeded maximum number of parse errors ({max}). Aborting import.");
1386 }
1387 }
1388 continue 'fill;
1389 }
1390
1391 let mut row_values: Vec<CqlValue> = Vec::with_capacity(col_type_names.len());
1392 let mut row_ok = true;
1393 for (field, type_name) in record.iter().zip(col_type_names.iter()) {
1394 match csv_str_to_cql_value(field, type_name, null_val) {
1395 Ok(v) => row_values.push(v),
1396 Err(e) => {
1397 parse_errors += 1;
1398 let msg = format!(
1399 "Row {}: type error for '{}': {e}",
1400 row_count + parse_errors,
1401 type_name
1402 );
1403 eprintln!("{msg}");
1404 if let Some(ref mut w) = err_writer {
1405 let _ = writeln!(w, "{msg}");
1406 }
1407 if let Some(max) = max_parse_errors {
1408 if parse_errors > max {
1409 bail!("Exceeded maximum parse errors ({max}). Aborting.");
1410 }
1411 }
1412 row_ok = false;
1413 break;
1414 }
1415 }
1416 }
1417 if row_ok {
1418 chunk.push(row_values);
1419 }
1420 }
1421
1422 if chunk.is_empty() {
1423 break 'outer;
1424 }
1425
1426 if let Some(ref mut bucket) = rate_limiter {
1428 for _ in 0..chunk.len() {
1429 bucket.acquire().await;
1430 }
1431 }
1432
1433 let insert_results: Vec<Result<()>> = futures::stream::iter(chunk)
1435 .map(|values| {
1436 let ts = table_spec.as_str();
1437 let cl = col_list.as_str();
1438 let ttl = ttl_clause.as_str();
1439 let pid = prepared_id.as_ref();
1440 async move {
1441 insert_row_with_retry(session, pid, ts, cl, ttl, &values, max_attempts).await
1442 }
1443 })
1444 .buffer_unordered(num_processes)
1445 .collect()
1446 .await;
1447
1448 for result in insert_results {
1449 match result {
1450 Ok(()) => row_count += 1,
1451 Err(e) => {
1452 insert_errors += 1;
1453 let msg = format!("Insert error on row {}: {e}", row_count + insert_errors);
1454 eprintln!("{msg}");
1455 if let Some(ref mut w) = err_writer {
1456 let _ = writeln!(w, "{msg}");
1457 }
1458 if let Some(max) = max_insert_errors {
1459 if insert_errors > max {
1460 bail!("Exceeded maximum number of insert errors ({max}). Aborting import.");
1461 }
1462 }
1463 }
1464 }
1465 }
1466
1467 if let Some(freq) = report_frequency {
1469 let total = row_count + insert_errors + parse_errors;
1470 if freq > 0 && total > 0 && total.is_multiple_of(freq) {
1471 eprintln!("Processed {} rows...", row_count);
1472 }
1473 }
1474 }
1475
1476 if let Some(ref mut w) = err_writer {
1477 w.flush()?;
1478 }
1479
1480 let elapsed = start.elapsed().as_secs_f64();
1481 println!("{row_count} rows imported from {source_name} in {elapsed:.3}s.");
1482 if parse_errors > 0 {
1483 eprintln!("{parse_errors} parse error(s) encountered.");
1484 }
1485 if insert_errors > 0 {
1486 eprintln!("{insert_errors} insert error(s) encountered.");
1487 }
1488
1489 Ok(())
1490}
1491
1492async fn insert_row_with_retry(
1497 session: &CqlSession,
1498 prepared_id: Option<&PreparedId>,
1499 table_spec: &str,
1500 col_list: &str,
1501 ttl_clause: &str,
1502 values: &[CqlValue],
1503 max_attempts: usize,
1504) -> Result<()> {
1505 let max = max_attempts.max(1);
1506 let mut last_err = anyhow::anyhow!("no attempts made");
1507
1508 for attempt in 1..=max {
1509 let result = if let Some(id) = prepared_id {
1510 session.execute_prepared(id, values).await
1511 } else {
1512 let literals: Vec<String> = values.iter().map(cql_value_to_insert_literal).collect();
1513 let insert = format!(
1514 "INSERT INTO {} ({}) VALUES ({}){};",
1515 table_spec,
1516 col_list,
1517 literals.join(", "),
1518 ttl_clause
1519 );
1520 session.execute_query(&insert).await
1521 };
1522
1523 match result {
1524 Ok(_) => return Ok(()),
1525 Err(e) => {
1526 last_err = e;
1527 if attempt < max {
1528 let wait_ms = (100u64 * (1u64 << (attempt - 1).min(4))).min(2000);
1530 tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1531 }
1532 }
1533 }
1534 }
1535
1536 Err(last_err)
1537}
1538
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts `field` as `cql_type` with the default (empty) null marker, panicking on failure.
    #[track_caller]
    fn convert(field: &str, cql_type: &str) -> CqlValue {
        csv_str_to_cql_value(field, cql_type, "").unwrap()
    }

    /// Reports whether converting `field` as `cql_type` (empty null marker) is an error.
    fn rejects(field: &str, cql_type: &str) -> bool {
        csv_str_to_cql_value(field, cql_type, "").is_err()
    }

    /// File target used by the COPY TO parsing tests.
    fn out_csv() -> CopyTarget {
        CopyTarget::File(PathBuf::from("/tmp/out.csv"))
    }

    /// File source used by the COPY FROM parsing tests.
    fn in_csv() -> CopyTarget {
        CopyTarget::File(PathBuf::from("/tmp/in.csv"))
    }

    // ---------------------------------------------------------------------
    // COPY TO: statement parsing
    // ---------------------------------------------------------------------

    /// Keyspace, table and file target are extracted; no column list is reported.
    #[test]
    fn parse_copy_to_basic() {
        let parsed = parse_copy_to("COPY ks.table TO '/tmp/out.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert!(parsed.columns.is_none());
        assert_eq!(parsed.filename, out_csv());
    }

    /// An explicit column list is preserved in order.
    #[test]
    fn parse_copy_to_with_columns() {
        let parsed = parse_copy_to("COPY ks.table (col1, col2) TO '/tmp/out.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert_eq!(
            parsed.columns,
            Some(vec![String::from("col1"), String::from("col2")])
        );
        assert_eq!(parsed.filename, out_csv());
    }

    /// `STDOUT` is recognised as a target rather than a file name.
    #[test]
    fn parse_copy_to_stdout() {
        let parsed = parse_copy_to("COPY ks.table TO STDOUT").unwrap();
        assert_eq!(parsed.filename, CopyTarget::Stdout);
    }

    /// `WITH` options override the defaults.
    #[test]
    fn parse_copy_to_with_options() {
        let statement = "COPY ks.table TO '/tmp/out.csv' WITH DELIMITER='|' AND HEADER=true";
        let parsed = parse_copy_to(statement).unwrap();
        assert_eq!(parsed.options.delimiter, '|');
        assert!(parsed.options.header);
    }

    // ---------------------------------------------------------------------
    // COPY TO: value formatting
    // ---------------------------------------------------------------------

    /// With default options a null renders as the empty string.
    #[test]
    fn format_value_null() {
        let options = CopyOptions::default();
        let value = CqlValue::Null;
        assert_eq!(format_value_for_csv(&value, &options), "");
    }

    /// Text is emitted unchanged.
    #[test]
    fn format_value_text() {
        let options = CopyOptions::default();
        let value = CqlValue::Text("hello".to_string());
        assert_eq!(format_value_for_csv(&value, &options), "hello");
    }

    /// Booleans use the default `True` / `False` spelling.
    #[test]
    fn format_value_boolean() {
        let options = CopyOptions::default();
        for (flag, expected) in [(true, "True"), (false, "False")] {
            let value = CqlValue::Boolean(flag);
            assert_eq!(format_value_for_csv(&value, &options), expected);
        }
    }

    /// `float_precision` controls the number of rendered decimals.
    #[test]
    fn format_value_float_precision() {
        let options = CopyOptions {
            float_precision: 3,
            ..CopyOptions::default()
        };
        let value = CqlValue::Float(1.23456);
        assert_eq!(format_value_for_csv(&value, &options), "1.235");
    }

    /// Pins every default of `CopyOptions`.
    #[test]
    fn default_options() {
        let defaults = CopyOptions::default();

        // CSV dialect.
        assert_eq!(
            (defaults.delimiter, defaults.quote, defaults.escape),
            (',', '"', '\\')
        );
        assert!(!defaults.header);
        assert!(defaults.null_val.is_empty());
        assert!(defaults.datetime_format.is_none());
        assert_eq!(defaults.encoding, "utf-8");

        // Number and boolean rendering.
        assert_eq!(defaults.float_precision, 5);
        assert_eq!(defaults.double_precision, 12);
        assert_eq!(defaults.decimal_sep, '.');
        assert!(defaults.thousands_sep.is_none());
        assert_eq!(
            (defaults.bool_style.0.as_str(), defaults.bool_style.1.as_str()),
            ("True", "False")
        );

        // Paging, output limits and progress reporting.
        assert_eq!(defaults.page_size, 1000);
        assert!(defaults.max_output_size.is_none());
        assert!(defaults.report_frequency.is_none());
    }

    // ---------------------------------------------------------------------
    // COPY FROM: statement parsing
    // ---------------------------------------------------------------------

    /// Keyspace, table and file source are extracted; no column list is reported.
    #[test]
    fn parse_copy_from_basic() {
        let parsed = parse_copy_from("COPY ks.table FROM '/tmp/in.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert!(parsed.columns.is_none());
        assert_eq!(parsed.source, in_csv());
    }

    /// An explicit column list is preserved in order.
    #[test]
    fn parse_copy_from_with_columns() {
        let parsed = parse_copy_from("COPY ks.table (col1, col2) FROM '/tmp/in.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("ks"));
        assert_eq!(parsed.table, "table");
        assert_eq!(
            parsed.columns,
            Some(vec![String::from("col1"), String::from("col2")])
        );
    }

    /// `STDIN` is recognised as a source rather than a file name.
    #[test]
    fn parse_copy_from_stdin() {
        let parsed = parse_copy_from("COPY ks.table FROM STDIN").unwrap();
        assert_eq!(parsed.source, CopyTarget::Stdin);
    }

    /// The `STDIN` keyword is matched regardless of case.
    #[test]
    fn parse_copy_from_stdin_case_insensitive() {
        let parsed = parse_copy_from("COPY ks.table FROM stdin").unwrap();
        assert_eq!(parsed.source, CopyTarget::Stdin);
    }

    /// An unqualified table name leaves the keyspace unset.
    #[test]
    fn parse_copy_from_no_keyspace() {
        let parsed = parse_copy_from("COPY mytable FROM '/data/file.csv'").unwrap();
        assert!(parsed.keyspace.is_none());
        assert_eq!(parsed.table, "mytable");
    }

    /// General `WITH` options are parsed into their typed fields.
    #[test]
    fn parse_copy_from_with_options() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH TTL=3600 AND HEADER=true AND CHUNKSIZE=1000 AND DELIMITER='|'";
        let parsed = parse_copy_from(statement).unwrap();
        assert_eq!(parsed.options.ttl, Some(3600));
        assert!(parsed.options.header);
        assert_eq!(parsed.options.chunk_size, 1000);
        assert_eq!(parsed.options.delimiter, '|');
    }

    /// Error-limit options and the error file path are parsed.
    #[test]
    fn parse_copy_from_with_error_options() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH MAXPARSEERRORS=100 AND MAXINSERTERRORS=50 AND ERRFILE='/tmp/err.log'";
        let parsed = parse_copy_from(statement).unwrap();
        assert_eq!(parsed.options.max_parse_errors, Some(100));
        assert_eq!(parsed.options.max_insert_errors, Some(50));
        assert_eq!(parsed.options.err_file, Some(PathBuf::from("/tmp/err.log")));
    }

    /// Batch sizing and retry options are parsed.
    #[test]
    fn parse_copy_from_with_batch_options() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH MAXBATCHSIZE=50 AND MINBATCHSIZE=5 AND MAXATTEMPTS=10";
        let parsed = parse_copy_from(statement).unwrap();
        assert_eq!(parsed.options.max_batch_size, 50);
        assert_eq!(parsed.options.min_batch_size, 5);
        assert_eq!(parsed.options.max_attempts, 10);
    }

    /// A trailing semicolon does not end up in the file path.
    #[test]
    fn parse_copy_from_semicolon() {
        let parsed = parse_copy_from("COPY ks.table FROM '/tmp/in.csv';").unwrap();
        assert_eq!(parsed.source, in_csv());
    }

    /// Pins every default of `CopyFromOptions`.
    #[test]
    fn default_copy_from_options() {
        let defaults = CopyFromOptions::default();

        // CSV dialect.
        assert_eq!(
            (defaults.delimiter, defaults.quote, defaults.escape),
            (',', '"', '\\')
        );
        assert!(!defaults.header);
        assert!(defaults.null_val.is_empty());
        assert!(defaults.datetime_format.is_none());
        assert_eq!(defaults.encoding, "utf-8");

        // Chunking, batching and statement style.
        assert_eq!(defaults.chunk_size, 5000);
        assert_eq!(defaults.max_batch_size, 20);
        assert_eq!(defaults.min_batch_size, 2);
        assert!(defaults.prepared_statements);
        assert!(defaults.ttl.is_none());

        // Retry and error limits.
        assert_eq!(defaults.max_attempts, 5);
        assert!(defaults.max_parse_errors.is_none());
        assert!(defaults.max_insert_errors.is_none());
        assert!(defaults.err_file.is_none());

        // Reporting and throughput.
        assert!(defaults.report_frequency.is_none());
        assert!(defaults.ingest_rate.is_none());
        assert_eq!(defaults.num_processes, 1);
    }

    // ---------------------------------------------------------------------
    // CSV field -> CqlValue conversion
    // ---------------------------------------------------------------------

    /// `text` and `varchar` both produce `Text`; `ascii` produces `Ascii`.
    #[test]
    fn csv_to_cql_text_types() {
        assert_eq!(
            convert("hello", "text"),
            CqlValue::Text("hello".to_string())
        );
        assert_eq!(convert("hi", "ascii"), CqlValue::Ascii("hi".to_string()));
        assert_eq!(
            convert("world", "varchar"),
            CqlValue::Text("world".to_string())
        );
    }

    /// Each integer type maps to its variant; `counter` shares `BigInt` with `bigint`.
    #[test]
    fn csv_to_cql_int_types() {
        let cases = [
            ("42", "int", CqlValue::Int(42)),
            ("-100", "bigint", CqlValue::BigInt(-100)),
            ("1000", "counter", CqlValue::BigInt(1000)),
            ("32767", "smallint", CqlValue::SmallInt(32767)),
            ("127", "tinyint", CqlValue::TinyInt(127)),
        ];
        for (field, cql_type, expected) in cases {
            assert_eq!(convert(field, cql_type), expected);
        }
    }

    /// `float` and `double` parse to their own variants, including exponent notation.
    #[test]
    fn csv_to_cql_float_types() {
        let single = convert("1.5", "float");
        match single {
            CqlValue::Float(f) => assert!((f - 1.5f32).abs() < 1e-5),
            other => panic!("expected Float, got {other:?}"),
        }

        let double = convert("1.5", "double");
        match double {
            CqlValue::Double(d) => assert!((d - 1.5f64).abs() < 1e-9),
            other => panic!("expected Double, got {other:?}"),
        }

        let scientific = convert("1e10", "double");
        assert!(matches!(scientific, CqlValue::Double(_)));
    }

    /// Every accepted spelling of true and false is recognised.
    #[test]
    fn csv_to_cql_boolean() {
        let truthy = ["true", "True", "TRUE", "yes", "YES", "on", "ON", "1"];
        let falsy = ["false", "False", "FALSE", "no", "NO", "off", "OFF", "0"];

        for t in truthy {
            assert_eq!(
                convert(t, "boolean"),
                CqlValue::Boolean(true),
                "expected true for {t:?}"
            );
        }
        for f in falsy {
            assert_eq!(
                convert(f, "boolean"),
                CqlValue::Boolean(false),
                "expected false for {f:?}"
            );
        }
    }

    /// The same textual UUID yields `Uuid` or `TimeUuid` depending on the column type.
    #[test]
    fn csv_to_cql_uuid() {
        let sample = "550e8400-e29b-41d4-a716-446655440000";
        assert!(matches!(convert(sample, "uuid"), CqlValue::Uuid(_)));
        assert!(matches!(convert(sample, "timeuuid"), CqlValue::TimeUuid(_)));
        assert!(rejects("not-a-uuid", "uuid"));
    }

    /// Timestamps are accepted both as ISO-8601 text and as integer milliseconds.
    #[test]
    fn csv_to_cql_timestamp() {
        let from_iso = convert("2024-01-15T12:34:56Z", "timestamp");
        assert!(matches!(from_iso, CqlValue::Timestamp(_)));

        let from_millis = convert("1705318496000", "timestamp");
        assert_eq!(from_millis, CqlValue::Timestamp(1705318496000));
    }

    /// `YYYY-MM-DD` parses to a date; anything else is rejected.
    #[test]
    fn csv_to_cql_date() {
        use chrono::NaiveDate;
        let expected = NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        assert_eq!(convert("2024-01-15", "date"), CqlValue::Date(expected));

        assert!(rejects("not-a-date", "date"));
    }

    /// Times parse with or without a fractional part.
    #[test]
    fn csv_to_cql_time() {
        for field in ["12:34:56", "12:34:56.789"] {
            assert!(matches!(convert(field, "time"), CqlValue::Time(_)));
        }

        assert!(rejects("not-a-time", "time"));
    }

    /// Both IPv4 and IPv6 addresses are accepted.
    #[test]
    fn csv_to_cql_inet() {
        for field in ["127.0.0.1", "::1"] {
            assert!(matches!(convert(field, "inet"), CqlValue::Inet(_)));
        }

        assert!(rejects("not.an.ip", "inet"));
    }

    /// Hex blobs parse with or without the `0x` prefix; bad digits and odd lengths fail.
    #[test]
    fn csv_to_cql_blob() {
        let expected = CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]);
        for field in ["0xdeadbeef", "deadbeef"] {
            assert_eq!(convert(field, "blob"), expected);
        }

        for field in ["0xgg", "0xabc"] {
            assert!(rejects(field, "blob"));
        }
    }

    /// With the default null marker, an empty field is null for any column type.
    #[test]
    fn csv_to_cql_null_handling() {
        for cql_type in ["int", "text"] {
            assert_eq!(convert("", cql_type), CqlValue::Null);
        }
    }

    /// A custom null marker maps matching fields to null and leaves others alone.
    #[test]
    fn csv_to_cql_null_custom() {
        // In these cases the field text is exactly the configured null marker.
        for (marker, cql_type) in [("NULL", "int"), ("N/A", "text")] {
            assert_eq!(
                csv_str_to_cql_value(marker, cql_type, marker).unwrap(),
                CqlValue::Null
            );
        }

        let not_null = csv_str_to_cql_value("42", "int", "NULL").unwrap();
        assert!(matches!(not_null, CqlValue::Int(42)));
    }

    /// Unrecognised and collection types fall back to passing the raw text through.
    #[test]
    fn csv_to_cql_unknown_type_fallback() {
        let cases = [("hello", "customtype"), ("[1, 2, 3]", "list<int>")];
        for (field, cql_type) in cases {
            assert_eq!(convert(field, cql_type), CqlValue::Text(field.to_string()));
        }
    }

    /// Non-numeric and fractional input is rejected for integer columns.
    #[test]
    fn csv_to_cql_parse_error_int() {
        let cases = [("notanint", "int"), ("3.14", "int"), ("notanint", "bigint")];
        for (field, cql_type) in cases {
            assert!(rejects(field, cql_type));
        }
    }

    /// Arbitrary-precision numeric types parse to their dedicated variants.
    #[test]
    fn csv_to_cql_varint_and_decimal() {
        let big_int = convert("123456789012345678901234567890", "varint");
        assert!(matches!(big_int, CqlValue::Varint(_)));

        let precise = convert("3.141592653589793", "decimal");
        assert!(matches!(precise, CqlValue::Decimal(_)));
    }

    /// A `frozen<...>` collection type still converts, yielding `Text`.
    #[test]
    fn csv_to_cql_frozen_stripped() {
        let converted = convert("{uuid1, uuid2}", "frozen<set<uuid>>");
        assert!(matches!(converted, CqlValue::Text(_)));
    }

    /// `NUMPROCESSES` is parsed into `num_processes`.
    #[test]
    fn parse_copy_from_numprocesses() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH NUMPROCESSES=4";
        let parsed = parse_copy_from(statement).unwrap();
        assert_eq!(parsed.options.num_processes, 4);
    }
}