micromegas_analytics/dfext/
expressions.rs1use chrono::{DateTime, Utc};
2use datafusion::common::plan_err;
3use datafusion::error::DataFusionError;
4use datafusion::execution::context::ExecutionProps;
5use datafusion::logical_expr::simplify::SimplifyContext;
6use datafusion::optimizer::simplify_expressions::ExprSimplifier;
7use datafusion::prelude::*;
8use datafusion::scalar::ScalarValue;
9
10pub fn simplify_exp(expr: &Expr) -> datafusion::error::Result<Expr> {
12 let execution_props = ExecutionProps::new();
13 let info = SimplifyContext::new(&execution_props);
14 ExprSimplifier::new(info).simplify(expr.clone())
15}
16
17pub fn exp_to_string(expr: &Expr) -> datafusion::error::Result<String> {
19 match simplify_exp(expr)? {
20 Expr::Literal(ScalarValue::Utf8(Some(string)), _metadata) => Ok(string),
21 other => {
22 plan_err!("can't convert {other:?} to string")
23 }
24 }
25}
26
27pub fn exp_to_i64(expr: &Expr) -> datafusion::error::Result<i64> {
29 match simplify_exp(expr)? {
30 Expr::Literal(ScalarValue::Int64(Some(value)), _metadata) => Ok(value),
31 other => {
32 plan_err!("can't convert {other:?} to i64")
33 }
34 }
35}
36
37pub fn exp_to_f64(expr: &Expr) -> datafusion::error::Result<f64> {
39 match simplify_exp(expr)? {
40 Expr::Literal(ScalarValue::Float64(Some(value)), _metadata) => Ok(value),
41 other => {
42 plan_err!("can't convert {other:?} to f64")
43 }
44 }
45}
46
47pub fn exp_to_timestamp(expr: &Expr) -> datafusion::error::Result<DateTime<Utc>> {
49 match simplify_exp(expr)? {
50 Expr::Literal(ScalarValue::Utf8(Some(string)), _metadata) => {
51 let ts = chrono::DateTime::parse_from_rfc3339(&string)
52 .map_err(|e| DataFusionError::External(e.into()))?;
53 Ok(ts.into())
54 }
55 Expr::Literal(ScalarValue::TimestampNanosecond(Some(ns), timezone), _metadata) => {
56 if let Some(tz) = timezone
57 && *tz != *"+00:00"
58 {
59 return plan_err!("Timestamp should be in UTC");
60 }
61 Ok(DateTime::from_timestamp_nanos(ns))
62 }
63 other => {
64 plan_err!("can't convert {other:?} to timestamp")
65 }
66 }
67}