//! micromegas_analytics/lakehouse/partition_cache.rs

1use crate::time::TimeRange;
2
3use super::{
4    partition::Partition, partition_metadata::load_partition_metadata, view::ViewMetadata,
5};
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use datafusion::parquet::file::metadata::ParquetMetaData;
10use micromegas_tracing::prelude::*;
11use sqlx::{PgPool, Row};
12use std::{fmt, sync::Arc};
13
/// A partition with its file metadata loaded on-demand.
/// This is used when you need both the partition data and its parquet metadata.
#[derive(Clone, Debug)]
pub struct PartitionWithMetadata {
    /// The partition record (time ranges, file path, hashes) as stored in the database.
    pub partition: Partition,
    /// Parquet metadata of the partition's data file, shared cheaply via `Arc`.
    pub file_metadata: Arc<ParquetMetaData>,
}
21
22/// Convenience function to create a PartitionWithMetadata from an existing Partition.
23/// This loads the file metadata on-demand using the file_path.
24#[span_fn]
25pub async fn partition_with_metadata(
26    partition: Partition,
27    pool: &PgPool,
28) -> Result<PartitionWithMetadata> {
29    let file_path = partition
30        .file_path
31        .as_ref()
32        .ok_or_else(|| anyhow::anyhow!("cannot load metadata for empty partition"))?;
33    let file_metadata = load_partition_metadata(pool, file_path, None)
34        .await
35        .with_context(|| format!("loading metadata for partition: {}", file_path))?;
36    Ok(PartitionWithMetadata {
37        partition,
38        file_metadata,
39    })
40}
41
/// A trait for providing queryable partitions.
#[async_trait]
pub trait QueryPartitionProvider: std::fmt::Display + Send + Sync + std::fmt::Debug {
    /// Fetches partitions based on the provided criteria.
    ///
    /// * `view_set_name` / `view_instance_id` - identify the view whose partitions are wanted
    /// * `query_range` - optional time filter; implementations differ on whether it is
    ///   matched against event time or insertion time (see each implementation)
    /// * `file_schema_hash` - only partitions with this exact schema hash are returned
    async fn fetch(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        query_range: Option<TimeRange>,
        file_schema_hash: Vec<u8>,
    ) -> Result<Vec<Partition>>;
}
54
/// PartitionCache allows to query partitions based on the insert_time range
#[derive(Debug)]
pub struct PartitionCache {
    /// Partitions loaded from the database, ordered by (begin_insert_time, file_path).
    pub partitions: Vec<Partition>,
    /// Insert-time range the partitions were fetched for; `fetch` rejects query
    /// ranges that extend beyond it.
    insert_range: TimeRange,
}
61
62impl fmt::Display for PartitionCache {
63    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64        write!(f, "{self:?}")
65    }
66}
67
68impl PartitionCache {
69    pub fn len(&self) -> usize {
70        self.partitions.len()
71    }
72
73    pub fn is_empty(&self) -> bool {
74        self.partitions.is_empty()
75    }
76
77    /// fetches the partitions of all views matching the specified insert range
78    //todo: this should be limited to global instances
79    //todo: ask for a list of view sets (which would be provided by the views using a get_dependencies() api entry)
80    #[span_fn]
81    pub async fn fetch_overlapping_insert_range(
82        pool: &sqlx::PgPool,
83        insert_range: TimeRange,
84    ) -> Result<Self> {
85        let rows = sqlx::query(
86            "SELECT view_set_name,
87                    view_instance_id,
88                    begin_insert_time,
89                    end_insert_time,
90                    min_event_time,
91                    max_event_time,
92                    updated,
93                    file_path,
94                    file_size,
95                    file_schema_hash,
96                    source_data_hash,
97                    num_rows
98             FROM lakehouse_partitions
99             WHERE begin_insert_time < $1
100             AND end_insert_time > $2
101             ORDER BY begin_insert_time, file_path
102             ;",
103        )
104        .bind(insert_range.end)
105        .bind(insert_range.begin)
106        .fetch_all(pool)
107        .await
108        .with_context(|| "fetching partitions")?;
109        let mut partitions = vec![];
110        for r in rows {
111            let view_metadata = ViewMetadata {
112                view_set_name: Arc::new(r.try_get("view_set_name")?),
113                view_instance_id: Arc::new(r.try_get("view_instance_id")?),
114                file_schema_hash: r.try_get("file_schema_hash")?,
115            };
116            // file_metadata will be loaded on-demand when needed
117            let insert_time_range = TimeRange {
118                begin: r.try_get("begin_insert_time")?,
119                end: r.try_get("end_insert_time")?,
120            };
121            let event_time_range = match (
122                r.try_get::<DateTime<Utc>, _>("min_event_time").ok(),
123                r.try_get::<DateTime<Utc>, _>("max_event_time").ok(),
124            ) {
125                (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
126                (None, None) => None, // Empty partition - both NULL
127                (Some(_), None) | (None, Some(_)) => {
128                    anyhow::bail!(
129                        "Corrupt partition record: only one of min/max_event_time is NULL"
130                    );
131                }
132            };
133            let partition = Partition {
134                view_metadata,
135                insert_time_range,
136                event_time_range,
137                updated: r.try_get("updated")?,
138                file_path: r.try_get::<String, _>("file_path").ok(),
139                file_size: r.try_get("file_size")?,
140                source_data_hash: r.try_get("source_data_hash")?,
141                num_rows: r.try_get("num_rows")?,
142            };
143            partition
144                .validate()
145                .with_context(|| "validating partition from database")?;
146            partitions.push(partition);
147        }
148        Ok(Self {
149            partitions,
150            insert_range,
151        })
152    }
153
154    /// fetches the partitions of a single view instance matching the specified insert range
155    #[span_fn]
156    pub async fn fetch_overlapping_insert_range_for_view(
157        pool: &sqlx::PgPool,
158        view_set_name: Arc<String>,
159        view_instance_id: Arc<String>,
160        insert_range: TimeRange,
161    ) -> Result<Self> {
162        let rows = sqlx::query(
163            "SELECT begin_insert_time,
164                    end_insert_time,
165                    min_event_time,
166                    max_event_time,
167                    updated,
168                    file_path,
169                    file_size,
170                    file_schema_hash,
171                    source_data_hash,
172                    num_rows
173             FROM lakehouse_partitions
174             WHERE begin_insert_time < $1
175             AND end_insert_time > $2
176             AND view_set_name = $3
177             AND view_instance_id = $4
178             ORDER BY begin_insert_time, file_path
179             ;",
180        )
181        .bind(insert_range.end)
182        .bind(insert_range.begin)
183        .bind(&*view_set_name)
184        .bind(&*view_instance_id)
185        .fetch_all(pool)
186        .await
187        .with_context(|| "fetching partitions")?;
188        let mut partitions = vec![];
189        for r in rows {
190            let view_metadata = ViewMetadata {
191                view_set_name: view_set_name.clone(),
192                view_instance_id: view_instance_id.clone(),
193                file_schema_hash: r.try_get("file_schema_hash")?,
194            };
195            // file_metadata will be loaded on-demand when needed
196            let insert_time_range = TimeRange {
197                begin: r.try_get("begin_insert_time")?,
198                end: r.try_get("end_insert_time")?,
199            };
200            let event_time_range = match (
201                r.try_get::<DateTime<Utc>, _>("min_event_time").ok(),
202                r.try_get::<DateTime<Utc>, _>("max_event_time").ok(),
203            ) {
204                (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
205                (None, None) => None, // Empty partition - both NULL
206                (Some(_), None) | (None, Some(_)) => {
207                    anyhow::bail!(
208                        "Corrupt partition record: only one of min/max_event_time is NULL"
209                    );
210                }
211            };
212            let partition = Partition {
213                view_metadata,
214                insert_time_range,
215                event_time_range,
216                updated: r.try_get("updated")?,
217                file_path: r.try_get::<String, _>("file_path").ok(),
218                file_size: r.try_get("file_size")?,
219                source_data_hash: r.try_get("source_data_hash")?,
220                num_rows: r.try_get("num_rows")?,
221            };
222            partition
223                .validate()
224                .with_context(|| "validating partition from database")?;
225            partitions.push(partition);
226        }
227        Ok(Self {
228            partitions,
229            insert_range,
230        })
231    }
232
233    // overlap test for a specific view
234    pub fn filter(
235        &self,
236        view_set_name: &str,
237        view_instance_id: &str,
238        file_schema_hash: &[u8],
239        insert_range: TimeRange,
240    ) -> Self {
241        let mut partitions = vec![];
242        for part in &self.partitions {
243            if *part.view_metadata.view_set_name == view_set_name
244                && *part.view_metadata.view_instance_id == view_instance_id
245                && part.view_metadata.file_schema_hash == file_schema_hash
246                && part.begin_insert_time() < insert_range.end
247                && part.end_insert_time() > insert_range.begin
248            {
249                partitions.push(part.clone());
250            }
251        }
252        Self {
253            partitions,
254            insert_range,
255        }
256    }
257
258    // overlap test for a all views
259    pub fn filter_insert_range(&self, insert_range: TimeRange) -> Self {
260        let mut partitions = vec![];
261        for part in &self.partitions {
262            if part.begin_insert_time() < insert_range.end
263                && part.end_insert_time() > insert_range.begin
264            {
265                partitions.push(part.clone());
266            }
267        }
268        Self {
269            partitions,
270            insert_range,
271        }
272    }
273
274    // single view that fits completely in the specified range
275    pub fn filter_inside_range(
276        &self,
277        view_set_name: &str,
278        view_instance_id: &str,
279        insert_range: TimeRange,
280    ) -> Self {
281        let mut partitions = vec![];
282        for part in &self.partitions {
283            if *part.view_metadata.view_set_name == view_set_name
284                && *part.view_metadata.view_instance_id == view_instance_id
285                && part.begin_insert_time() >= insert_range.begin
286                && part.end_insert_time() <= insert_range.end
287            {
288                partitions.push(part.clone());
289            }
290        }
291        Self {
292            partitions,
293            insert_range,
294        }
295    }
296}
297
298#[async_trait]
299impl QueryPartitionProvider for PartitionCache {
300    /// unlike LivePartitionProvider, the query_range is tested against the insertion time, not the event time
301    #[span_fn]
302    async fn fetch(
303        &self,
304        view_set_name: &str,
305        view_instance_id: &str,
306        query_range: Option<TimeRange>,
307        file_schema_hash: Vec<u8>,
308    ) -> Result<Vec<Partition>> {
309        let mut partitions = vec![];
310        if let Some(range) = query_range {
311            if range.begin < self.insert_range.begin || range.end > self.insert_range.end {
312                anyhow::bail!("filtering from a result set that's not large enough");
313            }
314            for part in &self.partitions {
315                if *part.view_metadata.view_set_name == view_set_name
316                    && *part.view_metadata.view_instance_id == view_instance_id
317                    && part.begin_insert_time() < range.end
318                    && part.end_insert_time() > range.begin
319                    && part.view_metadata.file_schema_hash == file_schema_hash
320                {
321                    partitions.push(part.clone());
322                }
323            }
324        } else {
325            for part in &self.partitions {
326                if *part.view_metadata.view_set_name == view_set_name
327                    && *part.view_metadata.view_instance_id == view_instance_id
328                    && part.view_metadata.file_schema_hash == file_schema_hash
329                {
330                    partitions.push(part.clone());
331                }
332            }
333        }
334        Ok(partitions)
335    }
336}
337
/// A `QueryPartitionProvider` that fetches partitions directly from the database.
#[derive(Debug)]
pub struct LivePartitionProvider {
    // Connection pool to the postgres database holding `lakehouse_partitions`.
    db_pool: PgPool,
}
343
344impl fmt::Display for LivePartitionProvider {
345    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
346        write!(f, "{self:?}")
347    }
348}
349
impl LivePartitionProvider {
    /// Creates a provider that queries partitions through `db_pool`.
    pub fn new(db_pool: PgPool) -> Self {
        Self { db_pool }
    }
}
355
356#[async_trait]
357impl QueryPartitionProvider for LivePartitionProvider {
358    #[span_fn]
359    async fn fetch(
360        &self,
361        view_set_name: &str,
362        view_instance_id: &str,
363        query_range: Option<TimeRange>,
364        file_schema_hash: Vec<u8>,
365    ) -> Result<Vec<Partition>> {
366        let mut partitions = vec![];
367        let rows = if let Some(range) = query_range {
368            sqlx::query(
369                "SELECT view_set_name,
370                    view_instance_id,
371                    begin_insert_time,
372                    end_insert_time,
373                    min_event_time,
374                    max_event_time,
375                    updated,
376                    file_path,
377                    file_size,
378                    file_schema_hash,
379                    source_data_hash,
380                    num_rows
381             FROM lakehouse_partitions
382             WHERE view_set_name = $1
383             AND view_instance_id = $2
384             AND min_event_time <= $3
385             AND max_event_time >= $4
386             AND file_schema_hash = $5
387             ORDER BY begin_insert_time, file_path
388             ;",
389            )
390            .bind(view_set_name)
391            .bind(view_instance_id)
392            .bind(range.end)
393            .bind(range.begin)
394            .bind(file_schema_hash)
395            .fetch_all(&self.db_pool)
396            .await
397            .with_context(|| "listing lakehouse partitions")?
398        } else {
399            sqlx::query(
400                "SELECT view_set_name,
401                    view_instance_id,
402                    begin_insert_time,
403                    end_insert_time,
404                    min_event_time,
405                    max_event_time,
406                    updated,
407                    file_path,
408                    file_size,
409                    file_schema_hash,
410                    source_data_hash,
411                    num_rows
412             FROM lakehouse_partitions
413             WHERE view_set_name = $1
414             AND view_instance_id = $2
415             AND file_schema_hash = $3
416             ORDER BY begin_insert_time, file_path
417             ;",
418            )
419            .bind(view_set_name)
420            .bind(view_instance_id)
421            .bind(file_schema_hash)
422            .fetch_all(&self.db_pool)
423            .await
424            .with_context(|| "listing lakehouse partitions")?
425        };
426        for r in rows {
427            let view_metadata = ViewMetadata {
428                view_set_name: Arc::new(r.try_get("view_set_name")?),
429                view_instance_id: Arc::new(r.try_get("view_instance_id")?),
430                file_schema_hash: r.try_get("file_schema_hash")?,
431            };
432            // file_metadata will be loaded on-demand when needed
433            let insert_time_range = TimeRange {
434                begin: r.try_get("begin_insert_time")?,
435                end: r.try_get("end_insert_time")?,
436            };
437            let event_time_range = match (
438                r.try_get::<DateTime<Utc>, _>("min_event_time").ok(),
439                r.try_get::<DateTime<Utc>, _>("max_event_time").ok(),
440            ) {
441                (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
442                (None, None) => None, // Empty partition - both NULL
443                (Some(_), None) | (None, Some(_)) => {
444                    anyhow::bail!(
445                        "Corrupt partition record: only one of min/max_event_time is NULL"
446                    );
447                }
448            };
449            let partition = Partition {
450                view_metadata,
451                insert_time_range,
452                event_time_range,
453                updated: r.try_get("updated")?,
454                file_path: r.try_get::<String, _>("file_path").ok(),
455                file_size: r.try_get("file_size")?,
456                source_data_hash: r.try_get("source_data_hash")?,
457                num_rows: r.try_get("num_rows")?,
458            };
459            partition
460                .validate()
461                .with_context(|| "validating partition from database")?;
462            partitions.push(partition);
463        }
464        Ok(partitions)
465    }
466}
467
/// A `QueryPartitionProvider` that always returns an empty list of partitions.
// NOTE(review): presumably used where queries must not read any existing
// partitions (e.g. tests or bootstrap paths) — confirm against callers.
#[derive(Debug)]
pub struct NullPartitionProvider {}
471
472impl fmt::Display for NullPartitionProvider {
473    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
474        write!(f, "{self:?}")
475    }
476}
477
478#[async_trait]
479impl QueryPartitionProvider for NullPartitionProvider {
480    async fn fetch(
481        &self,
482        _view_set_name: &str,
483        _view_instance_id: &str,
484        _query_range: Option<TimeRange>,
485        _file_schema_hash: Vec<u8>,
486    ) -> Result<Vec<Partition>> {
487        Ok(vec![])
488    }
489}