micromegas_analytics/lakehouse/
file_cache.rs1use anyhow::bail;
2use bytes::Bytes;
3use micromegas_tracing::prelude::*;
4use moka::future::Cache;
5use moka::notification::RemovalCause;
6use std::future::Future;
7use std::sync::Arc;
8
/// Default total weighted capacity of a [`FileCache`], in bytes (200 MiB).
const DEFAULT_CACHE_SIZE_BYTES: u64 = 200 << 20;

/// Default per-file size limit, in bytes (10 MiB), passed as `max_file_size_bytes`
/// by `FileCache::default()`.
const DEFAULT_MAX_FILE_SIZE_BYTES: u64 = 10 << 20;

15#[derive(Clone)]
17struct CacheEntry {
18 data: Bytes,
19 file_size: u32,
20 inserted_at: i64,
22}
23
24pub struct FileCache {
30 cache: Cache<String, CacheEntry>,
31 max_file_size: u64,
32}
33
34impl Default for FileCache {
35 fn default() -> Self {
36 Self::new(DEFAULT_CACHE_SIZE_BYTES, DEFAULT_MAX_FILE_SIZE_BYTES)
37 }
38}
39
40impl FileCache {
41 pub fn new(max_capacity_bytes: u64, max_file_size_bytes: u64) -> Self {
43 let cache = Cache::builder()
44 .max_capacity(max_capacity_bytes)
45 .weigher(|_key: &String, entry: &CacheEntry| -> u32 { entry.file_size })
46 .eviction_listener(
47 |_key: Arc<String>, entry: CacheEntry, cause: RemovalCause| {
48 if cause == RemovalCause::Size {
49 let eviction_delay = now() - entry.inserted_at;
51 imetric!("file_cache_eviction_delay", "ticks", eviction_delay as u64);
52 }
53 },
54 )
55 .build();
56
57 Self {
58 cache,
59 max_file_size: max_file_size_bytes,
60 }
61 }
62
63 pub fn should_cache(&self, file_size: u64) -> bool {
65 file_size <= self.max_file_size
66 }
67
68 pub async fn get_or_load<F, Fut, E>(
76 &self,
77 file_path: &str,
78 file_size: u64,
79 loader: F,
80 ) -> anyhow::Result<Bytes>
81 where
82 F: FnOnce() -> Fut,
83 Fut: Future<Output = Result<Bytes, E>>,
84 E: Send + Sync + std::error::Error + 'static,
85 {
86 if file_size > u32::MAX as u64 {
87 bail!(
88 "file too large to cache: {file_size} bytes (max {})",
89 u32::MAX
90 );
91 }
92 let file_size_u32 = file_size as u32;
93 let entry_count = self.cache.entry_count();
95 let result = self
96 .cache
97 .try_get_with(file_path.to_string(), async {
98 let data = loader().await.map_err(|e| anyhow::anyhow!(e))?;
99 imetric!("file_cache_entry_count", "count", entry_count + 1);
100 Ok::<_, anyhow::Error>(CacheEntry {
101 data,
102 file_size: file_size_u32,
103 inserted_at: now(),
104 })
105 })
106 .await
107 .map_err(|e: Arc<anyhow::Error>| anyhow::anyhow!("{e}"))?;
108 Ok(result.data.clone())
109 }
110
111 pub fn stats(&self) -> (u64, u64) {
113 (self.cache.entry_count(), self.cache.weighted_size())
114 }
115
116 pub async fn run_pending_tasks(&self) {
121 self.cache.run_pending_tasks().await;
122 }
123}
124
125impl std::fmt::Debug for FileCache {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 let (entries, size) = self.stats();
128 f.debug_struct("FileCache")
129 .field("entries", &entries)
130 .field("weighted_size_bytes", &size)
131 .field("max_file_size", &self.max_file_size)
132 .finish()
133 }
134}