micromegas_analytics/lakehouse/caching_reader.rs

use bytes::Bytes;
use datafusion::parquet::errors::ParquetError;
use micromegas_tracing::prelude::*;
use object_store::ObjectStore;
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use super::file_cache::FileCache;

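/// Reads Parquet file bytes through a shared `FileCache`: files the cache
/// accepts are loaded whole and served from memory, while other files are
/// read directly from the object store range by range.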
pub struct CachingReader {
    object_store: Arc<dyn ObjectStore>,
    path: object_store::path::Path,
    filename: String,
    file_size: u64,
    file_cache: Arc<FileCache>,
    cached_data: Option<Bytes>,
    last_read_was_cache_hit: bool,
}

impl CachingReader {
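    /// Creates a reader for a single file identified by `path`/`filename`.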
    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        path: object_store::path::Path,
        filename: String,
        file_size: u64,
        file_cache: Arc<FileCache>,
    ) -> Self {
        Self {
            object_store,
            path,
            filename,
            file_size,
            file_cache,
            cached_data: None,
            last_read_was_cache_hit: false,
        }
    }

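    /// Reports whether the most recent read was served from a cache
    /// (either this reader's local copy or the shared `FileCache`).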
    pub fn last_read_was_cache_hit(&self) -> bool {
        self.last_read_was_cache_hit
    }

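    /// Loads the whole file through the shared `FileCache`, keeping a local
    /// copy so repeated reads on this reader avoid further cache lookups.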
    async fn load_file_data(&mut self) -> datafusion::parquet::errors::Result<Bytes> {
        if let Some(data) = &self.cached_data {
            self.last_read_was_cache_hit = true;
            return Ok(data.clone());
        }

        // The loader closure only runs on a cache miss; this flag lets us
        // distinguish a hit in the shared cache from an actual load.
        let loader_was_called = Arc::new(AtomicBool::new(false));
        let loader_was_called_clone = Arc::clone(&loader_was_called);

        let object_store = Arc::clone(&self.object_store);
        let path = self.path.clone();
        let filename = self.filename.clone();
        let file_size = self.file_size;

        let data = self
            .file_cache
            .get_or_load(&self.filename, self.file_size, || {
                loader_was_called_clone.store(true, Ordering::SeqCst);
                async move {
                    debug!("file_cache_load file={filename} file_size={file_size}");
                    let result = object_store.get(&path).await?;
                    result.bytes().await
                }
            })
            .await
            .map_err(|e| ParquetError::General(e.to_string()))?;

        self.cached_data = Some(data.clone());
        self.last_read_was_cache_hit = !loader_was_called.load(Ordering::SeqCst);
        Ok(data)
    }

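    /// Reads a single byte range, slicing it out of the cached file when the
    /// file qualifies for caching and falling back to a ranged object store
    /// read otherwise.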
    pub async fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> datafusion::parquet::errors::Result<Bytes> {
        if self.file_cache.should_cache(self.file_size) {
            let data = self.load_file_data().await?;
            Ok(data.slice(range.start as usize..range.end as usize))
        } else {
            self.last_read_was_cache_hit = false;
            debug!(
                "file_cache_skip file={} file_size={}",
                self.filename, self.file_size
            );
            self.object_store
                .get_range(&self.path, range)
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))
        }
    }

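    /// Reads multiple byte ranges in one call, using the same cache-or-skip
    /// decision as `get_bytes`.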
    pub async fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> datafusion::parquet::errors::Result<Vec<Bytes>> {
        if self.file_cache.should_cache(self.file_size) {
            let data = self.load_file_data().await?;
            Ok(ranges
                .into_iter()
                .map(|r| data.slice(r.start as usize..r.end as usize))
                .collect())
        } else {
            self.last_read_was_cache_hit = false;
            debug!(
                "file_cache_skip file={} file_size={}",
                self.filename, self.file_size
            );
            let results = self
                .object_store
                .get_ranges(&self.path, &ranges)
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            Ok(results)
        }
    }
}