micromegas_datafusion_extensions/histogram/
variance.rs1use super::histogram_udaf::{HistogramArray, make_histogram_arrow_type};
2use datafusion::{
3 arrow::{array::Float64Builder, datatypes::DataType},
4 error::DataFusionError,
5 logical_expr::{ColumnarValue, ScalarUDF, Volatility},
6 prelude::*,
7};
8use std::sync::Arc;
9
/// Computes the unbiased sample variance (Bessel-corrected, divisor `n - 1`)
/// from the running aggregates of a set of values.
///
/// * `n` - number of values
/// * `sum` - sum of the values
/// * `sum_sq` - sum of the squared values
///
/// Returns `NaN` when `n < 2`, where the sample variance is undefined.
/// A negative result caused by floating-point cancellation in the
/// sum-of-squares formula (values all nearly equal) is clamped to `0.0`.
/// `NaN` produced by non-finite inputs is propagated unchanged.
fn compute_variance(n: f64, sum: f64, sum_sq: f64) -> f64 {
    if n < 2.0 {
        // Sample variance needs at least two observations; the unguarded
        // formula would divide by zero (n - 1 == 0) or compute 0/0.
        return f64::NAN;
    }
    let mean = sum / n;
    // Algebraically equal to (sum_sq / n - mean^2) * n / (n - 1).
    let variance = (sum_sq - sum * mean) / (n - 1.0);
    // Explicit comparison rather than `f64::max`, which would turn NaN into 0.
    if variance < 0.0 {
        0.0
    } else {
        variance
    }
}
14
15fn variance_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
16 if values.len() != 1 {
17 return Err(DataFusionError::Execution(
18 "wrong number of arguments to variance_from_histogram".into(),
19 ));
20 }
21
22 let histo_array: HistogramArray = (&values[0]).try_into()?;
23 let mut result_builder = Float64Builder::with_capacity(histo_array.len());
24 for index_histo in 0..histo_array.len() {
25 if histo_array.is_null_at(index_histo) {
26 result_builder.append_null();
27 continue;
28 }
29 result_builder.append_value(compute_variance(
30 histo_array.get_count(index_histo)? as f64,
31 histo_array.get_sum(index_histo)?,
32 histo_array.get_sum_sq(index_histo)?,
33 ));
34 }
35
36 Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
37}
38pub fn make_variance_from_histogram_udf() -> ScalarUDF {
40 create_udf(
41 "variance_from_histogram",
42 vec![make_histogram_arrow_type()],
43 DataType::Float64,
44 Volatility::Immutable,
45 Arc::new(&variance_from_histogram),
46 )
47}