//! micromegas_analytics/lakehouse/metadata_cache.rs

1use datafusion::parquet::file::metadata::ParquetMetaData;
2use micromegas_tracing::prelude::*;
3use moka::future::Cache;
4use moka::notification::RemovalCause;
5use std::sync::Arc;
6
/// Default cache capacity in bytes (10 MiB).
///
/// NOTE(review): previously described as "for batch operations", but within
/// this file the constant is only used as the default memory budget of
/// [`MetadataCache`] (see the `Default` impl) — confirm the wording upstream.
const DEFAULT_CACHE_SIZE_BYTES: u64 = 10 * 1024 * 1024;
9
/// Cache entry storing metadata and its serialized size for weight calculation
#[derive(Clone)]
struct CacheEntry {
    /// Shared parquet footer metadata; `Arc` keeps entry clones cheap.
    metadata: Arc<ParquetMetaData>,
    /// Size charged to the cache's memory budget via the moka weigher, so the
    /// budget tracks serialized bytes rather than entry count.
    serialized_size: u32,
    /// Timestamp when the entry was inserted (in ticks from now())
    inserted_at: i64,
}
18
/// Global LRU cache for partition metadata, shared across all readers and queries.
///
/// Memory budget is based on serialized metadata size.
///
/// Keys are file paths; values bundle the parsed metadata with the weight and
/// insertion timestamp needed for eviction accounting (see [`CacheEntry`]).
pub struct MetadataCache {
    /// Underlying moka cache; weigher/eviction listener are wired in `new`.
    cache: Cache<String, CacheEntry>,
}
25
26impl Default for MetadataCache {
27    fn default() -> Self {
28        Self::new(DEFAULT_CACHE_SIZE_BYTES)
29    }
30}
31
32impl MetadataCache {
33    /// Creates a new metadata cache with the specified memory budget in bytes.
34    pub fn new(max_capacity_bytes: u64) -> Self {
35        let cache = Cache::builder()
36            .max_capacity(max_capacity_bytes)
37            .weigher(|_key: &String, entry: &CacheEntry| -> u32 { entry.serialized_size })
38            .eviction_listener(
39                |_key: Arc<String>, entry: CacheEntry, cause: RemovalCause| {
40                    if cause == RemovalCause::Size {
41                        // Track eviction delay: time between insertion and eviction due to size pressure
42                        let eviction_delay = now() - entry.inserted_at;
43                        imetric!(
44                            "metadata_cache_eviction_delay",
45                            "ticks",
46                            eviction_delay as u64
47                        );
48                    }
49                },
50            )
51            .build();
52        Self { cache }
53    }
54
55    /// Gets cached metadata for the given file path, if present.
56    pub async fn get(&self, file_path: &str) -> Option<Arc<ParquetMetaData>> {
57        self.cache.get(file_path).await.map(|e| e.metadata.clone())
58    }
59
60    /// Inserts metadata into the cache with its serialized size for weight calculation.
61    pub async fn insert(
62        &self,
63        file_path: String,
64        metadata: Arc<ParquetMetaData>,
65        serialized_size: u32,
66    ) {
67        self.cache
68            .insert(
69                file_path,
70                CacheEntry {
71                    metadata,
72                    serialized_size,
73                    inserted_at: now(),
74                },
75            )
76            .await;
77        imetric!(
78            "metadata_cache_entry_count",
79            "count",
80            self.cache.entry_count()
81        );
82    }
83
84    /// Returns cache statistics (entry_count, weighted_size_bytes).
85    pub fn stats(&self) -> (u64, u64) {
86        (self.cache.entry_count(), self.cache.weighted_size())
87    }
88}
89
90impl std::fmt::Debug for MetadataCache {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        let (entries, size) = self.stats();
93        f.debug_struct("MetadataCache")
94            .field("entries", &entries)
95            .field("weighted_size_bytes", &size)
96            .finish()
97    }
98}