// micromegas_analytics/lakehouse/metadata_cache.rs
use datafusion::parquet::file::metadata::ParquetMetaData;
2use micromegas_tracing::prelude::*;
3use moka::future::Cache;
4use moka::notification::RemovalCause;
5use std::sync::Arc;
6
7const DEFAULT_CACHE_SIZE_BYTES: u64 = 10 * 1024 * 1024;
9
/// A cached parquet footer plus the bookkeeping the cache needs for
/// size-based eviction and eviction-delay metrics.
#[derive(Clone)]
struct CacheEntry {
    // Parsed parquet footer, shared via `Arc` so `get` can hand out a
    // cheap clone instead of re-parsing.
    metadata: Arc<ParquetMetaData>,
    // Byte size used as this entry's weight for the cache's size-bounded
    // eviction policy (see the `weigher` in `MetadataCache::new`).
    serialized_size: u32,
    // Insertion timestamp from `now()`; used by the eviction listener to
    // compute how long the entry lived before being evicted.
    // NOTE(review): assumed to be in ticks — confirm against micromegas_tracing.
    inserted_at: i64,
}
18
/// In-memory, size-bounded cache mapping parquet file paths to their parsed
/// footers (`ParquetMetaData`), so repeated reads of the same file can skip
/// re-fetching and re-parsing the footer.
pub struct MetadataCache {
    // moka async cache; weighted by each entry's `serialized_size` and
    // capped at the byte capacity passed to `MetadataCache::new`.
    cache: Cache<String, CacheEntry>,
}
25
26impl Default for MetadataCache {
27 fn default() -> Self {
28 Self::new(DEFAULT_CACHE_SIZE_BYTES)
29 }
30}
31
32impl MetadataCache {
33 pub fn new(max_capacity_bytes: u64) -> Self {
35 let cache = Cache::builder()
36 .max_capacity(max_capacity_bytes)
37 .weigher(|_key: &String, entry: &CacheEntry| -> u32 { entry.serialized_size })
38 .eviction_listener(
39 |_key: Arc<String>, entry: CacheEntry, cause: RemovalCause| {
40 if cause == RemovalCause::Size {
41 let eviction_delay = now() - entry.inserted_at;
43 imetric!(
44 "metadata_cache_eviction_delay",
45 "ticks",
46 eviction_delay as u64
47 );
48 }
49 },
50 )
51 .build();
52 Self { cache }
53 }
54
55 pub async fn get(&self, file_path: &str) -> Option<Arc<ParquetMetaData>> {
57 self.cache.get(file_path).await.map(|e| e.metadata.clone())
58 }
59
60 pub async fn insert(
62 &self,
63 file_path: String,
64 metadata: Arc<ParquetMetaData>,
65 serialized_size: u32,
66 ) {
67 self.cache
68 .insert(
69 file_path,
70 CacheEntry {
71 metadata,
72 serialized_size,
73 inserted_at: now(),
74 },
75 )
76 .await;
77 imetric!(
78 "metadata_cache_entry_count",
79 "count",
80 self.cache.entry_count()
81 );
82 }
83
84 pub fn stats(&self) -> (u64, u64) {
86 (self.cache.entry_count(), self.cache.weighted_size())
87 }
88}
89
90impl std::fmt::Debug for MetadataCache {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 let (entries, size) = self.stats();
93 f.debug_struct("MetadataCache")
94 .field("entries", &entries)
95 .field("weighted_size_bytes", &size)
96 .finish()
97 }
98}