micromegas_analytics/lakehouse/
reader_factory.rs

1use super::caching_reader::CachingReader;
2use super::file_cache::FileCache;
3use super::metadata_cache::MetadataCache;
4use super::partition_metadata::load_partition_metadata;
5use bytes::Bytes;
6use datafusion::{
7    datasource::{listing::PartitionedFile, physical_plan::ParquetFileReaderFactory},
8    parquet::{
9        arrow::{arrow_reader::ArrowReaderOptions, async_reader::AsyncFileReader},
10        file::metadata::ParquetMetaData,
11    },
12    physical_plan::metrics::ExecutionPlanMetricsSet,
13};
14use futures::future::BoxFuture;
15use micromegas_tracing::prelude::*;
16use object_store::ObjectStore;
17use sqlx::PgPool;
18use std::ops::Range;
19use std::sync::Arc;
20
21/// A custom [`ParquetFileReaderFactory`] that handles opening parquet files
22/// from object storage, and loads metadata on-demand.
23///
24/// Metadata is cached globally across all readers and queries via a shared
25/// `MetadataCache`, significantly reducing database fetches for repeated
26/// queries on the same partitions.
27///
28/// File contents are cached via a shared `FileCache` with thundering herd
29/// protection, reducing object storage reads for frequently accessed files.
30pub struct ReaderFactory {
31    object_store: Arc<dyn ObjectStore>,
32    pool: PgPool,
33    metadata_cache: Arc<MetadataCache>,
34    file_cache: Arc<FileCache>,
35}
36
37impl std::fmt::Debug for ReaderFactory {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("ReaderFactory")
40            .field("metadata_cache", &self.metadata_cache)
41            .field("file_cache", &self.file_cache)
42            .finish()
43    }
44}
45
46impl ReaderFactory {
47    /// Creates a new ReaderFactory with shared metadata and file caches.
48    pub fn new(
49        object_store: Arc<dyn ObjectStore>,
50        pool: PgPool,
51        metadata_cache: Arc<MetadataCache>,
52        file_cache: Arc<FileCache>,
53    ) -> Self {
54        Self {
55            object_store,
56            pool,
57            metadata_cache,
58            file_cache,
59        }
60    }
61}
62
63impl ParquetFileReaderFactory for ReaderFactory {
64    fn create_reader(
65        &self,
66        _partition_index: usize,
67        partitioned_file: PartitionedFile,
68        _metadata_size_hint: Option<usize>,
69        _metrics: &ExecutionPlanMetricsSet,
70    ) -> datafusion::error::Result<Box<dyn AsyncFileReader + Send>> {
71        // todo: don't ignore metrics, report performance of the reader
72        let path = partitioned_file.path().clone();
73        let filename = path.to_string();
74        let file_size = partitioned_file.object_meta.size;
75
76        // CachingReader handles file content caching with thundering herd protection
77        let inner = CachingReader::new(
78            Arc::clone(&self.object_store),
79            path,
80            filename.clone(),
81            file_size,
82            Arc::clone(&self.file_cache),
83        );
84
85        Ok(Box::new(ParquetReader {
86            filename,
87            file_size,
88            pool: self.pool.clone(),
89            metadata_cache: Arc::clone(&self.metadata_cache),
90            inner,
91        }))
92    }
93}
94
95/// A wrapper around a `CachingReader` that loads metadata on-demand
96/// using a shared global cache.
97pub struct ParquetReader {
98    pub filename: String,
99    pub file_size: u64,
100    pub pool: PgPool,
101    pub metadata_cache: Arc<MetadataCache>,
102    pub inner: CachingReader,
103}
104
105impl AsyncFileReader for ParquetReader {
106    fn get_bytes(
107        &mut self,
108        range: Range<u64>,
109    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
110        let filename = self.filename.clone();
111        let file_size = self.file_size;
112        let bytes_requested = range.end - range.start;
113        let inner = &mut self.inner;
114
115        Box::pin(async move {
116            let start = std::time::Instant::now();
117            let result = inner.get_bytes(range).await;
118            let duration_ms = start.elapsed().as_millis();
119            let cache_hit = inner.last_read_was_cache_hit();
120
121            debug!(
122                "parquet_read file={filename} file_size={file_size} bytes={bytes_requested} cache_hit={cache_hit} duration_ms={duration_ms}"
123            );
124
125            result
126        })
127    }
128
129    fn get_byte_ranges(
130        &mut self,
131        ranges: Vec<Range<u64>>,
132    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
133        let filename = self.filename.clone();
134        let file_size = self.file_size;
135        let num_ranges = ranges.len();
136        let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum();
137        let inner = &mut self.inner;
138
139        Box::pin(async move {
140            let start = std::time::Instant::now();
141            let result = inner.get_byte_ranges(ranges).await;
142            let duration_ms = start.elapsed().as_millis();
143            let cache_hit = inner.last_read_was_cache_hit();
144
145            debug!(
146                "parquet_read file={filename} file_size={file_size} ranges={num_ranges} bytes={total_bytes} cache_hit={cache_hit} duration_ms={duration_ms}"
147            );
148
149            result
150        })
151    }
152
153    fn get_metadata(
154        &mut self,
155        _options: Option<&ArrowReaderOptions>,
156    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
157        let metadata_cache = Arc::clone(&self.metadata_cache);
158        let pool = self.pool.clone();
159        let filename = self.filename.clone();
160
161        Box::pin(async move {
162            // Load metadata using the shared cache (handles cache hit/miss internally)
163            load_partition_metadata(&pool, &filename, Some(&metadata_cache))
164                .await
165                .map_err(|e| datafusion::parquet::errors::ParquetError::External(e.into()))
166        })
167    }
168}