micromegas_datafusion_extensions/histogram/
accessors.rs

1use super::histogram_udaf::{HistogramArray, make_histogram_arrow_type};
2use datafusion::{
3    arrow::{
4        array::{Float64Builder, UInt64Builder},
5        datatypes::DataType,
6    },
7    error::DataFusionError,
8    logical_expr::{ColumnarValue, ScalarUDF, Volatility},
9    prelude::*,
10};
11use std::sync::Arc;
12
13fn sum_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
14    if values.len() != 1 {
15        return Err(DataFusionError::Execution(
16            "wrong number of arguments to sum_from_histogram".into(),
17        ));
18    }
19
20    let histo_array: HistogramArray = (&values[0]).try_into()?;
21    let mut result_builder = Float64Builder::with_capacity(histo_array.len());
22    for index_histo in 0..histo_array.len() {
23        if histo_array.is_null_at(index_histo) {
24            result_builder.append_null();
25            continue;
26        }
27        result_builder.append_value(histo_array.get_sum(index_histo)?);
28    }
29
30    Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
31}
32
33/// Creates a user-defined function to extract the sum from a histogram.
34pub fn make_sum_from_histogram_udf() -> ScalarUDF {
35    create_udf(
36        "sum_from_histogram",
37        vec![make_histogram_arrow_type()],
38        DataType::Float64,
39        Volatility::Immutable,
40        Arc::new(&sum_from_histogram),
41    )
42}
43
44fn count_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
45    if values.len() != 1 {
46        return Err(DataFusionError::Execution(
47            "wrong number of arguments to count_from_histogram".into(),
48        ));
49    }
50
51    let histo_array: HistogramArray = (&values[0]).try_into()?;
52    let mut result_builder = UInt64Builder::with_capacity(histo_array.len());
53    for index_histo in 0..histo_array.len() {
54        if histo_array.is_null_at(index_histo) {
55            result_builder.append_null();
56            continue;
57        }
58        result_builder.append_value(histo_array.get_count(index_histo)?);
59    }
60
61    Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
62}
63
64/// Creates a user-defined function to extract the count from a histogram.
65pub fn make_count_from_histogram_udf() -> ScalarUDF {
66    create_udf(
67        "count_from_histogram",
68        vec![make_histogram_arrow_type()],
69        DataType::UInt64,
70        Volatility::Immutable,
71        Arc::new(&count_from_histogram),
72    )
73}