micromegas_datafusion_extensions/histogram/
accessors.rs1use super::histogram_udaf::{HistogramArray, make_histogram_arrow_type};
2use datafusion::{
3 arrow::{
4 array::{Float64Builder, UInt64Builder},
5 datatypes::DataType,
6 },
7 error::DataFusionError,
8 logical_expr::{ColumnarValue, ScalarUDF, Volatility},
9 prelude::*,
10};
11use std::sync::Arc;
12
13fn sum_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
14 if values.len() != 1 {
15 return Err(DataFusionError::Execution(
16 "wrong number of arguments to sum_from_histogram".into(),
17 ));
18 }
19
20 let histo_array: HistogramArray = (&values[0]).try_into()?;
21 let mut result_builder = Float64Builder::with_capacity(histo_array.len());
22 for index_histo in 0..histo_array.len() {
23 if histo_array.is_null_at(index_histo) {
24 result_builder.append_null();
25 continue;
26 }
27 result_builder.append_value(histo_array.get_sum(index_histo)?);
28 }
29
30 Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
31}
32
33pub fn make_sum_from_histogram_udf() -> ScalarUDF {
35 create_udf(
36 "sum_from_histogram",
37 vec![make_histogram_arrow_type()],
38 DataType::Float64,
39 Volatility::Immutable,
40 Arc::new(&sum_from_histogram),
41 )
42}
43
44fn count_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
45 if values.len() != 1 {
46 return Err(DataFusionError::Execution(
47 "wrong number of arguments to count_from_histogram".into(),
48 ));
49 }
50
51 let histo_array: HistogramArray = (&values[0]).try_into()?;
52 let mut result_builder = UInt64Builder::with_capacity(histo_array.len());
53 for index_histo in 0..histo_array.len() {
54 if histo_array.is_null_at(index_histo) {
55 result_builder.append_null();
56 continue;
57 }
58 result_builder.append_value(histo_array.get_count(index_histo)?);
59 }
60
61 Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
62}
63
64pub fn make_count_from_histogram_udf() -> ScalarUDF {
66 create_udf(
67 "count_from_histogram",
68 vec![make_histogram_arrow_type()],
69 DataType::UInt64,
70 Volatility::Immutable,
71 Arc::new(&count_from_histogram),
72 )
73}