micromegas_analytics/dfext/
expressions.rs

1use chrono::{DateTime, Utc};
2use datafusion::common::plan_err;
3use datafusion::error::DataFusionError;
4use datafusion::execution::context::ExecutionProps;
5use datafusion::logical_expr::simplify::SimplifyContext;
6use datafusion::optimizer::simplify_expressions::ExprSimplifier;
7use datafusion::prelude::*;
8use datafusion::scalar::ScalarValue;
9
10/// Simplifies a DataFusion expression.
11pub fn simplify_exp(expr: &Expr) -> datafusion::error::Result<Expr> {
12    let execution_props = ExecutionProps::new();
13    let info = SimplifyContext::new(&execution_props);
14    ExprSimplifier::new(info).simplify(expr.clone())
15}
16
17/// Converts a DataFusion expression to a string.
18pub fn exp_to_string(expr: &Expr) -> datafusion::error::Result<String> {
19    match simplify_exp(expr)? {
20        Expr::Literal(ScalarValue::Utf8(Some(string)), _metadata) => Ok(string),
21        other => {
22            plan_err!("can't convert {other:?} to string")
23        }
24    }
25}
26
27/// Converts a DataFusion expression to an i64.
28pub fn exp_to_i64(expr: &Expr) -> datafusion::error::Result<i64> {
29    match simplify_exp(expr)? {
30        Expr::Literal(ScalarValue::Int64(Some(value)), _metadata) => Ok(value),
31        other => {
32            plan_err!("can't convert {other:?} to i64")
33        }
34    }
35}
36
37/// Converts a DataFusion expression to a f64.
38pub fn exp_to_f64(expr: &Expr) -> datafusion::error::Result<f64> {
39    match simplify_exp(expr)? {
40        Expr::Literal(ScalarValue::Float64(Some(value)), _metadata) => Ok(value),
41        other => {
42            plan_err!("can't convert {other:?} to f64")
43        }
44    }
45}
46
47/// Converts a DataFusion expression to a timestamp.
48pub fn exp_to_timestamp(expr: &Expr) -> datafusion::error::Result<DateTime<Utc>> {
49    match simplify_exp(expr)? {
50        Expr::Literal(ScalarValue::Utf8(Some(string)), _metadata) => {
51            let ts = chrono::DateTime::parse_from_rfc3339(&string)
52                .map_err(|e| DataFusionError::External(e.into()))?;
53            Ok(ts.into())
54        }
55        Expr::Literal(ScalarValue::TimestampNanosecond(Some(ns), timezone), _metadata) => {
56            if let Some(tz) = timezone
57                && *tz != *"+00:00"
58            {
59                return plan_err!("Timestamp should be in UTC");
60            }
61            Ok(DateTime::from_timestamp_nanos(ns))
62        }
63        other => {
64            plan_err!("can't convert {other:?} to timestamp")
65        }
66    }
67}