micromegas_analytics/lakehouse/
reader_factory.rs1use super::caching_reader::CachingReader;
2use super::file_cache::FileCache;
3use super::metadata_cache::MetadataCache;
4use super::partition_metadata::load_partition_metadata;
5use bytes::Bytes;
6use datafusion::{
7 datasource::{listing::PartitionedFile, physical_plan::ParquetFileReaderFactory},
8 parquet::{
9 arrow::{arrow_reader::ArrowReaderOptions, async_reader::AsyncFileReader},
10 file::metadata::ParquetMetaData,
11 },
12 physical_plan::metrics::ExecutionPlanMetricsSet,
13};
14use futures::future::BoxFuture;
15use micromegas_tracing::prelude::*;
16use object_store::ObjectStore;
17use sqlx::PgPool;
18use std::ops::Range;
19use std::sync::Arc;
20
/// Shared handles from which a [`ParquetReader`] is built for each file to be read.
pub struct ReaderFactory {
    /// Object store handed to the `CachingReader` of every reader created.
    object_store: Arc<dyn ObjectStore>,
    /// Postgres pool cloned into each reader, which passes it to `load_partition_metadata`.
    pool: PgPool,
    /// Cache passed to `load_partition_metadata` when a reader is asked for parquet metadata.
    metadata_cache: Arc<MetadataCache>,
    /// Cache handed to the `CachingReader` of every reader created.
    file_cache: Arc<FileCache>,
}
36
37impl std::fmt::Debug for ReaderFactory {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("ReaderFactory")
40 .field("metadata_cache", &self.metadata_cache)
41 .field("file_cache", &self.file_cache)
42 .finish()
43 }
44}
45
46impl ReaderFactory {
47 pub fn new(
49 object_store: Arc<dyn ObjectStore>,
50 pool: PgPool,
51 metadata_cache: Arc<MetadataCache>,
52 file_cache: Arc<FileCache>,
53 ) -> Self {
54 Self {
55 object_store,
56 pool,
57 metadata_cache,
58 file_cache,
59 }
60 }
61}
62
63impl ParquetFileReaderFactory for ReaderFactory {
64 fn create_reader(
65 &self,
66 _partition_index: usize,
67 partitioned_file: PartitionedFile,
68 _metadata_size_hint: Option<usize>,
69 _metrics: &ExecutionPlanMetricsSet,
70 ) -> datafusion::error::Result<Box<dyn AsyncFileReader + Send>> {
71 let path = partitioned_file.path().clone();
73 let filename = path.to_string();
74 let file_size = partitioned_file.object_meta.size;
75
76 let inner = CachingReader::new(
78 Arc::clone(&self.object_store),
79 path,
80 filename.clone(),
81 file_size,
82 Arc::clone(&self.file_cache),
83 );
84
85 Ok(Box::new(ParquetReader {
86 filename,
87 file_size,
88 pool: self.pool.clone(),
89 metadata_cache: Arc::clone(&self.metadata_cache),
90 inner,
91 }))
92 }
93}
94
/// Reader for a single parquet file: byte reads go through `inner`, while parquet
/// metadata is obtained from `load_partition_metadata`.
pub struct ParquetReader {
    /// String form of the file's path; written to log lines and passed to
    /// `load_partition_metadata`.
    pub filename: String,
    /// Size of the file as reported by the partitioned file's object metadata;
    /// written to log lines.
    pub file_size: u64,
    /// Postgres pool passed to `load_partition_metadata`.
    pub pool: PgPool,
    /// Cache passed to `load_partition_metadata`.
    pub metadata_cache: Arc<MetadataCache>,
    /// Reader that performs the actual byte-range reads.
    pub inner: CachingReader,
}
104
105impl AsyncFileReader for ParquetReader {
106 fn get_bytes(
107 &mut self,
108 range: Range<u64>,
109 ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
110 let filename = self.filename.clone();
111 let file_size = self.file_size;
112 let bytes_requested = range.end - range.start;
113 let inner = &mut self.inner;
114
115 Box::pin(async move {
116 let start = std::time::Instant::now();
117 let result = inner.get_bytes(range).await;
118 let duration_ms = start.elapsed().as_millis();
119 let cache_hit = inner.last_read_was_cache_hit();
120
121 debug!(
122 "parquet_read file={filename} file_size={file_size} bytes={bytes_requested} cache_hit={cache_hit} duration_ms={duration_ms}"
123 );
124
125 result
126 })
127 }
128
129 fn get_byte_ranges(
130 &mut self,
131 ranges: Vec<Range<u64>>,
132 ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
133 let filename = self.filename.clone();
134 let file_size = self.file_size;
135 let num_ranges = ranges.len();
136 let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum();
137 let inner = &mut self.inner;
138
139 Box::pin(async move {
140 let start = std::time::Instant::now();
141 let result = inner.get_byte_ranges(ranges).await;
142 let duration_ms = start.elapsed().as_millis();
143 let cache_hit = inner.last_read_was_cache_hit();
144
145 debug!(
146 "parquet_read file={filename} file_size={file_size} ranges={num_ranges} bytes={total_bytes} cache_hit={cache_hit} duration_ms={duration_ms}"
147 );
148
149 result
150 })
151 }
152
153 fn get_metadata(
154 &mut self,
155 _options: Option<&ArrowReaderOptions>,
156 ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
157 let metadata_cache = Arc::clone(&self.metadata_cache);
158 let pool = self.pool.clone();
159 let filename = self.filename.clone();
160
161 Box::pin(async move {
162 load_partition_metadata(&pool, &filename, Some(&metadata_cache))
164 .await
165 .map_err(|e| datafusion::parquet::errors::ParquetError::External(e.into()))
166 })
167 }
168}