micromegas_analytics/dfext/
expressions.rs

1use chrono::{DateTime, Utc};
2use datafusion::common::plan_err;
3use datafusion::error::DataFusionError;
4use datafusion::logical_expr::simplify::SimplifyContext;
5use datafusion::optimizer::simplify_expressions::ExprSimplifier;
6use datafusion::prelude::*;
7use datafusion::scalar::ScalarValue;
8
9/// Simplifies a DataFusion expression.
10pub fn simplify_exp(expr: &Expr) -> datafusion::error::Result<Expr> {
11    let info = SimplifyContext::default();
12    ExprSimplifier::new(info).simplify(expr.clone())
13}
14
15/// Converts a DataFusion expression to a string.
16pub fn exp_to_string(expr: &Expr) -> datafusion::error::Result<String> {
17    match simplify_exp(expr)? {
18        Expr::Literal(ScalarValue::Utf8(Some(string)), _metadata) => Ok(string),
19        other => {
20            plan_err!("can't convert {other:?} to string")
21        }
22    }
23}
24
25/// Converts a DataFusion expression to an i64.
26pub fn exp_to_i64(expr: &Expr) -> datafusion::error::Result<i64> {
27    match simplify_exp(expr)? {
28        Expr::Literal(ScalarValue::Int64(Some(value)), _metadata) => Ok(value),
29        other => {
30            plan_err!("can't convert {other:?} to i64")
31        }
32    }
33}
34
35/// Converts a DataFusion expression to a f64.
36pub fn exp_to_f64(expr: &Expr) -> datafusion::error::Result<f64> {
37    match simplify_exp(expr)? {
38        Expr::Literal(ScalarValue::Float64(Some(value)), _metadata) => Ok(value),
39        other => {
40            plan_err!("can't convert {other:?} to f64")
41        }
42    }
43}
44
45/// Converts a DataFusion expression to a timestamp.
46pub fn exp_to_timestamp(expr: &Expr) -> datafusion::error::Result<DateTime<Utc>> {
47    match simplify_exp(expr)? {
48        Expr::Literal(ScalarValue::Utf8(Some(string)), _metadata) => {
49            let ts = chrono::DateTime::parse_from_rfc3339(&string)
50                .map_err(|e| DataFusionError::External(e.into()))?;
51            Ok(ts.into())
52        }
53        Expr::Literal(ScalarValue::TimestampNanosecond(Some(ns), timezone), _metadata) => {
54            if let Some(tz) = timezone
55                && *tz != *"+00:00"
56            {
57                return plan_err!("Timestamp should be in UTC");
58            }
59            Ok(DateTime::from_timestamp_nanos(ns))
60        }
61        other => {
62            plan_err!("can't convert {other:?} to timestamp")
63        }
64    }
65}