1use std::collections::HashMap;
10use std::time::{Duration, Instant};
11
12use anyhow::Result;
13
14use crate::driver::{
15 AggregateMetadata, FunctionMetadata, KeyspaceMetadata, TableMetadata, UdtMetadata,
16};
17use crate::session::CqlSession;
18
/// Time-to-live applied to a cache when the caller does not choose one: 30 seconds.
const DEFAULT_TTL: Duration = Duration::from_secs(30);
21
22pub struct SchemaCache {
28 keyspaces: Vec<KeyspaceMetadata>,
30 tables: HashMap<String, Vec<TableMetadata>>,
32 udts: HashMap<String, Vec<UdtMetadata>>,
34 functions: HashMap<String, Vec<FunctionMetadata>>,
36 aggregates: HashMap<String, Vec<AggregateMetadata>>,
38 last_refresh: Option<Instant>,
40 ttl: Duration,
42}
43
44impl SchemaCache {
45 pub fn new() -> Self {
47 Self::with_ttl(DEFAULT_TTL)
48 }
49
50 pub fn with_ttl(ttl: Duration) -> Self {
52 SchemaCache {
53 keyspaces: Vec::new(),
54 tables: HashMap::new(),
55 udts: HashMap::new(),
56 functions: HashMap::new(),
57 aggregates: HashMap::new(),
58 last_refresh: None,
59 ttl,
60 }
61 }
62
63 pub fn is_stale(&self) -> bool {
65 match self.last_refresh {
66 None => true,
67 Some(refreshed_at) => refreshed_at.elapsed() >= self.ttl,
68 }
69 }
70
71 pub fn invalidate(&mut self) {
73 self.last_refresh = None;
74 }
75
76 pub async fn refresh(&mut self, session: &CqlSession) -> Result<()> {
82 let keyspaces = session.get_keyspaces().await?;
83
84 let mut tables: HashMap<String, Vec<TableMetadata>> = HashMap::new();
85 let mut udts: HashMap<String, Vec<UdtMetadata>> = HashMap::new();
86 let mut functions: HashMap<String, Vec<FunctionMetadata>> = HashMap::new();
87 let mut aggregates: HashMap<String, Vec<AggregateMetadata>> = HashMap::new();
88
89 for ks in &keyspaces {
90 let ks_name = ks.name.as_str();
91
92 if let Ok(t) = session.get_tables(ks_name).await {
94 tables.insert(ks_name.to_string(), t);
95 }
96 if let Ok(u) = session.get_udts(ks_name).await {
97 udts.insert(ks_name.to_string(), u);
98 }
99 if let Ok(f) = session.get_functions(ks_name).await {
100 functions.insert(ks_name.to_string(), f);
101 }
102 if let Ok(a) = session.get_aggregates(ks_name).await {
103 aggregates.insert(ks_name.to_string(), a);
104 }
105 }
106
107 self.keyspaces = keyspaces;
108 self.tables = tables;
109 self.udts = udts;
110 self.functions = functions;
111 self.aggregates = aggregates;
112 self.last_refresh = Some(Instant::now());
113
114 Ok(())
115 }
116
117 pub fn keyspace_names(&self) -> Vec<&str> {
121 self.keyspaces.iter().map(|ks| ks.name.as_str()).collect()
122 }
123
124 pub fn table_names(&self, keyspace: &str) -> Vec<&str> {
128 self.tables
129 .get(keyspace)
130 .map(|tables| tables.iter().map(|t| t.name.as_str()).collect())
131 .unwrap_or_default()
132 }
133
134 pub fn column_names(&self, keyspace: &str, table: &str) -> Vec<&str> {
138 self.tables
139 .get(keyspace)
140 .and_then(|tables| tables.iter().find(|t| t.name == table))
141 .map(|t| t.columns.iter().map(|c| c.name.as_str()).collect())
142 .unwrap_or_default()
143 }
144
145 pub fn udt_names(&self, keyspace: &str) -> Vec<&str> {
149 self.udts
150 .get(keyspace)
151 .map(|udts| udts.iter().map(|u| u.name.as_str()).collect())
152 .unwrap_or_default()
153 }
154
155 pub fn function_names(&self, keyspace: &str) -> Vec<&str> {
159 self.functions
160 .get(keyspace)
161 .map(|fns| fns.iter().map(|f| f.name.as_str()).collect())
162 .unwrap_or_default()
163 }
164
165 pub fn aggregate_names(&self, keyspace: &str) -> Vec<&str> {
169 self.aggregates
170 .get(keyspace)
171 .map(|aggs| aggs.iter().map(|a| a.name.as_str()).collect())
172 .unwrap_or_default()
173 }
174
175 #[doc(hidden)]
179 pub fn from_test_data(
180 keyspaces: Vec<crate::driver::KeyspaceMetadata>,
181 tables: std::collections::HashMap<String, Vec<crate::driver::TableMetadata>>,
182 ) -> Self {
183 SchemaCache {
184 keyspaces,
185 tables,
186 udts: Default::default(),
187 functions: Default::default(),
188 aggregates: Default::default(),
189 last_refresh: Some(std::time::Instant::now()),
190 ttl: DEFAULT_TTL,
191 }
192 }
193}
194
195impl Default for SchemaCache {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::driver::{ColumnMetadata, KeyspaceMetadata, TableMetadata, UdtMetadata};

    // ------------------------------------------------------------------
    // Fixture builders
    // ------------------------------------------------------------------

    /// Keyspace fixture: no replication settings, durable writes enabled.
    fn make_keyspace(name: &str) -> KeyspaceMetadata {
        KeyspaceMetadata {
            name: name.to_owned(),
            replication: HashMap::new(),
            durable_writes: true,
        }
    }

    /// Table fixture whose columns come from `(name, type)` pairs; keys,
    /// clustering order and properties are left empty.
    fn make_table(keyspace: &str, name: &str, columns: &[(&str, &str)]) -> TableMetadata {
        let mut built_columns = Vec::with_capacity(columns.len());
        for &(col_name, col_type) in columns {
            built_columns.push(ColumnMetadata {
                name: col_name.to_owned(),
                type_name: col_type.to_owned(),
            });
        }

        TableMetadata {
            keyspace: keyspace.to_owned(),
            name: name.to_owned(),
            columns: built_columns,
            partition_key: Vec::new(),
            clustering_key: Vec::new(),
            clustering_order: Vec::new(),
            properties: std::collections::BTreeMap::new(),
        }
    }

    /// UDT fixture with a single `street: text` field.
    fn make_udt(keyspace: &str, name: &str) -> UdtMetadata {
        UdtMetadata {
            keyspace: keyspace.to_owned(),
            name: name.to_owned(),
            field_names: vec![String::from("street")],
            field_types: vec![String::from("text")],
        }
    }

    /// Function fixture taking no arguments and returning `text`.
    fn make_function(keyspace: &str, name: &str) -> FunctionMetadata {
        FunctionMetadata {
            keyspace: keyspace.to_owned(),
            name: name.to_owned(),
            argument_types: Vec::new(),
            return_type: String::from("text"),
        }
    }

    /// Aggregate fixture taking no arguments and returning `int`.
    fn make_aggregate(keyspace: &str, name: &str) -> AggregateMetadata {
        AggregateMetadata {
            keyspace: keyspace.to_owned(),
            name: name.to_owned(),
            argument_types: Vec::new(),
            return_type: String::from("int"),
        }
    }

    /// Fresh (non-stale) cache holding two keyspaces:
    /// `ks1` with tables `users` and `orders`, one UDT, one function and one
    /// aggregate; `ks2` with the single table `events` and nothing else.
    fn populated_cache() -> SchemaCache {
        let ks1_tables = vec![
            make_table("ks1", "users", &[("id", "uuid"), ("name", "text")]),
            make_table(
                "ks1",
                "orders",
                &[("order_id", "uuid"), ("total", "decimal")],
            ),
        ];
        let ks2_tables = vec![make_table("ks2", "events", &[("event_id", "uuid")])];

        let mut schema = SchemaCache::new();
        schema.keyspaces = vec![make_keyspace("ks1"), make_keyspace("ks2")];
        schema.tables.insert(String::from("ks1"), ks1_tables);
        schema.tables.insert(String::from("ks2"), ks2_tables);
        schema
            .udts
            .insert(String::from("ks1"), vec![make_udt("ks1", "address")]);
        schema
            .functions
            .insert(String::from("ks1"), vec![make_function("ks1", "my_func")]);
        schema
            .aggregates
            .insert(String::from("ks1"), vec![make_aggregate("ks1", "my_agg")]);
        // Stamp the fixture as just refreshed so it starts out fresh.
        schema.last_refresh = Some(Instant::now());
        schema
    }

    // ------------------------------------------------------------------
    // Construction
    // ------------------------------------------------------------------

    #[test]
    fn new_cache_is_empty_and_stale() {
        let schema = SchemaCache::new();
        assert!(
            schema.is_stale(),
            "fresh cache should be stale (never refreshed)"
        );
        assert!(schema.keyspace_names().is_empty());
    }

    #[test]
    fn default_ttl_is_thirty_seconds() {
        let schema = SchemaCache::new();
        assert_eq!(schema.ttl, Duration::from_secs(30));
    }

    #[test]
    fn with_ttl_stores_custom_ttl() {
        let one_minute = Duration::from_secs(60);
        let schema = SchemaCache::with_ttl(one_minute);
        assert_eq!(schema.ttl, Duration::from_secs(60));
    }

    #[test]
    fn default_impl_equals_new() {
        let from_new = SchemaCache::new();
        let from_default = SchemaCache::default();
        assert_eq!(from_new.ttl, from_default.ttl);
        assert!(from_new.keyspace_names().is_empty());
        assert!(from_default.keyspace_names().is_empty());
    }

    // ------------------------------------------------------------------
    // Staleness and invalidation
    // ------------------------------------------------------------------

    #[test]
    fn freshly_refreshed_cache_is_not_stale() {
        let schema = populated_cache();
        assert!(!schema.is_stale());
    }

    #[test]
    fn expired_cache_is_stale() {
        let mut schema = SchemaCache::with_ttl(Duration::from_millis(1));
        // Pretend the last refresh happened well past the 1 ms TTL.
        let ten_ms_ago = Instant::now() - Duration::from_millis(10);
        schema.last_refresh = Some(ten_ms_ago);
        assert!(schema.is_stale());
    }

    #[test]
    fn invalidate_marks_cache_stale() {
        let mut schema = populated_cache();
        assert!(!schema.is_stale());
        schema.invalidate();
        assert!(schema.is_stale());
    }

    #[test]
    fn invalidate_preserves_cached_data() {
        let mut schema = populated_cache();
        schema.invalidate();
        assert!(!schema.keyspace_names().is_empty());
        assert!(!schema.table_names("ks1").is_empty());
    }

    // ------------------------------------------------------------------
    // Keyspace names
    // ------------------------------------------------------------------

    #[test]
    fn keyspace_names_returns_all_keyspaces() {
        let schema = populated_cache();
        let mut found = schema.keyspace_names();
        found.sort();
        assert_eq!(found, vec!["ks1", "ks2"]);
    }

    #[test]
    fn keyspace_names_empty_when_no_data() {
        let schema = SchemaCache::new();
        assert!(schema.keyspace_names().is_empty());
    }

    // ------------------------------------------------------------------
    // Table names
    // ------------------------------------------------------------------

    #[test]
    fn table_names_returns_tables_for_keyspace() {
        let schema = populated_cache();
        let mut found = schema.table_names("ks1");
        found.sort();
        assert_eq!(found, vec!["orders", "users"]);
    }

    #[test]
    fn table_names_empty_for_unknown_keyspace() {
        let schema = populated_cache();
        assert!(schema.table_names("nonexistent").is_empty());
    }

    #[test]
    fn table_names_single_table_keyspace() {
        let schema = populated_cache();
        assert_eq!(schema.table_names("ks2"), vec!["events"]);
    }

    // ------------------------------------------------------------------
    // Column names
    // ------------------------------------------------------------------

    #[test]
    fn column_names_returns_columns_for_table() {
        let schema = populated_cache();
        let mut found = schema.column_names("ks1", "users");
        found.sort();
        assert_eq!(found, vec!["id", "name"]);
    }

    #[test]
    fn column_names_empty_for_unknown_table() {
        let schema = populated_cache();
        assert!(schema.column_names("ks1", "nonexistent").is_empty());
    }

    #[test]
    fn column_names_empty_for_unknown_keyspace() {
        let schema = populated_cache();
        assert!(schema.column_names("nonexistent", "users").is_empty());
    }

    #[test]
    fn column_names_orders_table() {
        let schema = populated_cache();
        let mut found = schema.column_names("ks1", "orders");
        found.sort();
        assert_eq!(found, vec!["order_id", "total"]);
    }

    // ------------------------------------------------------------------
    // UDT names
    // ------------------------------------------------------------------

    #[test]
    fn udt_names_returns_udts_for_keyspace() {
        let schema = populated_cache();
        assert_eq!(schema.udt_names("ks1"), vec!["address"]);
    }

    #[test]
    fn udt_names_empty_for_keyspace_with_no_udts() {
        let schema = populated_cache();
        assert!(schema.udt_names("ks2").is_empty());
    }

    #[test]
    fn udt_names_empty_for_unknown_keyspace() {
        let schema = populated_cache();
        assert!(schema.udt_names("nonexistent").is_empty());
    }

    // ------------------------------------------------------------------
    // Function names
    // ------------------------------------------------------------------

    #[test]
    fn function_names_returns_functions_for_keyspace() {
        let schema = populated_cache();
        assert_eq!(schema.function_names("ks1"), vec!["my_func"]);
    }

    #[test]
    fn function_names_empty_for_keyspace_with_no_functions() {
        let schema = populated_cache();
        assert!(schema.function_names("ks2").is_empty());
    }

    #[test]
    fn function_names_empty_for_unknown_keyspace() {
        let schema = populated_cache();
        assert!(schema.function_names("nonexistent").is_empty());
    }

    // ------------------------------------------------------------------
    // Aggregate names
    // ------------------------------------------------------------------

    #[test]
    fn aggregate_names_returns_aggregates_for_keyspace() {
        let schema = populated_cache();
        assert_eq!(schema.aggregate_names("ks1"), vec!["my_agg"]);
    }

    #[test]
    fn aggregate_names_empty_for_keyspace_with_no_aggregates() {
        let schema = populated_cache();
        assert!(schema.aggregate_names("ks2").is_empty());
    }

    #[test]
    fn aggregate_names_empty_for_unknown_keyspace() {
        let schema = populated_cache();
        assert!(schema.aggregate_names("nonexistent").is_empty());
    }

    // ------------------------------------------------------------------
    // Cross-keyspace isolation, ordering and edge cases
    // ------------------------------------------------------------------

    #[test]
    fn tables_are_isolated_per_keyspace() {
        let schema = populated_cache();
        let ks1_tables = schema.table_names("ks1");
        let ks2_tables = schema.table_names("ks2");
        assert!(ks1_tables.iter().all(|name| *name != "events"));
        assert!(ks2_tables.iter().all(|name| *name != "users"));
    }

    #[test]
    fn udts_are_isolated_per_keyspace() {
        let mut schema = populated_cache();
        schema
            .udts
            .insert(String::from("ks2"), vec![make_udt("ks2", "location")]);

        assert_eq!(schema.udt_names("ks1"), vec!["address"]);
        assert_eq!(schema.udt_names("ks2"), vec!["location"]);
    }

    #[test]
    fn multiple_functions_returned_in_order() {
        let ks1_functions = vec![
            make_function("ks1", "alpha"),
            make_function("ks1", "beta"),
            make_function("ks1", "gamma"),
        ];

        let mut schema = SchemaCache::new();
        schema.keyspaces = vec![make_keyspace("ks1")];
        schema.functions.insert(String::from("ks1"), ks1_functions);
        schema.last_refresh = Some(Instant::now());

        assert_eq!(schema.function_names("ks1"), vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn multiple_aggregates_returned_in_order() {
        let ks1_aggregates = vec![
            make_aggregate("ks1", "agg_a"),
            make_aggregate("ks1", "agg_b"),
        ];

        let mut schema = SchemaCache::new();
        schema.keyspaces = vec![make_keyspace("ks1")];
        schema.aggregates.insert(String::from("ks1"), ks1_aggregates);
        schema.last_refresh = Some(Instant::now());

        assert_eq!(schema.aggregate_names("ks1"), vec!["agg_a", "agg_b"]);
    }

    #[test]
    fn table_with_no_columns_returns_empty_column_list() {
        let mut schema = SchemaCache::new();
        schema.keyspaces = vec![make_keyspace("ks1")];
        schema.tables.insert(
            String::from("ks1"),
            vec![make_table("ks1", "empty_table", &[])],
        );
        schema.last_refresh = Some(Instant::now());

        assert!(schema.column_names("ks1", "empty_table").is_empty());
    }

    #[test]
    fn zero_ttl_cache_is_immediately_stale_after_refresh() {
        let mut schema = SchemaCache::with_ttl(Duration::ZERO);
        let just_before_now = Instant::now() - Duration::from_nanos(1);
        schema.last_refresh = Some(just_before_now);
        assert!(schema.is_stale());
    }
}