// micromegas/utils/log_json_rows.rs
use anyhow::Result;
use datafusion::arrow::{
    array::{RecordBatch, StructArray},
    datatypes::Field,
    json::{EncoderOptions, writer::make_encoder},
};
use micromegas_analytics::dfext::string_column_accessor::string_column_by_name;
use micromegas_tracing::info;
use micromegas_tracing::intern_string::intern_string;
use micromegas_tracing::property_set::{Property, PropertySet};
use std::{str::from_utf8, sync::Arc};

13pub async fn log_json_rows(
17 target: &'static str,
18 rbs: &[RecordBatch],
19 columns_as_properties: &[&str],
20) -> Result<()> {
21 let options = EncoderOptions::default();
22 for batch in rbs {
23 let mut prop_columns = vec![];
24 for prop_name in columns_as_properties {
25 let c = string_column_by_name(batch, prop_name)?;
26 prop_columns.push(c);
27 }
28 let mut buffer = Vec::with_capacity(16 * 1024);
29 let array = StructArray::from(batch.clone());
30 let field = Arc::new(Field::new_struct(
31 "",
32 batch.schema().fields().clone(),
33 false,
34 ));
35 let mut encoder = make_encoder(&field, &array, &options)?;
36 assert!(!encoder.has_nulls(), "root cannot be nullable");
37 for idx in 0..batch.num_rows() {
38 let mut properties = vec![Property::new("target", target)];
39 for prop_index in 0..columns_as_properties.len() {
40 properties.push(Property::new(
41 intern_string(columns_as_properties[prop_index]),
42 intern_string(prop_columns[prop_index].value(idx)?),
43 ));
44 }
45 let pset = PropertySet::find_or_create(properties);
46
47 encoder.encode(idx, &mut buffer);
48 info!(properties:pset, "{}", from_utf8(&buffer)?);
49 buffer.clear();
50 }
51 drop(prop_columns);
52 }
53 Ok(())
54}