// micromegas_analytics/lakehouse/file_cache.rs

1use anyhow::bail;
2use bytes::Bytes;
3use micromegas_tracing::prelude::*;
4use moka::future::Cache;
5use moka::notification::RemovalCause;
6use std::future::Future;
7use std::sync::Arc;
8
/// Default total memory budget for the cache: 200 MiB (200 * 2^20 bytes).
const DEFAULT_CACHE_SIZE_BYTES: u64 = 200 << 20;

/// Default largest single file accepted for caching: 10 MiB (10 * 2^20 bytes).
const DEFAULT_MAX_FILE_SIZE_BYTES: u64 = 10 << 20;
14
15/// Cache entry storing file data and metadata for weight calculation
16#[derive(Clone)]
17struct CacheEntry {
18    data: Bytes,
19    file_size: u32,
20    /// Timestamp when the entry was inserted (in ticks from now())
21    inserted_at: i64,
22}
23
24/// Global LRU cache for parquet file contents, shared across all readers and queries.
25///
26/// Memory budget is based on file size. Uses moka's `try_get_with` to prevent
27/// thundering herd - concurrent requests for the same uncached file will coalesce
28/// into a single load operation.
29pub struct FileCache {
30    cache: Cache<String, CacheEntry>,
31    max_file_size: u64,
32}
33
34impl Default for FileCache {
35    fn default() -> Self {
36        Self::new(DEFAULT_CACHE_SIZE_BYTES, DEFAULT_MAX_FILE_SIZE_BYTES)
37    }
38}
39
40impl FileCache {
41    /// Creates a new file cache with the specified memory budget and max file size.
42    pub fn new(max_capacity_bytes: u64, max_file_size_bytes: u64) -> Self {
43        let cache = Cache::builder()
44            .max_capacity(max_capacity_bytes)
45            .weigher(|_key: &String, entry: &CacheEntry| -> u32 { entry.file_size })
46            .eviction_listener(
47                |_key: Arc<String>, entry: CacheEntry, cause: RemovalCause| {
48                    if cause == RemovalCause::Size {
49                        // Track eviction delay: time between insertion and eviction due to size pressure
50                        let eviction_delay = now() - entry.inserted_at;
51                        imetric!("file_cache_eviction_delay", "ticks", eviction_delay as u64);
52                    }
53                },
54            )
55            .build();
56
57        Self {
58            cache,
59            max_file_size: max_file_size_bytes,
60        }
61    }
62
63    /// Check if a file should be cached based on its size
64    pub fn should_cache(&self, file_size: u64) -> bool {
65        file_size <= self.max_file_size
66    }
67
68    /// Gets file contents, loading from the provided async function on cache miss.
69    ///
70    /// Uses moka's `try_get_with` to coalesce concurrent requests - if multiple
71    /// callers request the same uncached file simultaneously, only one will
72    /// execute the loader while others wait for the result.
73    ///
74    /// Returns an error if file_size >= 4GB (moka weigher uses u32).
75    pub async fn get_or_load<F, Fut, E>(
76        &self,
77        file_path: &str,
78        file_size: u64,
79        loader: F,
80    ) -> anyhow::Result<Bytes>
81    where
82        F: FnOnce() -> Fut,
83        Fut: Future<Output = Result<Bytes, E>>,
84        E: Send + Sync + std::error::Error + 'static,
85    {
86        if file_size > u32::MAX as u64 {
87            bail!(
88                "file too large to cache: {file_size} bytes (max {})",
89                u32::MAX
90            );
91        }
92        let file_size_u32 = file_size as u32;
93        // Note: entry_count may be stale under concurrent loads of different files (approximate metric)
94        let entry_count = self.cache.entry_count();
95        let result = self
96            .cache
97            .try_get_with(file_path.to_string(), async {
98                let data = loader().await.map_err(|e| anyhow::anyhow!(e))?;
99                imetric!("file_cache_entry_count", "count", entry_count + 1);
100                Ok::<_, anyhow::Error>(CacheEntry {
101                    data,
102                    file_size: file_size_u32,
103                    inserted_at: now(),
104                })
105            })
106            .await
107            .map_err(|e: Arc<anyhow::Error>| anyhow::anyhow!("{e}"))?;
108        Ok(result.data.clone())
109    }
110
111    /// Returns cache statistics (entry_count, weighted_size_bytes).
112    pub fn stats(&self) -> (u64, u64) {
113        (self.cache.entry_count(), self.cache.weighted_size())
114    }
115
116    /// Runs pending cache maintenance tasks.
117    ///
118    /// This should be called to ensure cache statistics are up-to-date,
119    /// particularly useful in test scenarios.
120    pub async fn run_pending_tasks(&self) {
121        self.cache.run_pending_tasks().await;
122    }
123}
124
125impl std::fmt::Debug for FileCache {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        let (entries, size) = self.stats();
128        f.debug_struct("FileCache")
129            .field("entries", &entries)
130            .field("weighted_size_bytes", &size)
131            .field("max_file_size", &self.max_file_size)
132            .finish()
133    }
134}