micromegas_analytics/dfext/
typed_column.rs1use anyhow::{Context, Result};
2use datafusion::arrow::array::{ArrowPrimitiveType, PrimitiveArray, RecordBatch, StringArray};
3
4pub fn typed_column_by_name<'a, T: core::any::Any>(
6 rc: &'a datafusion::arrow::array::RecordBatch,
7 column_name: &str,
8) -> Result<&'a T> {
9 let column = rc
10 .column_by_name(column_name)
11 .with_context(|| format!("getting column {column_name}"))?;
12 column
13 .as_any()
14 .downcast_ref::<T>()
15 .with_context(|| format!("casting {column_name}: {:?}", column.data_type()))
16}
17
18pub fn typed_column<T: core::any::Any>(
20 rc: &datafusion::arrow::array::RecordBatch,
21 index: usize,
22) -> Result<&T> {
23 let column = rc
24 .columns()
25 .get(index)
26 .with_context(|| format!("getting column {index}"))?;
27 column
28 .as_any()
29 .downcast_ref::<T>()
30 .with_context(|| format!("casting {index}: {:?}", column.data_type()))
31}
32
33pub fn get_only_primitive_value<T: ArrowPrimitiveType>(rbs: &[RecordBatch]) -> Result<T::Native> {
35 if rbs.len() != 1 {
36 anyhow::bail!(
37 "get_only_primitive_value given {} record batches",
38 rbs.len()
39 );
40 }
41 let column: &PrimitiveArray<T> = typed_column(&rbs[0], 0)?;
42 Ok(column.value(0))
43}
44
45pub fn get_only_string_value(rbs: &[RecordBatch]) -> Result<String> {
47 if rbs.len() != 1 {
48 anyhow::bail!("get_only_string_value given {} record batches", rbs.len());
49 }
50 let column: &StringArray = typed_column(&rbs[0], 0)?;
51 Ok(column.value(0).into())
52}
53
54pub fn get_single_row_primitive_value_by_name<T: ArrowPrimitiveType>(
56 rbs: &[RecordBatch],
57 column_name: &str,
58) -> Result<T::Native> {
59 if rbs.len() != 1 {
60 anyhow::bail!(
61 "get_single_row_primitive_value given {} record batches",
62 rbs.len()
63 );
64 }
65 let column: &PrimitiveArray<T> = typed_column_by_name(&rbs[0], column_name)?;
66 Ok(column.value(0))
67}
68
69pub fn get_single_row_primitive_value<T: ArrowPrimitiveType>(
71 rbs: &[RecordBatch],
72 column_index: usize,
73) -> Result<T::Native> {
74 if rbs.len() != 1 {
75 anyhow::bail!(
76 "get_single_row_primitive_value given {} record batches",
77 rbs.len()
78 );
79 }
80 let column: &PrimitiveArray<T> = typed_column(&rbs[0], column_index)?;
81 Ok(column.value(0))
82}