micromegas_analytics/lakehouse/
dataframe_time_bounds.rs

1use crate::dfext::typed_column::typed_column;
2use crate::time::TimeRange;
3use anyhow::Result;
4use async_trait::async_trait;
5use chrono::DateTime;
6use datafusion::prelude::*;
7use datafusion::{
8    arrow::array::TimestampNanosecondArray,
9    functions_aggregate::min_max::{max, min},
10};
11use std::fmt::Debug;
12use std::sync::Arc;
13
14#[async_trait]
15pub trait DataFrameTimeBounds: Send + Sync + Debug {
16    async fn get_time_bounds(&self, df: DataFrame) -> Result<TimeRange>;
17}
18
/// Time-bounds strategy driven by two named columns of the dataframe.
#[derive(Debug)]
pub struct NamedColumnsTimeBounds {
    /// Name of the column that is aggregated with `min`.
    min_column_name: Arc<String>,
    /// Name of the column that is aggregated with `max`.
    max_column_name: Arc<String>,
}

impl NamedColumnsTimeBounds {
    /// Builds a strategy from the names of the columns to aggregate.
    ///
    /// `min_column_name` is the column whose minimum is taken and
    /// `max_column_name` the column whose maximum is taken; both are stored as-is.
    pub fn new(min_column_name: Arc<String>, max_column_name: Arc<String>) -> Self {
        Self {
            min_column_name,
            max_column_name,
        }
    }
}
33
34#[async_trait]
35impl DataFrameTimeBounds for NamedColumnsTimeBounds {
36    async fn get_time_bounds(&self, df: DataFrame) -> Result<TimeRange> {
37        let df = df.aggregate(
38            vec![],
39            vec![
40                min(col(&*self.min_column_name)),
41                max(col(&*self.max_column_name)),
42            ],
43        )?;
44        let minmax = df.collect().await?;
45        if minmax.len() != 1 {
46            anyhow::bail!("expected minmax to be size 1");
47        }
48        let minmax = &minmax[0];
49        let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?;
50        let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?;
51        if min_column.is_empty() || max_column.is_empty() {
52            anyhow::bail!("expected minmax to be size 1");
53        }
54        Ok(TimeRange::new(
55            DateTime::from_timestamp_nanos(min_column.value(0)),
56            DateTime::from_timestamp_nanos(max_column.value(0)),
57        ))
58    }
59}