micromegas_analytics/lakehouse/
caching_reader.rs

1use bytes::Bytes;
2use datafusion::parquet::errors::ParquetError;
3use micromegas_tracing::prelude::*;
4use object_store::ObjectStore;
5use std::ops::Range;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8
9use super::file_cache::FileCache;
10
11/// Adds file content caching to object store reads.
12///
13/// This is an internal component used by `ParquetReader`, not a standalone `AsyncFileReader`.
14/// It only provides `get_bytes` and `get_byte_ranges` methods - metadata handling remains
15/// in the `ParquetReader` layer.
16///
17/// Uses a two-level caching strategy:
18/// 1. Local `cached_data` - avoids global cache lookups within a single reader
19/// 2. Global `FileCache` - shared across all readers, with thundering herd protection
20pub struct CachingReader {
21    /// Object store for loading uncached files (shared, cloneable)
22    object_store: Arc<dyn ObjectStore>,
23    /// Path to the file in object store
24    path: object_store::path::Path,
25    filename: String,
26    file_size: u64,
27    file_cache: Arc<FileCache>,
28    /// Local cache of file data for this reader instance
29    cached_data: Option<Bytes>,
30    /// Whether the most recent read operation was served from cache
31    last_read_was_cache_hit: bool,
32}
33
34impl CachingReader {
35    pub fn new(
36        object_store: Arc<dyn ObjectStore>,
37        path: object_store::path::Path,
38        filename: String,
39        file_size: u64,
40        file_cache: Arc<FileCache>,
41    ) -> Self {
42        Self {
43            object_store,
44            path,
45            filename,
46            file_size,
47            file_cache,
48            cached_data: None,
49            last_read_was_cache_hit: false,
50        }
51    }
52
53    /// Returns whether the most recent read operation was served from cache.
54    pub fn last_read_was_cache_hit(&self) -> bool {
55        self.last_read_was_cache_hit
56    }
57
58    /// Load file data, using cache with thundering herd protection.
59    /// Returns the data and sets `last_read_was_cache_hit` accordingly.
60    async fn load_file_data(&mut self) -> datafusion::parquet::errors::Result<Bytes> {
61        // Check local cache first (avoids global cache lookup)
62        if let Some(data) = &self.cached_data {
63            self.last_read_was_cache_hit = true;
64            return Ok(data.clone());
65        }
66
67        // Use get_or_load for thundering herd protection - concurrent requests
68        // for the same file will coalesce into a single object store fetch.
69        // Track whether the loader was called to determine cache hit/miss.
70        let loader_was_called = Arc::new(AtomicBool::new(false));
71        let loader_was_called_clone = Arc::clone(&loader_was_called);
72
73        let object_store = Arc::clone(&self.object_store);
74        let path = self.path.clone();
75        let filename = self.filename.clone();
76        let file_size = self.file_size;
77
78        let data = self
79            .file_cache
80            .get_or_load(&self.filename, self.file_size, || {
81                loader_was_called_clone.store(true, Ordering::SeqCst);
82                async move {
83                    debug!("file_cache_load file={filename} file_size={file_size}");
84                    let result = object_store.get(&path).await?;
85                    result.bytes().await
86                }
87            })
88            .await
89            .map_err(|e| ParquetError::General(e.to_string()))?;
90
91        self.cached_data = Some(data.clone());
92        self.last_read_was_cache_hit = !loader_was_called.load(Ordering::SeqCst);
93        Ok(data)
94    }
95
96    pub async fn get_bytes(
97        &mut self,
98        range: Range<u64>,
99    ) -> datafusion::parquet::errors::Result<Bytes> {
100        if self.file_cache.should_cache(self.file_size) {
101            let data = self.load_file_data().await?;
102            Ok(data.slice(range.start as usize..range.end as usize))
103        } else {
104            // Large file - read directly from object store (bypass cache)
105            self.last_read_was_cache_hit = false;
106            debug!(
107                "file_cache_skip file={} file_size={}",
108                self.filename, self.file_size
109            );
110            self.object_store
111                .get_range(&self.path, range)
112                .await
113                .map_err(|e| ParquetError::External(Box::new(e)))
114        }
115    }
116
117    pub async fn get_byte_ranges(
118        &mut self,
119        ranges: Vec<Range<u64>>,
120    ) -> datafusion::parquet::errors::Result<Vec<Bytes>> {
121        if self.file_cache.should_cache(self.file_size) {
122            let data = self.load_file_data().await?;
123            Ok(ranges
124                .into_iter()
125                .map(|r| data.slice(r.start as usize..r.end as usize))
126                .collect())
127        } else {
128            // Large file - use object_store's get_ranges for efficient multi-range fetch
129            self.last_read_was_cache_hit = false;
130            debug!(
131                "file_cache_skip file={} file_size={}",
132                self.filename, self.file_size
133            );
134            let results = self
135                .object_store
136                .get_ranges(&self.path, &ranges)
137                .await
138                .map_err(|e| ParquetError::External(Box::new(e)))?;
139            Ok(results)
140        }
141    }
142}