micromegas_datafusion_extensions/histogram/accessors.rs

1use super::histogram_udaf::{HistogramArray, make_histogram_arrow_type};
2use datafusion::{
3    arrow::{
4        array::{Float64Builder, UInt64Builder},
5        datatypes::DataType,
6    },
7    error::DataFusionError,
8    logical_expr::{ColumnarValue, ScalarUDF, Volatility},
9    prelude::*,
10};
11use std::sync::Arc;
12
13fn sum_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
14    if values.len() != 1 {
15        return Err(DataFusionError::Execution(
16            "wrong number of arguments to sum_from_histogram".into(),
17        ));
18    }
19
20    let histo_array: HistogramArray = (&values[0]).try_into()?;
21    let mut result_builder = Float64Builder::with_capacity(histo_array.len());
22    for index_histo in 0..histo_array.len() {
23        result_builder.append_value(histo_array.get_sum(index_histo)?);
24    }
25
26    Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
27}
28
29/// Creates a user-defined function to extract the sum from a histogram.
30pub fn make_sum_from_histogram_udf() -> ScalarUDF {
31    create_udf(
32        "sum_from_histogram",
33        vec![make_histogram_arrow_type()],
34        DataType::Float64,
35        Volatility::Immutable,
36        Arc::new(&sum_from_histogram),
37    )
38}
39
40fn count_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
41    if values.len() != 1 {
42        return Err(DataFusionError::Execution(
43            "wrong number of arguments to count_from_histogram".into(),
44        ));
45    }
46
47    let histo_array: HistogramArray = (&values[0]).try_into()?;
48    let mut result_builder = UInt64Builder::with_capacity(histo_array.len());
49    for index_histo in 0..histo_array.len() {
50        result_builder.append_value(histo_array.get_count(index_histo)?);
51    }
52
53    Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
54}
55
56/// Creates a user-defined function to extract the count from a histogram.
57pub fn make_count_from_histogram_udf() -> ScalarUDF {
58    create_udf(
59        "count_from_histogram",
60        vec![make_histogram_arrow_type()],
61        DataType::UInt64,
62        Volatility::Immutable,
63        Arc::new(&count_from_histogram),
64    )
65}