micromegas/utils/log_json_rows.rs

1use anyhow::Result;
2use datafusion::arrow::{
3    array::{RecordBatch, StructArray},
4    datatypes::Field,
5    json::{EncoderOptions, writer::make_encoder},
6};
7use micromegas_analytics::dfext::string_column_accessor::string_column_by_name;
8use micromegas_tracing::info;
9use micromegas_tracing::intern_string::intern_string;
10use micromegas_tracing::property_set::{Property, PropertySet};
11use std::{str::from_utf8, sync::Arc};
12
13/// Logs rows from a `RecordBatch` as JSON, with specified columns converted to properties.
14///
15/// This function is useful for logging structured data from a `RecordBatch` in a human-readable format.
16pub async fn log_json_rows(
17    target: &'static str,
18    rbs: &[RecordBatch],
19    columns_as_properties: &[&str],
20) -> Result<()> {
21    let options = EncoderOptions::default();
22    for batch in rbs {
23        let mut prop_columns = vec![];
24        for prop_name in columns_as_properties {
25            let c = string_column_by_name(batch, prop_name)?;
26            prop_columns.push(c);
27        }
28        let mut buffer = Vec::with_capacity(16 * 1024);
29        let array = StructArray::from(batch.clone());
30        let field = Arc::new(Field::new_struct(
31            "",
32            batch.schema().fields().clone(),
33            false,
34        ));
35        let mut encoder = make_encoder(&field, &array, &options)?;
36        assert!(!encoder.has_nulls(), "root cannot be nullable");
37        for idx in 0..batch.num_rows() {
38            let mut properties = vec![Property::new("target", target)];
39            for prop_index in 0..columns_as_properties.len() {
40                properties.push(Property::new(
41                    intern_string(columns_as_properties[prop_index]),
42                    intern_string(prop_columns[prop_index].value(idx)?),
43                ));
44            }
45            let pset = PropertySet::find_or_create(properties);
46
47            encoder.encode(idx, &mut buffer);
48            info!(properties:pset, "{}", from_utf8(&buffer)?);
49            buffer.clear();
50        }
51        drop(prop_columns);
52    }
53    Ok(())
54}