micromegas_analytics/lakehouse/
dataframe_time_bounds.rs1use crate::dfext::typed_column::typed_column;
2use crate::time::TimeRange;
3use anyhow::Result;
4use async_trait::async_trait;
5use chrono::DateTime;
6use datafusion::prelude::*;
7use datafusion::{
8 arrow::array::TimestampNanosecondArray,
9 functions_aggregate::min_max::{max, min},
10};
11use std::fmt::Debug;
12use std::sync::Arc;
13
14#[async_trait]
15pub trait DataFrameTimeBounds: Send + Sync + Debug {
16 async fn get_time_bounds(&self, df: DataFrame) -> Result<TimeRange>;
17}
18
/// Holds the names of the two columns whose minimum and maximum are aggregated
/// into time bounds.
#[derive(Debug)]
pub struct NamedColumnsTimeBounds {
    /// Name of the column aggregated with `min`.
    min_column_name: Arc<String>,
    /// Name of the column aggregated with `max`.
    max_column_name: Arc<String>,
}

impl NamedColumnsTimeBounds {
    /// Builds a `NamedColumnsTimeBounds` from the name of the column to take the
    /// minimum of and the name of the column to take the maximum of.
    pub fn new(min_column_name: Arc<String>, max_column_name: Arc<String>) -> Self {
        NamedColumnsTimeBounds {
            min_column_name,
            max_column_name,
        }
    }
}
33
34#[async_trait]
35impl DataFrameTimeBounds for NamedColumnsTimeBounds {
36 async fn get_time_bounds(&self, df: DataFrame) -> Result<TimeRange> {
37 let df = df.aggregate(
38 vec![],
39 vec![
40 min(col(&*self.min_column_name)),
41 max(col(&*self.max_column_name)),
42 ],
43 )?;
44 let minmax = df.collect().await?;
45 if minmax.len() != 1 {
46 anyhow::bail!("expected minmax to be size 1");
47 }
48 let minmax = &minmax[0];
49 let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?;
50 let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?;
51 if min_column.is_empty() || max_column.is_empty() {
52 anyhow::bail!("expected minmax to be size 1");
53 }
54 Ok(TimeRange::new(
55 DateTime::from_timestamp_nanos(min_column.value(0)),
56 DateTime::from_timestamp_nanos(max_column.value(0)),
57 ))
58 }
59}