micromegas_datafusion_extensions/histogram/
quantile.rs

1use datafusion::{
2    arrow::{
3        array::{Float64Array, Float64Builder, UInt64Array},
4        datatypes::DataType,
5    },
6    error::DataFusionError,
7    logical_expr::{ColumnarValue, ScalarUDF, Volatility},
8    prelude::*,
9    scalar::ScalarValue,
10};
11use std::sync::Arc;
12
13use super::histogram_udaf::{HistogramArray, make_histogram_arrow_type};
14
15fn estimate_quantile(
16    ratio: f64,
17    start: f64,
18    end: f64,
19    count_values: u64,
20    bins: &UInt64Array,
21) -> f64 {
22    let quant_count = count_values as f64 * ratio;
23    let mut count = 0;
24    for ibin in 0..bins.len() {
25        let this_bucket_count = bins.value(ibin);
26        count += this_bucket_count;
27        if count as f64 >= quant_count && this_bucket_count > 0 {
28            let pop_bucket_start = (count - bins.value(ibin)) as f64;
29            let pop_bucket_end = count as f64;
30            let bucket_ratio =
31                (quant_count - pop_bucket_start) / (pop_bucket_end - pop_bucket_start);
32            let histo_width = end - start;
33            let bucket_width = histo_width / bins.len() as f64;
34            let begin_bucket = start + ibin as f64 * bucket_width;
35            let end_bucket = start + (ibin as f64 + 1.0) * bucket_width;
36            let estimate = (1.0 - bucket_ratio) * begin_bucket + bucket_ratio * end_bucket;
37            return estimate;
38        }
39    }
40    end
41}
42
43fn quantile_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
44    if values.len() != 2 {
45        return Err(DataFusionError::Execution(
46            "wrong number of arguments to quantile_from_histogram".into(),
47        ));
48    }
49
50    let histo_array: HistogramArray = (&values[0]).try_into()?;
51    let mut result_builder = Float64Builder::with_capacity(histo_array.len());
52    for index_histo in 0..histo_array.len() {
53        let ratio = match &values[1] {
54            ColumnarValue::Array(array) => array
55                .as_any()
56                .downcast_ref::<Float64Array>()
57                .ok_or_else(|| DataFusionError::Execution("downcasting to Float64Array".into()))?
58                .value(index_histo),
59            ColumnarValue::Scalar(scalar_value) => {
60                if let ScalarValue::Float64(Some(ratio)) = scalar_value {
61                    *ratio
62                } else {
63                    return Err(DataFusionError::Execution(format!(
64                        "bad ratio {scalar_value:?} in quantile_from_histogram"
65                    )));
66                }
67            }
68        };
69
70        let bins = histo_array.get_bins(index_histo)?;
71        result_builder.append_value(estimate_quantile(
72            ratio,
73            histo_array.get_start(index_histo)?,
74            histo_array.get_end(index_histo)?,
75            histo_array.get_count(index_histo)?,
76            &bins,
77        ));
78    }
79
80    Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
81}
82
83/// Creates a user-defined function to estimate quantiles from a histogram.
84pub fn make_quantile_from_histogram_udf() -> ScalarUDF {
85    create_udf(
86        "quantile_from_histogram",
87        vec![make_histogram_arrow_type(), DataType::Float64],
88        DataType::Float64,
89        Volatility::Immutable,
90        Arc::new(&quantile_from_histogram),
91    )
92}