1use std::io::Write;
8use std::net::IpAddr;
9use std::path::PathBuf;
10use std::time::{Duration, Instant};
11
12use anyhow::{bail, Context, Result};
13use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
14use futures::StreamExt;
15
16use crate::driver::types::CqlValue;
17use crate::driver::PreparedId;
18use crate::session::CqlSession;
19
/// Where a `COPY` statement reads from or writes to.
///
/// The same enum serves both directions: the `COPY TO` parser produces
/// [`CopyTarget::File`] or [`CopyTarget::Stdout`], while the `COPY FROM`
/// parser produces [`CopyTarget::File`] or [`CopyTarget::Stdin`].
#[derive(Debug, Clone, PartialEq)]
pub enum CopyTarget {
    /// A file on the local filesystem, identified by its path.
    File(PathBuf),
    /// The process's standard output (valid as a `COPY TO` destination).
    Stdout,
    /// The process's standard input (valid as a `COPY FROM` source).
    Stdin,
}
30
/// Options accepted in the `WITH` clause of a `COPY ... TO` statement.
///
/// Each field corresponds to one option keyword recognised by the option
/// parser; the keyword is noted on the field.
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// `DELIMITER`: character placed between CSV fields.
    pub delimiter: char,
    /// `QUOTE`: character used to quote CSV fields.
    pub quote: char,
    /// `ESCAPE`: character used to escape special characters inside fields.
    pub escape: char,
    /// `HEADER`: when true, a row of column names is written first.
    pub header: bool,
    /// `NULL` / `NULLVAL`: text emitted for null or unset values.
    pub null_val: String,
    /// `DATETIMEFORMAT`: chrono format string for timestamps; `None` selects
    /// the built-in default pattern.
    pub datetime_format: Option<String>,
    /// `ENCODING`: stored exactly as given.
    // NOTE(review): not read by the export code visible in this section.
    pub encoding: String,
    /// `FLOATPRECISION`: digits after the decimal point for `float` values.
    pub float_precision: usize,
    /// `DOUBLEPRECISION`: digits after the decimal point for `double` values.
    pub double_precision: usize,
    /// `DECIMALSEP`: character substituted for `.` in floats and decimals.
    pub decimal_sep: char,
    /// `THOUSANDSSEP`: optional digit-group separator.
    // NOTE(review): parsed but not applied by the formatting code visible here.
    pub thousands_sep: Option<char>,
    /// `BOOLSTYLE`: the `(true, false)` spellings used for boolean values.
    pub bool_style: (String, String),
    /// `PAGESIZE`: stored as parsed.
    // NOTE(review): not read by the export code visible in this section.
    pub page_size: usize,
    /// `MAXOUTPUTSIZE`: maximum number of data rows to write; `None` means
    /// no limit.
    pub max_output_size: Option<usize>,
    /// `REPORTFREQUENCY`: print a progress line every this many rows;
    /// `None` disables progress output.
    pub report_frequency: Option<usize>,
}
50
51impl Default for CopyOptions {
52 fn default() -> Self {
53 Self {
54 delimiter: ',',
55 quote: '"',
56 escape: '\\',
57 header: false,
58 null_val: String::new(),
59 datetime_format: None,
60 encoding: "utf-8".to_string(),
61 float_precision: 5,
62 double_precision: 12,
63 decimal_sep: '.',
64 thousands_sep: None,
65 bool_style: ("True".to_string(), "False".to_string()),
66 page_size: 1000,
67 max_output_size: None,
68 report_frequency: None,
69 }
70 }
71}
72
73#[derive(Debug, Clone)]
75pub struct CopyToCommand {
76 pub keyspace: Option<String>,
77 pub table: String,
78 pub columns: Option<Vec<String>>,
79 pub filename: CopyTarget,
80 pub options: CopyOptions,
81}
82
83pub fn parse_copy_to(input: &str) -> Result<CopyToCommand> {
87 let trimmed = input.trim().trim_end_matches(';').trim();
88
89 let upper = trimmed.to_uppercase();
91 if !upper.starts_with("COPY ") {
92 bail!("not a COPY statement");
93 }
94
95 let to_pos =
97 find_keyword_outside_parens(trimmed, "TO").context("COPY statement missing TO keyword")?;
98
99 let before_to = trimmed[4..to_pos].trim(); let after_to = trimmed[to_pos + 2..].trim(); let (keyspace, table, columns) = parse_table_spec(before_to)?;
104
105 let (filename, options_str) = parse_target_and_options(after_to)?;
107
108 let options = if let Some(opts) = options_str {
109 parse_options(&opts)?
110 } else {
111 CopyOptions::default()
112 };
113
114 Ok(CopyToCommand {
115 keyspace,
116 table,
117 columns,
118 filename,
119 options,
120 })
121}
122
123pub async fn execute_copy_to(
125 session: &CqlSession,
126 cmd: &CopyToCommand,
127 current_keyspace: Option<&str>,
128) -> Result<()> {
129 let col_spec = match &cmd.columns {
131 Some(cols) => cols.join(", "),
132 None => "*".to_string(),
133 };
134
135 let table_spec = match (&cmd.keyspace, current_keyspace) {
136 (Some(ks), _) => format!("{}.{}", ks, cmd.table),
137 (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
138 (None, None) => cmd.table.clone(),
139 };
140
141 let query = format!("SELECT {} FROM {}", col_spec, table_spec);
142
143 let result = session.execute_query(&query).await?;
144
145 let mut row_count: usize = 0;
147
148 match &cmd.filename {
149 CopyTarget::File(path) => {
150 let file = std::fs::File::create(path)
151 .with_context(|| format!("failed to create file: {}", path.display()))?;
152 let buf = std::io::BufWriter::new(file);
153 let mut wtr = build_csv_writer(&cmd.options, buf);
154
155 if cmd.options.header {
156 let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
157 wtr.write_record(&headers)?;
158 }
159
160 for row in &result.rows {
161 if let Some(max) = cmd.options.max_output_size {
162 if row_count >= max {
163 break;
164 }
165 }
166 let fields: Vec<String> = row
167 .values
168 .iter()
169 .map(|v| format_value_for_csv(v, &cmd.options))
170 .collect();
171 wtr.write_record(&fields)?;
172 row_count += 1;
173
174 if let Some(freq) = cmd.options.report_frequency {
175 if freq > 0 && row_count.is_multiple_of(freq) {
176 eprintln!("Processed {} rows...", row_count);
177 }
178 }
179 }
180
181 wtr.flush()?;
182 println!("{} rows exported to '{}'.", row_count, path.display());
183 }
184 CopyTarget::Stdout => {
185 let stdout = std::io::stdout();
186 let handle = stdout.lock();
187 let mut wtr = build_csv_writer(&cmd.options, handle);
188
189 if cmd.options.header {
190 let headers: Vec<String> = result.columns.iter().map(|c| c.name.clone()).collect();
191 wtr.write_record(&headers)?;
192 }
193
194 for row in &result.rows {
195 if let Some(max) = cmd.options.max_output_size {
196 if row_count >= max {
197 break;
198 }
199 }
200 let fields: Vec<String> = row
201 .values
202 .iter()
203 .map(|v| format_value_for_csv(v, &cmd.options))
204 .collect();
205 wtr.write_record(&fields)?;
206 row_count += 1;
207
208 if let Some(freq) = cmd.options.report_frequency {
209 if freq > 0 && row_count.is_multiple_of(freq) {
210 eprintln!("Processed {} rows...", row_count);
211 }
212 }
213 }
214
215 wtr.flush()?;
216 eprintln!("{} rows exported to STDOUT.", row_count);
217 }
218 CopyTarget::Stdin => {
219 bail!("COPY TO cannot write to STDIN");
220 }
221 }
222
223 Ok(())
224}
225
226pub fn format_value_for_csv(value: &CqlValue, options: &CopyOptions) -> String {
228 match value {
229 CqlValue::Null | CqlValue::Unset => options.null_val.clone(),
230 CqlValue::Text(s) | CqlValue::Ascii(s) => s.clone(),
231 CqlValue::Boolean(b) => {
232 if *b {
233 options.bool_style.0.clone()
234 } else {
235 options.bool_style.1.clone()
236 }
237 }
238 CqlValue::Int(v) => v.to_string(),
239 CqlValue::BigInt(v) => v.to_string(),
240 CqlValue::SmallInt(v) => v.to_string(),
241 CqlValue::TinyInt(v) => v.to_string(),
242 CqlValue::Counter(v) => v.to_string(),
243 CqlValue::Varint(v) => v.to_string(),
244 CqlValue::Float(v) => format_float(*v as f64, options.float_precision, options),
245 CqlValue::Double(v) => format_float(*v, options.double_precision, options),
246 CqlValue::Decimal(v) => {
247 let s = v.to_string();
248 if options.decimal_sep != '.' {
249 s.replace('.', &options.decimal_sep.to_string())
250 } else {
251 s
252 }
253 }
254 CqlValue::Timestamp(millis) => format_timestamp(*millis, options),
255 CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
256 CqlValue::Blob(bytes) => {
257 let mut s = String::with_capacity(2 + bytes.len() * 2);
258 s.push_str("0x");
259 for b in bytes {
260 s.push_str(&format!("{b:02x}"));
261 }
262 s
263 }
264 CqlValue::Date(d) => d.to_string(),
265 CqlValue::Time(t) => t.to_string(),
266 CqlValue::Duration {
267 months,
268 days,
269 nanoseconds,
270 } => format!("{months}mo{days}d{nanoseconds}ns"),
271 CqlValue::Inet(addr) => addr.to_string(),
272 CqlValue::List(_)
274 | CqlValue::Set(_)
275 | CqlValue::Map(_)
276 | CqlValue::Tuple(_)
277 | CqlValue::UserDefinedType { .. } => value.to_string(),
278 }
279}
280
281fn build_csv_writer<W: Write>(options: &CopyOptions, writer: W) -> csv::Writer<W> {
287 csv::WriterBuilder::new()
288 .delimiter(options.delimiter as u8)
289 .quote(options.quote as u8)
290 .escape(options.escape as u8)
291 .double_quote(false)
292 .from_writer(writer)
293}
294
295fn format_float(v: f64, precision: usize, options: &CopyOptions) -> String {
297 if v.is_nan() {
298 return "NaN".to_string();
299 }
300 if v.is_infinite() {
301 return if v.is_sign_positive() {
302 "Infinity".to_string()
303 } else {
304 "-Infinity".to_string()
305 };
306 }
307 let s = format!("{v:.prec$}", prec = precision);
308 if options.decimal_sep != '.' {
309 s.replace('.', &options.decimal_sep.to_string())
310 } else {
311 s
312 }
313}
314
315fn format_timestamp(millis: i64, options: &CopyOptions) -> String {
317 match DateTime::from_timestamp_millis(millis) {
318 Some(dt) => {
319 let utc: DateTime<Utc> = dt;
320 match &options.datetime_format {
321 Some(fmt) => utc.format(fmt).to_string(),
322 None => utc.format("%Y-%m-%d %H:%M:%S%.6f%z").to_string(),
323 }
324 }
325 None => format!("<invalid timestamp: {millis}>"),
326 }
327}
328
/// Finds the first occurrence of `keyword` as a whole word that is outside
/// both parentheses and quoted strings, returning its byte offset in `s`.
///
/// Matching ignores ASCII case. A match must not be adjacent to an ASCII
/// letter, digit or underscore on either side, so `TO` does not match inside
/// `stores` or `to_b`. Single and double quotes both open a quoted region
/// that runs to the next occurrence of the same quote character.
///
/// The comparison is done on bytes of `s` itself rather than on an
/// uppercased copy: byte offsets taken from `s` are not valid in an
/// uppercased copy when the text contains non-ASCII characters, and slicing
/// with them could panic or misalign.
///
/// `keyword` is expected to be ASCII (the callers pass `TO`, `FROM`, `WITH`).
fn find_keyword_outside_parens(s: &str, keyword: &str) -> Option<usize> {
    let bytes = s.as_bytes();
    let kw = keyword.as_bytes();
    let is_ident_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';

    let mut depth: i32 = 0;
    let mut open_quote: Option<char> = None;

    for (i, ch) in s.char_indices() {
        if let Some(quote) = open_quote {
            if ch == quote {
                open_quote = None;
            }
            continue;
        }

        match ch {
            '\'' | '"' => {
                open_quote = Some(ch);
                continue;
            }
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }

        if depth != 0 {
            continue;
        }

        let end = i + kw.len();
        let Some(candidate) = bytes.get(i..end) else {
            continue;
        };
        if !candidate.eq_ignore_ascii_case(kw) {
            continue;
        }

        let before_ok = i == 0 || !is_ident_byte(bytes[i - 1]);
        let after_ok = bytes.get(end).map_or(true, |&b| !is_ident_byte(b));
        if before_ok && after_ok {
            return Some(i);
        }
    }

    None
}
372
373fn parse_table_spec(spec: &str) -> Result<(Option<String>, String, Option<Vec<String>>)> {
375 let spec = spec.trim();
376
377 let (table_part, columns) = if let Some(paren_start) = spec.find('(') {
379 let paren_end = spec
380 .rfind(')')
381 .context("unmatched parenthesis in column list")?;
382 let cols_str = &spec[paren_start + 1..paren_end];
383 let cols: Vec<String> = cols_str
384 .split(',')
385 .map(|c| c.trim().to_string())
386 .filter(|c| !c.is_empty())
387 .collect();
388 (spec[..paren_start].trim(), Some(cols))
389 } else {
390 (spec, None)
391 };
392
393 let (keyspace, table) = if let Some(dot_pos) = table_part.find('.') {
395 let ks = table_part[..dot_pos].trim().to_string();
396 let tbl = table_part[dot_pos + 1..].trim().to_string();
397 (Some(ks), tbl)
398 } else {
399 (None, table_part.trim().to_string())
400 };
401
402 if table.is_empty() {
403 bail!("missing table name in COPY statement");
404 }
405
406 Ok((keyspace, table, columns))
407}
408
409fn parse_target_and_options(after_to: &str) -> Result<(CopyTarget, Option<String>)> {
412 let after_to = after_to.trim();
413
414 let with_pos = find_keyword_outside_parens(after_to, "WITH");
416
417 let (target_str, options_str) = match with_pos {
418 Some(pos) => {
419 let target = after_to[..pos].trim();
420 let opts = after_to[pos + 4..].trim(); (target, Some(opts.to_string()))
422 }
423 None => (after_to, None),
424 };
425
426 let target_str = target_str.trim();
427
428 let target = if target_str.eq_ignore_ascii_case("STDOUT") {
429 CopyTarget::Stdout
430 } else {
431 let path_str = if (target_str.starts_with('\'') && target_str.ends_with('\''))
433 || (target_str.starts_with('"') && target_str.ends_with('"'))
434 {
435 &target_str[1..target_str.len() - 1]
436 } else {
437 target_str
438 };
439 CopyTarget::File(PathBuf::from(path_str))
440 };
441
442 Ok((target, options_str))
443}
444
445fn parse_options(options_str: &str) -> Result<CopyOptions> {
447 let mut opts = CopyOptions::default();
448
449 let parts = split_on_and(options_str);
451
452 for part in parts {
453 let part = part.trim();
454 if part.is_empty() {
455 continue;
456 }
457
458 let eq_pos = part
459 .find('=')
460 .with_context(|| format!("invalid option (missing '='): {part}"))?;
461 let key = part[..eq_pos].trim().to_uppercase();
462 let val = unquote(part[eq_pos + 1..].trim());
463
464 match key.as_str() {
465 "DELIMITER" => {
466 opts.delimiter = val
467 .chars()
468 .next()
469 .context("DELIMITER must be a single character")?;
470 }
471 "QUOTE" => {
472 opts.quote = val
473 .chars()
474 .next()
475 .context("QUOTE must be a single character")?;
476 }
477 "ESCAPE" => {
478 opts.escape = val
479 .chars()
480 .next()
481 .context("ESCAPE must be a single character")?;
482 }
483 "HEADER" => {
484 opts.header = parse_bool_option(&val)?;
485 }
486 "NULL" | "NULLVAL" => {
487 opts.null_val = val;
488 }
489 "DATETIMEFORMAT" => {
490 opts.datetime_format = if val.is_empty() { None } else { Some(val) };
491 }
492 "ENCODING" => {
493 opts.encoding = val;
494 }
495 "FLOATPRECISION" => {
496 opts.float_precision = val.parse().context("FLOATPRECISION must be an integer")?;
497 }
498 "DOUBLEPRECISION" => {
499 opts.double_precision =
500 val.parse().context("DOUBLEPRECISION must be an integer")?;
501 }
502 "DECIMALSEP" => {
503 opts.decimal_sep = val
504 .chars()
505 .next()
506 .context("DECIMALSEP must be a single character")?;
507 }
508 "THOUSANDSSEP" => {
509 opts.thousands_sep = val.chars().next();
510 }
511 "BOOLSTYLE" => {
512 let parts: Vec<&str> = val.splitn(2, ':').collect();
514 if parts.len() == 2 {
515 opts.bool_style = (parts[0].to_string(), parts[1].to_string());
516 } else {
517 bail!("BOOLSTYLE must be in format 'TrueVal:FalseVal'");
518 }
519 }
520 "PAGESIZE" => {
521 opts.page_size = val.parse().context("PAGESIZE must be an integer")?;
522 }
523 "MAXOUTPUTSIZE" => {
524 let n: usize = val.parse().context("MAXOUTPUTSIZE must be an integer")?;
525 opts.max_output_size = Some(n);
526 }
527 "REPORTFREQUENCY" => {
528 let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
529 opts.report_frequency = if n == 0 { None } else { Some(n) };
530 }
531 other => {
532 bail!("unknown COPY option: {other}");
533 }
534 }
535 }
536
537 Ok(opts)
538}
539
/// Splits an option list on the `AND` keyword, ignoring `AND` inside quotes.
///
/// A separator is one whitespace character, the letters `AND` in any ASCII
/// case, and one more whitespace character. Single- and double-quoted
/// regions are copied through untouched. The returned pieces are not
/// trimmed, and an empty piece before a separator is kept; only a trailing
/// empty piece is dropped.
///
/// The separator is detected on the original characters. The previous
/// version compared against a separately uppercased copy, which goes out of
/// step with the input when uppercasing changes the character count (for
/// example `ß` becomes `SS`), so later separators were missed. It also
/// rebuilt the remaining text at every whitespace position, and required
/// literal spaces, so `AND` surrounded by tabs or newlines never matched.
fn split_on_and(s: &str) -> Vec<String> {
    let chars: Vec<char> = s.chars().collect();
    let len = chars.len();

    let mut parts = Vec::new();
    let mut current = String::new();
    let mut open_quote: Option<char> = None;
    let mut i = 0;

    while i < len {
        let ch = chars[i];

        if let Some(quote) = open_quote {
            if ch == quote {
                open_quote = None;
            }
            current.push(ch);
            i += 1;
            continue;
        }

        if ch == '\'' || ch == '"' {
            open_quote = Some(ch);
            current.push(ch);
            i += 1;
            continue;
        }

        let at_separator = matches!(
            &chars[i..],
            [lead, a, n, d, trail, ..]
                if lead.is_whitespace()
                    && a.eq_ignore_ascii_case(&'A')
                    && n.eq_ignore_ascii_case(&'N')
                    && d.eq_ignore_ascii_case(&'D')
                    && trail.is_whitespace()
        );
        if at_separator {
            parts.push(std::mem::take(&mut current));
            // Skip the five separator characters: ws, A, N, D, ws.
            i += 5;
            continue;
        }

        current.push(ch);
        i += 1;
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
601
/// Trims `s` and removes one pair of matching surrounding quotes, if present.
///
/// Both single and double quotes are recognised, but the opening and closing
/// quote must be the same character. Input that is not wrapped in a matching
/// pair (including a lone quote character) is returned trimmed but otherwise
/// unchanged.
fn unquote(s: &str) -> String {
    let trimmed = s.trim();

    let inner = ['\'', '"'].iter().find_map(|&quote| {
        trimmed
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
    });

    inner.unwrap_or(trimmed).to_string()
}
612
613fn parse_bool_option(val: &str) -> Result<bool> {
615 match val.to_lowercase().as_str() {
616 "true" | "yes" | "1" => Ok(true),
617 "false" | "no" | "0" => Ok(false),
618 _ => bail!("invalid boolean value: {val}"),
619 }
620}
621
/// Options accepted in the `WITH` clause of a `COPY ... FROM` statement.
///
/// Each field corresponds to one option keyword recognised by the
/// `COPY FROM` option parser; the keyword is noted on the field.
#[derive(Debug, Clone)]
pub struct CopyFromOptions {
    /// `DELIMITER`: character separating CSV fields.
    pub delimiter: char,
    /// `QUOTE`: character used to quote CSV fields.
    pub quote: char,
    /// `ESCAPE`: character used to escape special characters inside fields.
    pub escape: char,
    /// `HEADER`: whether the first CSV row holds column names.
    pub header: bool,
    /// `NULL` / `NULLVAL`: field text that is read back as a null value.
    pub null_val: String,
    /// `DATETIMEFORMAT`: stored as given; `None` when the value is empty.
    pub datetime_format: Option<String>,
    /// `ENCODING`: stored exactly as given.
    pub encoding: String,
    /// `CHUNKSIZE`: number of rows gathered into one chunk during import.
    pub chunk_size: usize,
    /// `MAXBATCHSIZE`: stored as parsed.
    pub max_batch_size: usize,
    /// `MINBATCHSIZE`: stored as parsed.
    pub min_batch_size: usize,
    /// `PREPAREDSTATEMENTS`: when true, the import prepares one `INSERT`
    /// statement with placeholders instead of building literal statements.
    pub prepared_statements: bool,
    /// `TTL`: when set, appended to the `INSERT` as `USING TTL <n>`.
    pub ttl: Option<u64>,
    /// `MAXATTEMPTS`: stored as parsed.
    pub max_attempts: usize,
    /// `MAXPARSEERRORS`: limit on parse errors; `None` (from `-1`) means
    /// no limit.
    pub max_parse_errors: Option<usize>,
    /// `MAXINSERTERRORS`: limit on insert errors; `None` (from `-1`) means
    /// no limit.
    pub max_insert_errors: Option<usize>,
    /// `ERRFILE` / `ERRORSFILE`: file created to receive error output.
    pub err_file: Option<PathBuf>,
    /// `REPORTFREQUENCY`: progress interval in rows; `None` (from `0`)
    /// disables reporting.
    pub report_frequency: Option<usize>,
    /// `INGESTRATE`: rows-per-second limit; `None` (from `-1` or `0`)
    /// disables rate limiting.
    pub ingest_rate: Option<usize>,
    /// `NUMPROCESSES`: parsed value, clamped to at least 1.
    pub num_processes: usize,
}
651
652impl Default for CopyFromOptions {
653 fn default() -> Self {
654 Self {
655 delimiter: ',',
656 quote: '"',
657 escape: '\\',
658 header: false,
659 null_val: String::new(),
660 datetime_format: None,
661 encoding: "utf-8".to_string(),
662 chunk_size: 5000,
663 max_batch_size: 20,
664 min_batch_size: 2,
665 prepared_statements: true,
666 ttl: None,
667 max_attempts: 5,
668 max_parse_errors: None,
669 max_insert_errors: None,
670 err_file: None,
671 report_frequency: None,
672 ingest_rate: None,
673 num_processes: 1,
674 }
675 }
676}
677
678#[derive(Debug, Clone)]
680pub struct CopyFromCommand {
681 pub keyspace: Option<String>,
682 pub table: String,
683 pub columns: Option<Vec<String>>,
684 pub source: CopyTarget,
685 pub options: CopyFromOptions,
686}
687
688pub fn parse_copy_from(input: &str) -> Result<CopyFromCommand> {
692 let trimmed = input.trim().trim_end_matches(';').trim();
693
694 let upper = trimmed.to_uppercase();
695 if !upper.starts_with("COPY ") {
696 bail!("not a COPY statement");
697 }
698
699 let from_pos = find_keyword_outside_parens(trimmed, "FROM")
701 .context("COPY statement missing FROM keyword")?;
702
703 let before_from = trimmed[4..from_pos].trim(); let after_from = trimmed[from_pos + 4..].trim(); let (keyspace, table, columns) = parse_table_spec(before_from)?;
708
709 let (source, options_str) = parse_source_and_options(after_from)?;
711
712 let options = if let Some(opts) = options_str {
713 parse_copy_from_options(&opts)?
714 } else {
715 CopyFromOptions::default()
716 };
717
718 Ok(CopyFromCommand {
719 keyspace,
720 table,
721 columns,
722 source,
723 options,
724 })
725}
726
727fn parse_source_and_options(after_from: &str) -> Result<(CopyTarget, Option<String>)> {
730 let after_from = after_from.trim();
731
732 let with_pos = find_keyword_outside_parens(after_from, "WITH");
733
734 let (source_str, options_str) = match with_pos {
735 Some(pos) => {
736 let source = after_from[..pos].trim();
737 let opts = after_from[pos + 4..].trim();
738 (source, Some(opts.to_string()))
739 }
740 None => (after_from, None),
741 };
742
743 let source_str = source_str.trim();
744
745 let source = if source_str.eq_ignore_ascii_case("STDIN") {
746 CopyTarget::Stdin
747 } else {
748 let path_str = if (source_str.starts_with('\'') && source_str.ends_with('\''))
749 || (source_str.starts_with('"') && source_str.ends_with('"'))
750 {
751 &source_str[1..source_str.len() - 1]
752 } else {
753 source_str
754 };
755 CopyTarget::File(PathBuf::from(path_str))
756 };
757
758 Ok((source, options_str))
759}
760
761fn parse_copy_from_options(options_str: &str) -> Result<CopyFromOptions> {
763 let mut opts = CopyFromOptions::default();
764
765 let parts = split_on_and(options_str);
766
767 for part in parts {
768 let part = part.trim();
769 if part.is_empty() {
770 continue;
771 }
772
773 let eq_pos = part
774 .find('=')
775 .with_context(|| format!("invalid option (missing '='): {part}"))?;
776 let key = part[..eq_pos].trim().to_uppercase();
777 let val = unquote(part[eq_pos + 1..].trim());
778
779 match key.as_str() {
780 "DELIMITER" => {
781 opts.delimiter = val
782 .chars()
783 .next()
784 .context("DELIMITER must be a single character")?;
785 }
786 "QUOTE" => {
787 opts.quote = val
788 .chars()
789 .next()
790 .context("QUOTE must be a single character")?;
791 }
792 "ESCAPE" => {
793 opts.escape = val
794 .chars()
795 .next()
796 .context("ESCAPE must be a single character")?;
797 }
798 "HEADER" => {
799 opts.header = parse_bool_option(&val)?;
800 }
801 "NULL" | "NULLVAL" => {
802 opts.null_val = val;
803 }
804 "DATETIMEFORMAT" => {
805 opts.datetime_format = if val.is_empty() { None } else { Some(val) };
806 }
807 "ENCODING" => {
808 opts.encoding = val;
809 }
810 "CHUNKSIZE" => {
811 opts.chunk_size = val.parse().context("CHUNKSIZE must be an integer")?;
812 }
813 "MAXBATCHSIZE" => {
814 opts.max_batch_size = val.parse().context("MAXBATCHSIZE must be an integer")?;
815 }
816 "MINBATCHSIZE" => {
817 opts.min_batch_size = val.parse().context("MINBATCHSIZE must be an integer")?;
818 }
819 "PREPAREDSTATEMENTS" => {
820 opts.prepared_statements = parse_bool_option(&val)?;
821 }
822 "TTL" => {
823 let n: u64 = val.parse().context("TTL must be a positive integer")?;
824 opts.ttl = Some(n);
825 }
826 "MAXATTEMPTS" => {
827 opts.max_attempts = val.parse().context("MAXATTEMPTS must be an integer")?;
828 }
829 "MAXPARSEERRORS" => {
830 if val == "-1" {
831 opts.max_parse_errors = None;
832 } else {
833 let n: usize = val.parse().context("MAXPARSEERRORS must be an integer")?;
834 opts.max_parse_errors = Some(n);
835 }
836 }
837 "MAXINSERTERRORS" => {
838 if val == "-1" {
839 opts.max_insert_errors = None;
840 } else {
841 let n: usize = val.parse().context("MAXINSERTERRORS must be an integer")?;
842 opts.max_insert_errors = Some(n);
843 }
844 }
845 "ERRFILE" | "ERRORSFILE" => {
846 opts.err_file = if val.is_empty() {
847 None
848 } else {
849 Some(PathBuf::from(val))
850 };
851 }
852 "REPORTFREQUENCY" => {
853 let n: usize = val.parse().context("REPORTFREQUENCY must be an integer")?;
854 opts.report_frequency = if n == 0 { None } else { Some(n) };
855 }
856 "INGESTRATE" => {
857 if val == "-1" || val == "0" {
858 opts.ingest_rate = None;
859 } else {
860 let n: usize = val.parse().context("INGESTRATE must be an integer")?;
861 opts.ingest_rate = Some(n);
862 }
863 }
864 "NUMPROCESSES" => {
865 let n: usize = val.parse().context("NUMPROCESSES must be an integer")?;
866 opts.num_processes = n.max(1);
867 }
868 other => {
869 bail!("unknown COPY FROM option: {other}");
870 }
871 }
872 }
873
874 Ok(opts)
875}
876
877pub fn csv_str_to_cql_value(field: &str, type_name: &str, null_val: &str) -> Result<CqlValue> {
888 if field == null_val || (null_val.is_empty() && field.is_empty()) {
890 return Ok(CqlValue::Null);
891 }
892
893 let base_type = strip_frozen(type_name).to_lowercase();
894 let base_type = base_type.as_str();
895
896 match base_type {
897 "ascii" => Ok(CqlValue::Ascii(field.to_string())),
898 "text" | "varchar" => Ok(CqlValue::Text(field.to_string())),
899 "boolean" => {
900 let b = match field.to_lowercase().as_str() {
901 "true" | "yes" | "on" | "1" => true,
902 "false" | "no" | "off" | "0" => false,
903 _ => bail!("invalid boolean value: {field:?}"),
904 };
905 Ok(CqlValue::Boolean(b))
906 }
907 "int" => Ok(CqlValue::Int(
908 field
909 .parse::<i32>()
910 .with_context(|| format!("invalid int: {field:?}"))?,
911 )),
912 "bigint" | "counter" => Ok(CqlValue::BigInt(
913 field
914 .parse::<i64>()
915 .with_context(|| format!("invalid bigint: {field:?}"))?,
916 )),
917 "smallint" => Ok(CqlValue::SmallInt(
918 field
919 .parse::<i16>()
920 .with_context(|| format!("invalid smallint: {field:?}"))?,
921 )),
922 "tinyint" => Ok(CqlValue::TinyInt(
923 field
924 .parse::<i8>()
925 .with_context(|| format!("invalid tinyint: {field:?}"))?,
926 )),
927 "float" => Ok(CqlValue::Float(
928 field
929 .parse::<f32>()
930 .with_context(|| format!("invalid float: {field:?}"))?,
931 )),
932 "double" => Ok(CqlValue::Double(
933 field
934 .parse::<f64>()
935 .with_context(|| format!("invalid double: {field:?}"))?,
936 )),
937 "uuid" => {
938 let u =
939 uuid::Uuid::parse_str(field).with_context(|| format!("invalid uuid: {field:?}"))?;
940 Ok(CqlValue::Uuid(u))
941 }
942 "timeuuid" => {
943 let u = uuid::Uuid::parse_str(field)
944 .with_context(|| format!("invalid timeuuid: {field:?}"))?;
945 Ok(CqlValue::TimeUuid(u))
946 }
947 "timestamp" => {
948 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(field) {
950 return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
951 }
952 let formats = [
954 "%Y-%m-%d %H:%M:%S%.f%z",
955 "%Y-%m-%dT%H:%M:%S%.f%z",
956 "%Y-%m-%dT%H:%M:%S%z",
957 "%Y-%m-%d %H:%M:%S%z",
958 "%Y-%m-%d %H:%M:%S%.3f+0000",
959 ];
960 for fmt in &formats {
961 if let Ok(dt) = DateTime::parse_from_str(field, fmt) {
962 return Ok(CqlValue::Timestamp(dt.timestamp_millis()));
963 }
964 }
965 if let Ok(d) = NaiveDate::parse_from_str(field, "%Y-%m-%d") {
967 let dt = d.and_hms_opt(0, 0, 0).unwrap();
968 return Ok(CqlValue::Timestamp(dt.and_utc().timestamp_millis()));
969 }
970 if let Ok(ms) = field.parse::<i64>() {
972 return Ok(CqlValue::Timestamp(ms));
973 }
974 bail!("invalid timestamp: {field:?}")
975 }
976 "date" => {
977 let d = NaiveDate::parse_from_str(field, "%Y-%m-%d")
978 .with_context(|| format!("invalid date (expected YYYY-MM-DD): {field:?}"))?;
979 Ok(CqlValue::Date(d))
980 }
981 "time" => {
982 let formats = ["%H:%M:%S%.f", "%H:%M:%S"];
984 for fmt in &formats {
985 if let Ok(t) = NaiveTime::parse_from_str(field, fmt) {
986 return Ok(CqlValue::Time(t));
987 }
988 }
989 bail!("invalid time (expected HH:MM:SS[.nnn]): {field:?}")
990 }
991 "inet" => {
992 let addr = field
993 .parse::<IpAddr>()
994 .with_context(|| format!("invalid inet: {field:?}"))?;
995 Ok(CqlValue::Inet(addr))
996 }
997 "blob" => {
998 let hex = field.strip_prefix("0x").unwrap_or(field);
1000 if !hex.len().is_multiple_of(2) {
1001 bail!("invalid blob (odd number of hex digits): {field:?}");
1002 }
1003 let bytes = (0..hex.len())
1004 .step_by(2)
1005 .map(|i| {
1006 u8::from_str_radix(&hex[i..i + 2], 16)
1007 .with_context(|| format!("invalid hex byte at offset {i}: {field:?}"))
1008 })
1009 .collect::<Result<Vec<u8>>>()?;
1010 Ok(CqlValue::Blob(bytes))
1011 }
1012 "varint" => {
1013 let n = field
1014 .parse::<num_bigint::BigInt>()
1015 .with_context(|| format!("invalid varint: {field:?}"))?;
1016 Ok(CqlValue::Varint(n))
1017 }
1018 "decimal" => {
1019 let d = field
1020 .parse::<bigdecimal::BigDecimal>()
1021 .with_context(|| format!("invalid decimal: {field:?}"))?;
1022 Ok(CqlValue::Decimal(d))
1023 }
1024 _ => Ok(CqlValue::Text(field.to_string())),
1027 }
1028}
1029
/// Removes one outer `frozen<...>` wrapper from a CQL type name.
///
/// The `frozen<` prefix is matched case-insensitively and the name must end
/// with `>`; otherwise the input is returned unchanged. Only the outermost
/// wrapper is removed.
fn strip_frozen(type_name: &str) -> &str {
    let has_prefix = type_name.to_lowercase().starts_with("frozen<");
    let has_suffix = type_name.ends_with('>');

    match (has_prefix, has_suffix) {
        // Drop the 7-byte `frozen<` prefix and the closing `>`.
        (true, true) => &type_name[7..type_name.len() - 1],
        _ => type_name,
    }
}
1039
1040fn cql_value_to_insert_literal(v: &CqlValue) -> String {
1046 match v {
1047 CqlValue::Null | CqlValue::Unset => "null".to_string(),
1048 CqlValue::Text(s) | CqlValue::Ascii(s) => {
1049 format!("'{}'", s.replace('\'', "''"))
1050 }
1051 CqlValue::Boolean(b) => if *b { "true" } else { "false" }.to_string(),
1052 CqlValue::Int(n) => n.to_string(),
1053 CqlValue::BigInt(n) | CqlValue::Counter(n) => n.to_string(),
1054 CqlValue::SmallInt(n) => n.to_string(),
1055 CqlValue::TinyInt(n) => n.to_string(),
1056 CqlValue::Float(f) => {
1057 if f.is_nan() {
1058 "NaN".to_string()
1059 } else if f.is_infinite() {
1060 if f.is_sign_positive() {
1061 "Infinity".to_string()
1062 } else {
1063 "-Infinity".to_string()
1064 }
1065 } else {
1066 f.to_string()
1067 }
1068 }
1069 CqlValue::Double(d) => {
1070 if d.is_nan() {
1071 "NaN".to_string()
1072 } else if d.is_infinite() {
1073 if d.is_sign_positive() {
1074 "Infinity".to_string()
1075 } else {
1076 "-Infinity".to_string()
1077 }
1078 } else {
1079 d.to_string()
1080 }
1081 }
1082 CqlValue::Varint(n) => n.to_string(),
1083 CqlValue::Decimal(d) => d.to_string(),
1084 CqlValue::Uuid(u) | CqlValue::TimeUuid(u) => u.to_string(),
1085 CqlValue::Timestamp(ms) => {
1086 match DateTime::from_timestamp_millis(*ms) {
1088 Some(dt) => {
1089 let utc: DateTime<Utc> = dt;
1090 format!("'{}'", utc.format("%Y-%m-%d %H:%M:%S%.6f+0000"))
1091 }
1092 None => format!("{ms}"),
1093 }
1094 }
1095 CqlValue::Date(d) => format!("'{d}'"),
1096 CqlValue::Time(t) => format!("'{t}'"),
1097 CqlValue::Inet(addr) => format!("'{addr}'"),
1098 CqlValue::Blob(bytes) => {
1099 let mut s = String::with_capacity(2 + bytes.len() * 2);
1100 s.push_str("0x");
1101 for b in bytes {
1102 s.push_str(&format!("{b:02x}"));
1103 }
1104 s
1105 }
1106 CqlValue::Duration {
1107 months,
1108 days,
1109 nanoseconds,
1110 } => {
1111 format!("{months}mo{days}d{nanoseconds}ns")
1112 }
1113 CqlValue::List(_)
1115 | CqlValue::Set(_)
1116 | CqlValue::Map(_)
1117 | CqlValue::Tuple(_)
1118 | CqlValue::UserDefinedType { .. } => v.to_string(),
1119 }
1120}
1121
/// State for a simple rate limiter that hands out one token per row.
struct TokenBucket {
    /// Refill rate in tokens per second; also the cap on stored tokens.
    rate: f64,
    /// Tokens currently available; may be fractional.
    tokens: f64,
    /// Moment up to which elapsed time has been converted into tokens.
    last: Instant,
}
1132
1133impl TokenBucket {
1134 fn new(rows_per_second: usize) -> Self {
1135 Self {
1136 rate: rows_per_second as f64,
1137 tokens: rows_per_second as f64,
1138 last: Instant::now(),
1139 }
1140 }
1141
1142 async fn acquire(&mut self) {
1143 let now = Instant::now();
1144 let elapsed = now.duration_since(self.last).as_secs_f64();
1145 self.tokens = (self.tokens + elapsed * self.rate).min(self.rate);
1146 self.last = now;
1147
1148 if self.tokens < 1.0 {
1149 let wait_secs = (1.0 - self.tokens) / self.rate;
1150 tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
1151 self.tokens = 0.0;
1152 } else {
1153 self.tokens -= 1.0;
1154 }
1155 }
1156}
1157
1158pub async fn execute_copy_from(
1160 session: &CqlSession,
1161 cmd: &CopyFromCommand,
1162 current_keyspace: Option<&str>,
1163) -> Result<()> {
1164 let start = Instant::now();
1165
1166 let table_spec = match (&cmd.keyspace, current_keyspace) {
1168 (Some(ks), _) => format!("{}.{}", ks, cmd.table),
1169 (None, Some(ks)) => format!("{}.{}", ks, cmd.table),
1170 (None, None) => cmd.table.clone(),
1171 };
1172
1173 let source_name = match &cmd.source {
1174 CopyTarget::File(path) => format!("'{}'", path.display()),
1175 CopyTarget::Stdin => "STDIN".to_string(),
1176 CopyTarget::Stdout => unreachable!("COPY FROM cannot use STDOUT"),
1177 };
1178
1179 let ttl_clause = match cmd.options.ttl {
1180 Some(ttl) => format!(" USING TTL {ttl}"),
1181 None => String::new(),
1182 };
1183
1184 let ks_for_schema = cmd
1186 .keyspace
1187 .as_deref()
1188 .or(current_keyspace)
1189 .context("no keyspace specified and no current keyspace set")?;
1190 let schema_query = format!(
1191 "SELECT column_name, kind, position, type FROM system_schema.columns \
1192 WHERE keyspace_name = '{}' AND table_name = '{}'",
1193 ks_for_schema, cmd.table
1194 );
1195 let schema_result = session.execute_query(&schema_query).await?;
1196
1197 let mut schema_cols: Vec<(String, String, i32, String)> = Vec::new();
1199 for row in &schema_result.rows {
1200 let name = match row.values.first() {
1201 Some(CqlValue::Text(n)) => n.clone(),
1202 _ => continue,
1203 };
1204 let kind = match row.values.get(1) {
1205 Some(CqlValue::Text(k)) => k.clone(),
1206 _ => "regular".to_string(),
1207 };
1208 let position = match row.values.get(2) {
1209 Some(CqlValue::Int(p)) => *p,
1210 _ => -1,
1211 };
1212 let type_name = match row.values.get(3) {
1213 Some(CqlValue::Text(t)) => t.clone(),
1214 _ => "text".to_string(),
1215 };
1216 schema_cols.push((name, kind, position, type_name));
1217 }
1218 if schema_cols.is_empty() {
1219 bail!(
1220 "could not determine columns for table '{}' — table may not exist",
1221 table_spec
1222 );
1223 }
1224 schema_cols.sort_by(|a, b| {
1225 let kind_order = |k: &str| -> i32 {
1226 match k {
1227 "partition_key" => 0,
1228 "clustering" => 1,
1229 "static" => 2,
1230 _ => 3,
1231 }
1232 };
1233 kind_order(&a.1)
1234 .cmp(&kind_order(&b.1))
1235 .then(a.2.cmp(&b.2))
1236 .then(a.0.cmp(&b.0))
1237 });
1238
1239 let type_map: std::collections::HashMap<String, String> = schema_cols
1241 .iter()
1242 .map(|(n, _, _, t)| (n.clone(), t.clone()))
1243 .collect();
1244
1245 let prelim_columns: Vec<(String, String)> = match &cmd.columns {
1247 Some(explicit) => explicit
1248 .iter()
1249 .map(|n| {
1250 let t = type_map
1251 .get(n)
1252 .cloned()
1253 .unwrap_or_else(|| "text".to_string());
1254 (n.clone(), t)
1255 })
1256 .collect(),
1257 None => schema_cols.into_iter().map(|(n, _, _, t)| (n, t)).collect(),
1258 };
1259
1260 let reader: Box<dyn std::io::Read> = match &cmd.source {
1262 CopyTarget::File(path) => {
1263 let file = std::fs::File::open(path)
1264 .with_context(|| format!("failed to open file: {}", path.display()))?;
1265 Box::new(std::io::BufReader::new(file))
1266 }
1267 CopyTarget::Stdin => Box::new(std::io::stdin().lock()),
1268 CopyTarget::Stdout => bail!("COPY FROM cannot read from STDOUT"),
1269 };
1270 let mut csv_reader = csv::ReaderBuilder::new()
1271 .delimiter(cmd.options.delimiter as u8)
1272 .quote(cmd.options.quote as u8)
1273 .escape(Some(cmd.options.escape as u8))
1274 .has_headers(cmd.options.header)
1275 .flexible(true)
1276 .from_reader(reader);
1277
1278 let columns: Vec<(String, String)> = if cmd.options.header && cmd.columns.is_none() {
1280 let headers = csv_reader
1281 .headers()
1282 .context("failed to read CSV header row")?;
1283 headers
1284 .iter()
1285 .map(|h| {
1286 let name = h.trim().to_string();
1287 let t = type_map
1288 .get(&name)
1289 .cloned()
1290 .unwrap_or_else(|| "text".to_string());
1291 (name, t)
1292 })
1293 .collect()
1294 } else {
1295 prelim_columns
1296 };
1297
1298 let col_list: String = columns
1299 .iter()
1300 .map(|(n, _)| n.as_str())
1301 .collect::<Vec<_>>()
1302 .join(", ");
1303 let col_type_names: Vec<String> = columns.iter().map(|(_, t)| t.clone()).collect();
1304
1305 let prepared_id = if cmd.options.prepared_statements {
1307 let placeholders = vec!["?"; columns.len()].join(", ");
1308 let insert_template =
1309 format!("INSERT INTO {table_spec} ({col_list}) VALUES ({placeholders}){ttl_clause}");
1310 Some(
1311 session
1312 .prepare(&insert_template)
1313 .await
1314 .with_context(|| format!("failed to prepare: {insert_template}"))?,
1315 )
1316 } else {
1317 None
1318 };
1319
1320 let mut err_writer: Option<std::io::BufWriter<std::fs::File>> = match &cmd.options.err_file {
1322 Some(path) => {
1323 let file = std::fs::File::create(path)
1324 .with_context(|| format!("failed to create error file: {}", path.display()))?;
1325 Some(std::io::BufWriter::new(file))
1326 }
1327 None => None,
1328 };
1329
1330 let mut row_count: usize = 0;
1331 let mut parse_errors: usize = 0;
1332 let mut insert_errors: usize = 0;
1333 let mut rate_limiter = cmd.options.ingest_rate.map(TokenBucket::new);
1334 let num_processes = cmd.options.num_processes.max(1);
1335 let chunk_size = cmd.options.chunk_size.max(1);
1336
1337 let max_attempts = cmd.options.max_attempts;
1338 let max_parse_errors = cmd.options.max_parse_errors;
1339 let max_insert_errors = cmd.options.max_insert_errors;
1340 let report_frequency = cmd.options.report_frequency;
1341 let null_val = &cmd.options.null_val;
1342
1343 let mut csv_records = csv_reader.records();
1344
1345 'outer: loop {
1346 let mut chunk: Vec<Vec<CqlValue>> = Vec::with_capacity(chunk_size);
1348
1349 'fill: loop {
1350 if chunk.len() >= chunk_size {
1351 break 'fill;
1352 }
1353 let record = match csv_records.next() {
1354 None => break 'fill,
1355 Some(Err(e)) => {
1356 parse_errors += 1;
1357 let msg = format!("CSV parse error on row {}: {e}", row_count + parse_errors);
1358 eprintln!("{msg}");
1359 if let Some(ref mut w) = err_writer {
1360 let _ = writeln!(w, "{msg}");
1361 }
1362 if let Some(max) = max_parse_errors {
1363 if parse_errors > max {
1364 bail!("Exceeded maximum parse errors ({max}). Aborting.");
1365 }
1366 }
1367 continue 'fill;
1368 }
1369 Some(Ok(r)) => r,
1370 };
1371
1372 if record.len() != col_type_names.len() {
1373 parse_errors += 1;
1374 let msg = format!(
1375 "Row {}: expected {} columns but got {}",
1376 row_count + parse_errors,
1377 col_type_names.len(),
1378 record.len()
1379 );
1380 eprintln!("{msg}");
1381 if let Some(ref mut w) = err_writer {
1382 let _ = writeln!(w, "{msg}");
1383 }
1384 if let Some(max) = max_parse_errors {
1385 if parse_errors > max {
1386 bail!("Exceeded maximum number of parse errors ({max}). Aborting import.");
1387 }
1388 }
1389 continue 'fill;
1390 }
1391
1392 let mut row_values: Vec<CqlValue> = Vec::with_capacity(col_type_names.len());
1393 let mut row_ok = true;
1394 for (field, type_name) in record.iter().zip(col_type_names.iter()) {
1395 match csv_str_to_cql_value(field, type_name, null_val) {
1396 Ok(v) => row_values.push(v),
1397 Err(e) => {
1398 parse_errors += 1;
1399 let msg = format!(
1400 "Row {}: type error for '{}': {e}",
1401 row_count + parse_errors,
1402 type_name
1403 );
1404 eprintln!("{msg}");
1405 if let Some(ref mut w) = err_writer {
1406 let _ = writeln!(w, "{msg}");
1407 }
1408 if let Some(max) = max_parse_errors {
1409 if parse_errors > max {
1410 bail!("Exceeded maximum parse errors ({max}). Aborting.");
1411 }
1412 }
1413 row_ok = false;
1414 break;
1415 }
1416 }
1417 }
1418 if row_ok {
1419 chunk.push(row_values);
1420 }
1421 }
1422
1423 if chunk.is_empty() {
1424 break 'outer;
1425 }
1426
1427 if let Some(ref mut bucket) = rate_limiter {
1429 for _ in 0..chunk.len() {
1430 bucket.acquire().await;
1431 }
1432 }
1433
1434 let insert_results: Vec<Result<()>> = futures::stream::iter(chunk)
1436 .map(|values| {
1437 let ts = table_spec.as_str();
1438 let cl = col_list.as_str();
1439 let ttl = ttl_clause.as_str();
1440 let pid = prepared_id.as_ref();
1441 async move {
1442 insert_row_with_retry(session, pid, ts, cl, ttl, &values, max_attempts).await
1443 }
1444 })
1445 .buffer_unordered(num_processes)
1446 .collect()
1447 .await;
1448
1449 for result in insert_results {
1450 match result {
1451 Ok(()) => row_count += 1,
1452 Err(e) => {
1453 insert_errors += 1;
1454 let msg = format!("Insert error on row {}: {e}", row_count + insert_errors);
1455 eprintln!("{msg}");
1456 if let Some(ref mut w) = err_writer {
1457 let _ = writeln!(w, "{msg}");
1458 }
1459 if let Some(max) = max_insert_errors {
1460 if insert_errors > max {
1461 bail!("Exceeded maximum number of insert errors ({max}). Aborting import.");
1462 }
1463 }
1464 }
1465 }
1466 }
1467
1468 if let Some(freq) = report_frequency {
1470 let total = row_count + insert_errors + parse_errors;
1471 if freq > 0 && total > 0 && total.is_multiple_of(freq) {
1472 eprintln!("Processed {} rows...", row_count);
1473 }
1474 }
1475 }
1476
1477 if let Some(ref mut w) = err_writer {
1478 w.flush()?;
1479 }
1480
1481 let elapsed = start.elapsed().as_secs_f64();
1482 println!("{row_count} rows imported from {source_name} in {elapsed:.3}s.");
1483 if parse_errors > 0 {
1484 eprintln!("{parse_errors} parse error(s) encountered.");
1485 }
1486 if insert_errors > 0 {
1487 eprintln!("{insert_errors} insert error(s) encountered.");
1488 }
1489
1490 Ok(())
1491}
1492
1493async fn insert_row_with_retry(
1498 session: &CqlSession,
1499 prepared_id: Option<&PreparedId>,
1500 table_spec: &str,
1501 col_list: &str,
1502 ttl_clause: &str,
1503 values: &[CqlValue],
1504 max_attempts: usize,
1505) -> Result<()> {
1506 let max = max_attempts.max(1);
1507 let mut last_err = anyhow::anyhow!("no attempts made");
1508
1509 for attempt in 1..=max {
1510 let result = if let Some(id) = prepared_id {
1511 session.execute_prepared(id, values).await
1512 } else {
1513 let literals: Vec<String> = values.iter().map(cql_value_to_insert_literal).collect();
1514 let insert = format!(
1515 "INSERT INTO {} ({}) VALUES ({}){};",
1516 table_spec,
1517 col_list,
1518 literals.join(", "),
1519 ttl_clause
1520 );
1521 session.execute_query(&insert).await
1522 };
1523
1524 match result {
1525 Ok(_) => return Ok(()),
1526 Err(e) => {
1527 last_err = e;
1528 if attempt < max {
1529 let wait_ms = (100u64 * (1u64 << (attempt - 1).min(4))).min(2000);
1531 tokio::time::sleep(Duration::from_millis(wait_ms)).await;
1532 }
1533 }
1534 }
1535 }
1536
1537 Err(last_err)
1538}
1539
#[cfg(test)]
mod tests {
    use super::*;

    // ----- COPY TO: statement parsing -----

    /// A qualified table and a quoted path yield keyspace, table and a file target.
    #[test]
    fn parse_copy_to_basic() {
        let parsed = parse_copy_to("COPY ks.table TO '/tmp/out.csv'").unwrap();
        let expected_target = CopyTarget::File(PathBuf::from("/tmp/out.csv"));

        assert_eq!(parsed.keyspace, Some("ks".to_string()));
        assert_eq!(parsed.table, "table");
        assert_eq!(parsed.columns, None);
        assert_eq!(parsed.filename, expected_target);
    }

    /// A parenthesised column list is captured in declaration order.
    #[test]
    fn parse_copy_to_with_columns() {
        let parsed = parse_copy_to("COPY ks.table (col1, col2) TO '/tmp/out.csv'").unwrap();
        let expected_columns: Vec<String> = vec!["col1".to_string(), "col2".to_string()];
        let expected_target = CopyTarget::File(PathBuf::from("/tmp/out.csv"));

        assert_eq!(parsed.keyspace, Some("ks".to_string()));
        assert_eq!(parsed.table, "table");
        assert_eq!(parsed.columns, Some(expected_columns));
        assert_eq!(parsed.filename, expected_target);
    }

    /// The bare keyword `STDOUT` selects the stdout target.
    #[test]
    fn parse_copy_to_stdout() {
        let parsed = parse_copy_to("COPY ks.table TO STDOUT").unwrap();
        assert_eq!(parsed.filename, CopyTarget::Stdout);
    }

    /// `WITH … AND …` options override the corresponding defaults.
    #[test]
    fn parse_copy_to_with_options() {
        let statement = "COPY ks.table TO '/tmp/out.csv' WITH DELIMITER='|' AND HEADER=true";
        let options = parse_copy_to(statement).unwrap().options;

        assert_eq!(options.delimiter, '|');
        assert!(options.header);
    }

    // ----- COPY TO: value formatting -----

    /// With default options a null value renders as the empty string.
    #[test]
    fn format_value_null() {
        let rendered = format_value_for_csv(&CqlValue::Null, &CopyOptions::default());
        assert_eq!(rendered, "");
    }

    /// Text values are emitted verbatim.
    #[test]
    fn format_value_text() {
        let value = CqlValue::Text("hello".to_string());
        let rendered = format_value_for_csv(&value, &CopyOptions::default());
        assert_eq!(rendered, "hello");
    }

    /// Booleans use the default `True` / `False` spelling.
    #[test]
    fn format_value_boolean() {
        let opts = CopyOptions::default();
        for (flag, expected) in [(true, "True"), (false, "False")] {
            assert_eq!(format_value_for_csv(&CqlValue::Boolean(flag), &opts), expected);
        }
    }

    /// `float_precision` controls the number of fractional digits for floats.
    #[test]
    fn format_value_float_precision() {
        let three_digits = CopyOptions {
            float_precision: 3,
            ..Default::default()
        };
        let rendered = format_value_for_csv(&CqlValue::Float(1.23456), &three_digits);
        assert_eq!(rendered, "1.235");
    }

    /// Pins every default of `CopyOptions`.
    #[test]
    fn default_options() {
        let CopyOptions {
            delimiter,
            quote,
            escape,
            header,
            null_val,
            datetime_format,
            encoding,
            float_precision,
            double_precision,
            decimal_sep,
            thousands_sep,
            bool_style,
            page_size,
            max_output_size,
            report_frequency,
        } = CopyOptions::default();

        assert_eq!(delimiter, ',');
        assert_eq!(quote, '"');
        assert_eq!(escape, '\\');
        assert!(!header);
        assert_eq!(null_val, "");
        assert_eq!(datetime_format, None);
        assert_eq!(encoding, "utf-8");
        assert_eq!(float_precision, 5);
        assert_eq!(double_precision, 12);
        assert_eq!(decimal_sep, '.');
        assert_eq!(thousands_sep, None);
        assert_eq!(bool_style, ("True".to_string(), "False".to_string()));
        assert_eq!(page_size, 1000);
        assert_eq!(max_output_size, None);
        assert_eq!(report_frequency, None);
    }

    // ----- COPY FROM: statement parsing -----

    /// A qualified table and a quoted path yield keyspace, table and a file source.
    #[test]
    fn parse_copy_from_basic() {
        let parsed = parse_copy_from("COPY ks.table FROM '/tmp/in.csv'").unwrap();
        let expected_source = CopyTarget::File(PathBuf::from("/tmp/in.csv"));

        assert_eq!(parsed.keyspace, Some("ks".to_string()));
        assert_eq!(parsed.table, "table");
        assert_eq!(parsed.columns, None);
        assert_eq!(parsed.source, expected_source);
    }

    /// A parenthesised column list is captured in declaration order.
    #[test]
    fn parse_copy_from_with_columns() {
        let parsed = parse_copy_from("COPY ks.table (col1, col2) FROM '/tmp/in.csv'").unwrap();
        let expected_columns: Vec<String> = vec!["col1".to_string(), "col2".to_string()];

        assert_eq!(parsed.keyspace, Some("ks".to_string()));
        assert_eq!(parsed.table, "table");
        assert_eq!(parsed.columns, Some(expected_columns));
    }

    /// The bare keyword `STDIN` selects the stdin source.
    #[test]
    fn parse_copy_from_stdin() {
        let parsed = parse_copy_from("COPY ks.table FROM STDIN").unwrap();
        assert_eq!(parsed.source, CopyTarget::Stdin);
    }

    /// The `stdin` keyword is recognised in lower case as well.
    #[test]
    fn parse_copy_from_stdin_case_insensitive() {
        let parsed = parse_copy_from("COPY ks.table FROM stdin").unwrap();
        assert_eq!(parsed.source, CopyTarget::Stdin);
    }

    /// An unqualified table name leaves the keyspace unset.
    #[test]
    fn parse_copy_from_no_keyspace() {
        let parsed = parse_copy_from("COPY mytable FROM '/data/file.csv'").unwrap();
        assert_eq!(parsed.keyspace, None);
        assert_eq!(parsed.table, "mytable");
    }

    /// TTL, HEADER, CHUNKSIZE and DELIMITER are all read from the `WITH` clause.
    #[test]
    fn parse_copy_from_with_options() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH TTL=3600 AND HEADER=true AND CHUNKSIZE=1000 AND DELIMITER='|'";
        let options = parse_copy_from(statement).unwrap().options;

        assert_eq!(options.ttl, Some(3600));
        assert!(options.header);
        assert_eq!(options.chunk_size, 1000);
        assert_eq!(options.delimiter, '|');
    }

    /// Error-limit options and the error file path are read from the `WITH` clause.
    #[test]
    fn parse_copy_from_with_error_options() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH MAXPARSEERRORS=100 AND MAXINSERTERRORS=50 AND ERRFILE='/tmp/err.log'";
        let options = parse_copy_from(statement).unwrap().options;

        assert_eq!(options.max_parse_errors, Some(100));
        assert_eq!(options.max_insert_errors, Some(50));
        assert_eq!(options.err_file, Some(PathBuf::from("/tmp/err.log")));
    }

    /// Batch sizing and retry options are read from the `WITH` clause.
    #[test]
    fn parse_copy_from_with_batch_options() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH MAXBATCHSIZE=50 AND MINBATCHSIZE=5 AND MAXATTEMPTS=10";
        let options = parse_copy_from(statement).unwrap().options;

        assert_eq!(options.max_batch_size, 50);
        assert_eq!(options.min_batch_size, 5);
        assert_eq!(options.max_attempts, 10);
    }

    /// A trailing semicolon does not become part of the file path.
    #[test]
    fn parse_copy_from_semicolon() {
        let parsed = parse_copy_from("COPY ks.table FROM '/tmp/in.csv';").unwrap();
        let expected_source = CopyTarget::File(PathBuf::from("/tmp/in.csv"));
        assert_eq!(parsed.source, expected_source);
    }

    /// Pins every default of `CopyFromOptions` that the tests know about.
    #[test]
    fn default_copy_from_options() {
        let defaults = CopyFromOptions::default();

        // CSV dialect.
        assert_eq!(defaults.delimiter, ',');
        assert_eq!(defaults.quote, '"');
        assert_eq!(defaults.escape, '\\');
        assert!(!defaults.header);
        assert_eq!(defaults.null_val, "");
        assert_eq!(defaults.datetime_format, None);
        assert_eq!(defaults.encoding, "utf-8");

        // Batching and statement handling.
        assert_eq!(defaults.chunk_size, 5000);
        assert_eq!(defaults.max_batch_size, 20);
        assert_eq!(defaults.min_batch_size, 2);
        assert!(defaults.prepared_statements);
        assert_eq!(defaults.ttl, None);

        // Retry and error limits.
        assert_eq!(defaults.max_attempts, 5);
        assert_eq!(defaults.max_parse_errors, None);
        assert_eq!(defaults.max_insert_errors, None);
        assert_eq!(defaults.err_file, None);

        // Reporting and throughput.
        assert_eq!(defaults.report_frequency, None);
        assert_eq!(defaults.ingest_rate, None);
        assert_eq!(defaults.num_processes, 1);
    }

    // ----- COPY FROM: CSV field conversion -----

    /// `text` and `varchar` produce `Text`; `ascii` produces `Ascii`.
    #[test]
    fn csv_to_cql_text_types() {
        let cases = [
            ("hello", "text", CqlValue::Text("hello".to_string())),
            ("hi", "ascii", CqlValue::Ascii("hi".to_string())),
            ("world", "varchar", CqlValue::Text("world".to_string())),
        ];
        for (raw, cql_type, expected) in cases {
            let converted = csv_str_to_cql_value(raw, cql_type, "").unwrap();
            assert_eq!(converted, expected);
        }
    }

    /// Each integer type name maps onto its own variant; `counter` maps to `BigInt`.
    #[test]
    fn csv_to_cql_int_types() {
        let cases = [
            ("42", "int", CqlValue::Int(42)),
            ("-100", "bigint", CqlValue::BigInt(-100)),
            ("1000", "counter", CqlValue::BigInt(1000)),
            ("32767", "smallint", CqlValue::SmallInt(32767)),
            ("127", "tinyint", CqlValue::TinyInt(127)),
        ];
        for (raw, cql_type, expected) in cases {
            assert_eq!(csv_str_to_cql_value(raw, cql_type, "").unwrap(), expected);
        }
    }

    /// `float` and `double` parse to their variants, including exponent notation.
    #[test]
    fn csv_to_cql_float_types() {
        let as_float = csv_str_to_cql_value("1.5", "float", "").unwrap();
        match as_float {
            CqlValue::Float(f) => assert!((f - 1.5f32).abs() < 1e-5),
            other => panic!("expected Float, got {other:?}"),
        }

        let as_double = csv_str_to_cql_value("1.5", "double", "").unwrap();
        match as_double {
            CqlValue::Double(d) => assert!((d - 1.5f64).abs() < 1e-9),
            other => panic!("expected Double, got {other:?}"),
        }

        let exponent = csv_str_to_cql_value("1e10", "double", "").unwrap();
        assert!(matches!(exponent, CqlValue::Double(_)));
    }

    /// Several spellings of true and false are accepted for `boolean`.
    #[test]
    fn csv_to_cql_boolean() {
        for t in ["true", "True", "TRUE", "yes", "YES", "on", "ON", "1"] {
            let converted = csv_str_to_cql_value(t, "boolean", "").unwrap();
            assert_eq!(converted, CqlValue::Boolean(true), "expected true for {t:?}");
        }
        for f in ["false", "False", "FALSE", "no", "NO", "off", "OFF", "0"] {
            let converted = csv_str_to_cql_value(f, "boolean", "").unwrap();
            assert_eq!(
                converted,
                CqlValue::Boolean(false),
                "expected false for {f:?}"
            );
        }
    }

    /// The same textual UUID converts to `Uuid` or `TimeUuid` depending on the type name.
    #[test]
    fn csv_to_cql_uuid() {
        let sample = "550e8400-e29b-41d4-a716-446655440000";

        let plain = csv_str_to_cql_value(sample, "uuid", "").unwrap();
        assert!(matches!(plain, CqlValue::Uuid(_)));

        let time_based = csv_str_to_cql_value(sample, "timeuuid", "").unwrap();
        assert!(matches!(time_based, CqlValue::TimeUuid(_)));

        assert!(csv_str_to_cql_value("not-a-uuid", "uuid", "").is_err());
    }

    /// Timestamps are accepted as an ISO-8601 string or as a millisecond integer.
    #[test]
    fn csv_to_cql_timestamp() {
        let from_iso = csv_str_to_cql_value("2024-01-15T12:34:56Z", "timestamp", "").unwrap();
        assert!(matches!(from_iso, CqlValue::Timestamp(_)));

        let from_millis = csv_str_to_cql_value("1705318496000", "timestamp", "").unwrap();
        assert_eq!(from_millis, CqlValue::Timestamp(1705318496000));
    }

    /// `YYYY-MM-DD` converts to `Date`; other text is rejected.
    #[test]
    fn csv_to_cql_date() {
        use chrono::NaiveDate;

        let expected_day = NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let converted = csv_str_to_cql_value("2024-01-15", "date", "").unwrap();
        assert_eq!(converted, CqlValue::Date(expected_day));

        assert!(csv_str_to_cql_value("not-a-date", "date", "").is_err());
    }

    /// Times with and without fractional seconds convert to `Time`; other text is rejected.
    #[test]
    fn csv_to_cql_time() {
        for raw in ["12:34:56", "12:34:56.789"] {
            let converted = csv_str_to_cql_value(raw, "time", "").unwrap();
            assert!(matches!(converted, CqlValue::Time(_)));
        }

        assert!(csv_str_to_cql_value("not-a-time", "time", "").is_err());
    }

    /// IPv4 and IPv6 literals convert to `Inet`; other text is rejected.
    #[test]
    fn csv_to_cql_inet() {
        for raw in ["127.0.0.1", "::1"] {
            let converted = csv_str_to_cql_value(raw, "inet", "").unwrap();
            assert!(matches!(converted, CqlValue::Inet(_)));
        }

        assert!(csv_str_to_cql_value("not.an.ip", "inet", "").is_err());
    }

    /// Hex blobs are accepted with or without `0x`; bad digits and odd lengths are rejected.
    #[test]
    fn csv_to_cql_blob() {
        for raw in ["0xdeadbeef", "deadbeef"] {
            let converted = csv_str_to_cql_value(raw, "blob", "").unwrap();
            assert_eq!(converted, CqlValue::Blob(vec![0xde, 0xad, 0xbe, 0xef]));
        }

        for malformed in ["0xgg", "0xabc"] {
            assert!(csv_str_to_cql_value(malformed, "blob", "").is_err());
        }
    }

    /// With the default (empty) null marker an empty field converts to `Null`.
    #[test]
    fn csv_to_cql_null_handling() {
        for cql_type in ["int", "text"] {
            let converted = csv_str_to_cql_value("", cql_type, "").unwrap();
            assert_eq!(converted, CqlValue::Null);
        }
    }

    /// A custom null marker converts to `Null`; other values still convert normally.
    #[test]
    fn csv_to_cql_null_custom() {
        let cases = [("NULL", "int", "NULL"), ("N/A", "text", "N/A")];
        for (raw, cql_type, marker) in cases {
            let converted = csv_str_to_cql_value(raw, cql_type, marker).unwrap();
            assert_eq!(converted, CqlValue::Null);
        }

        let regular = csv_str_to_cql_value("42", "int", "NULL").unwrap();
        assert!(matches!(regular, CqlValue::Int(42)));
    }

    /// Unrecognised and collection type names pass the raw field through as `Text`.
    #[test]
    fn csv_to_cql_unknown_type_fallback() {
        let cases = [("hello", "customtype"), ("[1, 2, 3]", "list<int>")];
        for (raw, cql_type) in cases {
            let converted = csv_str_to_cql_value(raw, cql_type, "").unwrap();
            assert_eq!(converted, CqlValue::Text(raw.to_string()));
        }
    }

    /// Non-integer text is rejected for integer types.
    #[test]
    fn csv_to_cql_parse_error_int() {
        let cases = [("notanint", "int"), ("3.14", "int"), ("notanint", "bigint")];
        for (raw, cql_type) in cases {
            assert!(csv_str_to_cql_value(raw, cql_type, "").is_err());
        }
    }

    /// Arbitrary-precision numbers convert to `Varint` and `Decimal`.
    #[test]
    fn csv_to_cql_varint_and_decimal() {
        let big_integer =
            csv_str_to_cql_value("123456789012345678901234567890", "varint", "").unwrap();
        assert!(matches!(big_integer, CqlValue::Varint(_)));

        let big_decimal = csv_str_to_cql_value("3.141592653589793", "decimal", "").unwrap();
        assert!(matches!(big_decimal, CqlValue::Decimal(_)));
    }

    /// A `frozen<…>` collection type yields a `Text` value.
    #[test]
    fn csv_to_cql_frozen_stripped() {
        let converted = csv_str_to_cql_value("{uuid1, uuid2}", "frozen<set<uuid>>", "").unwrap();
        assert!(matches!(converted, CqlValue::Text(_)));
    }

    /// `NUMPROCESSES` is read from the `WITH` clause.
    #[test]
    fn parse_copy_from_numprocesses() {
        let statement = "COPY ks.table FROM '/tmp/in.csv' WITH NUMPROCESSES=4";
        let options = parse_copy_from(statement).unwrap().options;
        assert_eq!(options.num_processes, 4);
    }

    // ----- COPY TO: more value formatting -----

    /// Integer-like variants render as plain decimal numbers.
    #[test]
    fn format_value_integers() {
        let opts = CopyOptions::default();
        let cases = [
            (CqlValue::Int(42), "42"),
            (CqlValue::BigInt(-100), "-100"),
            (CqlValue::SmallInt(7), "7"),
            (CqlValue::TinyInt(-1), "-1"),
            (CqlValue::Counter(99), "99"),
        ];
        for (value, expected) in cases {
            assert_eq!(format_value_for_csv(&value, &opts), expected);
        }
    }

    /// Blobs render as lowercase hex with a `0x` prefix.
    #[test]
    fn format_value_blob() {
        let value = CqlValue::Blob(vec![0xca, 0xfe]);
        let rendered = format_value_for_csv(&value, &CopyOptions::default());
        assert_eq!(rendered, "0xcafe");
    }

    /// UUIDs render in hyphenated form.
    #[test]
    fn format_value_uuid() {
        let value = CqlValue::Uuid(uuid::Uuid::nil());
        let rendered = format_value_for_csv(&value, &CopyOptions::default());
        assert_eq!(rendered, "00000000-0000-0000-0000-000000000000");
    }

    /// Inet addresses render in their textual form.
    #[test]
    fn format_value_inet() {
        let address = "127.0.0.1".parse().unwrap();
        let rendered = format_value_for_csv(&CqlValue::Inet(address), &CopyOptions::default());
        assert_eq!(rendered, "127.0.0.1");
    }

    /// Durations render as concatenated month / day / nanosecond components.
    #[test]
    fn format_value_duration() {
        let value = CqlValue::Duration {
            months: 1,
            days: 2,
            nanoseconds: 3,
        };
        let rendered = format_value_for_csv(&value, &CopyOptions::default());
        assert_eq!(rendered, "1mo2d3ns");
    }

    /// `decimal_sep` replaces the decimal point in decimal values.
    #[test]
    fn format_value_decimal_custom_sep() {
        use bigdecimal::BigDecimal;

        let comma_sep = CopyOptions {
            decimal_sep: ',',
            ..Default::default()
        };
        let value = CqlValue::Decimal("3.14".parse::<BigDecimal>().unwrap());
        assert_eq!(format_value_for_csv(&value, &comma_sep), "3,14");
    }

    /// Varints render as plain decimal numbers.
    #[test]
    fn format_value_varint() {
        let value = CqlValue::Varint(num_bigint::BigInt::from(12345));
        let rendered = format_value_for_csv(&value, &CopyOptions::default());
        assert_eq!(rendered, "12345");
    }

    /// Non-finite floats render as `NaN`, `Infinity` and `-Infinity`.
    #[test]
    fn format_value_float_nan_inf() {
        let opts = CopyOptions::default();
        let cases = [
            (f32::NAN, "NaN"),
            (f32::INFINITY, "Infinity"),
            (f32::NEG_INFINITY, "-Infinity"),
        ];
        for (number, expected) in cases {
            assert_eq!(format_value_for_csv(&CqlValue::Float(number), &opts), expected);
        }
    }

    /// `double_precision` controls the number of fractional digits for doubles.
    #[test]
    fn format_value_double_precision() {
        let three_digits = CopyOptions {
            double_precision: 3,
            ..Default::default()
        };
        let rendered = format_value_for_csv(&CqlValue::Double(1.23456), &three_digits);
        assert_eq!(rendered, "1.235");
    }

    /// `decimal_sep` and `float_precision` combine when rendering floats.
    #[test]
    fn format_value_float_custom_decimal_sep() {
        let comma_two_digits = CopyOptions {
            float_precision: 2,
            decimal_sep: ',',
            ..Default::default()
        };
        let rendered = format_value_for_csv(&CqlValue::Float(1.5), &comma_two_digits);
        assert_eq!(rendered, "1,50");
    }

    /// Timestamp zero renders with the 1970-01-01 date in the default format.
    #[test]
    fn format_value_timestamp() {
        let rendered = format_value_for_csv(&CqlValue::Timestamp(0), &CopyOptions::default());
        assert!(rendered.contains("1970-01-01"));
    }

    /// `datetime_format` overrides the timestamp layout.
    #[test]
    fn format_value_timestamp_custom_format() {
        let slash_dates = CopyOptions {
            datetime_format: Some("%Y/%m/%d".to_string()),
            ..Default::default()
        };
        let rendered = format_value_for_csv(&CqlValue::Timestamp(0), &slash_dates);
        assert_eq!(rendered, "1970/01/01");
    }

    /// Lists render in brackets; maps render in braces with quoted text keys.
    #[test]
    fn format_value_collections() {
        let opts = CopyOptions::default();

        let numbers = CqlValue::List(vec![CqlValue::Int(1), CqlValue::Int(2)]);
        assert_eq!(format_value_for_csv(&numbers, &opts), "[1, 2]");

        let entry = (CqlValue::Text("k".to_string()), CqlValue::Int(1));
        let mapping = CqlValue::Map(vec![entry]);
        assert_eq!(format_value_for_csv(&mapping, &opts), "{'k': 1}");
    }

    /// An unset value renders as the empty string.
    #[test]
    fn format_value_unset() {
        let rendered = format_value_for_csv(&CqlValue::Unset, &CopyOptions::default());
        assert_eq!(rendered, "");
    }

    // ----- Statement parsing edge cases -----

    /// An unqualified table name leaves the keyspace unset.
    #[test]
    fn parse_copy_to_no_keyspace() {
        let parsed = parse_copy_to("COPY mytable TO '/tmp/out.csv'").unwrap();
        assert_eq!(parsed.keyspace, None);
        assert_eq!(parsed.table, "mytable");
    }

    /// Keywords are recognised in lower case.
    #[test]
    fn parse_copy_to_case_insensitive() {
        let parsed = parse_copy_to("copy ks.table to STDOUT").unwrap();
        assert_eq!(parsed.filename, CopyTarget::Stdout);
    }

    /// A `_to_` inside an identifier is not mistaken for the `TO` keyword.
    #[test]
    fn parse_copy_to_underscore_in_names() {
        let parsed = parse_copy_to("COPY test_copy_to_f49405.copy_test TO '/tmp/out.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("test_copy_to_f49405"));
        assert_eq!(parsed.table, "copy_test");
    }

    /// A `_from_` inside an identifier is not mistaken for the `FROM` keyword.
    #[test]
    fn parse_copy_from_underscore_in_names() {
        let parsed = parse_copy_from("COPY test_from_data.my_table FROM '/tmp/in.csv'").unwrap();
        assert_eq!(parsed.keyspace.as_deref(), Some("test_from_data"));
        assert_eq!(parsed.table, "my_table");
    }

    /// A statement that does not start with `COPY` is rejected by the COPY TO parser.
    #[test]
    fn parse_copy_to_invalid_not_copy() {
        let outcome = parse_copy_to("SELECT * FROM foo");
        assert!(outcome.is_err());
    }

    /// A COPY statement without the `TO` keyword is rejected.
    #[test]
    fn parse_copy_to_missing_to() {
        let outcome = parse_copy_to("COPY ks.table '/tmp/out.csv'");
        assert!(outcome.is_err());
    }

    /// A statement that does not start with `COPY` is rejected by the COPY FROM parser.
    #[test]
    fn parse_copy_from_invalid_not_copy() {
        let outcome = parse_copy_from("SELECT * FROM foo");
        assert!(outcome.is_err());
    }
}