micromegas_datafusion_extensions/histogram/
quantile.rs1use datafusion::{
2 arrow::{
3 array::{Float64Array, Float64Builder, UInt64Array},
4 datatypes::DataType,
5 },
6 error::DataFusionError,
7 logical_expr::{ColumnarValue, ScalarUDF, Volatility},
8 prelude::*,
9 scalar::ScalarValue,
10};
11use std::sync::Arc;
12
13use super::histogram_udaf::{HistogramArray, make_histogram_arrow_type};
14
15fn estimate_quantile(
16 ratio: f64,
17 start: f64,
18 end: f64,
19 count_values: u64,
20 bins: &UInt64Array,
21) -> f64 {
22 let quant_count = count_values as f64 * ratio;
23 let mut count = 0;
24 for ibin in 0..bins.len() {
25 let this_bucket_count = bins.value(ibin);
26 count += this_bucket_count;
27 if count as f64 >= quant_count && this_bucket_count > 0 {
28 let pop_bucket_start = (count - bins.value(ibin)) as f64;
29 let pop_bucket_end = count as f64;
30 let bucket_ratio =
31 (quant_count - pop_bucket_start) / (pop_bucket_end - pop_bucket_start);
32 let histo_width = end - start;
33 let bucket_width = histo_width / bins.len() as f64;
34 let begin_bucket = start + ibin as f64 * bucket_width;
35 let end_bucket = start + (ibin as f64 + 1.0) * bucket_width;
36 let estimate = (1.0 - bucket_ratio) * begin_bucket + bucket_ratio * end_bucket;
37 return estimate;
38 }
39 }
40 end
41}
42
43fn quantile_from_histogram(values: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
44 if values.len() != 2 {
45 return Err(DataFusionError::Execution(
46 "wrong number of arguments to quantile_from_histogram".into(),
47 ));
48 }
49
50 let histo_array: HistogramArray = (&values[0]).try_into()?;
51 let mut result_builder = Float64Builder::with_capacity(histo_array.len());
52 for index_histo in 0..histo_array.len() {
53 let ratio = match &values[1] {
54 ColumnarValue::Array(array) => array
55 .as_any()
56 .downcast_ref::<Float64Array>()
57 .ok_or_else(|| DataFusionError::Execution("downcasting to Float64Array".into()))?
58 .value(index_histo),
59 ColumnarValue::Scalar(scalar_value) => {
60 if let ScalarValue::Float64(Some(ratio)) = scalar_value {
61 *ratio
62 } else {
63 return Err(DataFusionError::Execution(format!(
64 "bad ratio {scalar_value:?} in quantile_from_histogram"
65 )));
66 }
67 }
68 };
69
70 let bins = histo_array.get_bins(index_histo)?;
71 result_builder.append_value(estimate_quantile(
72 ratio,
73 histo_array.get_start(index_histo)?,
74 histo_array.get_end(index_histo)?,
75 histo_array.get_count(index_histo)?,
76 &bins,
77 ));
78 }
79
80 Ok(ColumnarValue::Array(Arc::new(result_builder.finish())))
81}
82
83pub fn make_quantile_from_histogram_udf() -> ScalarUDF {
85 create_udf(
86 "quantile_from_histogram",
87 vec![make_histogram_arrow_type(), DataType::Float64],
88 DataType::Float64,
89 Volatility::Immutable,
90 Arc::new(&quantile_from_histogram),
91 )
92}