micromegas_analytics/
arrow_utils.rs1use anyhow::{Context, Result};
2use bytes::Bytes;
3use datafusion::parquet::file::metadata::ParquetMetaDataReader;
4use datafusion::{
5 arrow::{
6 array::{ListBuilder, StructBuilder, as_struct_array},
7 record_batch::RecordBatch,
8 },
9 parquet::file::metadata::ParquetMetaData,
10};
11use micromegas_tracing::prelude::*;
12
13pub fn make_empty_record_batch() -> RecordBatch {
15 let mut list_builder = ListBuilder::new(StructBuilder::from_fields([], 0));
16 let array = list_builder.finish();
17 as_struct_array(array.values()).into()
18}
19
20#[span_fn]
22pub fn parse_parquet_metadata(bytes: &Bytes) -> Result<ParquetMetaData> {
23 ParquetMetaDataReader::decode_metadata(bytes).with_context(|| "parsing ParquetMetaData")
24}
25
26pub fn serialize_parquet_metadata(pmd: &ParquetMetaData) -> Result<bytes::Bytes> {
36 use datafusion::parquet::file::metadata::ParquetMetaDataWriter;
37 let mut buffer = Vec::new();
39 let md_writer = ParquetMetaDataWriter::new(&mut buffer, pmd);
40 md_writer
41 .finish()
42 .with_context(|| "serializing parquet metadata")?;
43 let serialized = bytes::Bytes::from(buffer);
44 const FOOTER_SIZE: usize = 8; const LENGTH_SIZE: usize = 4;
48 if serialized.len() < FOOTER_SIZE {
49 anyhow::bail!("Serialized metadata too small: {} bytes", serialized.len());
50 }
51 let length_offset = serialized.len() - FOOTER_SIZE;
53 let footer_len_bytes = &serialized[length_offset..length_offset + LENGTH_SIZE];
54 let metadata_len = u32::from_le_bytes(
55 footer_len_bytes
56 .try_into()
57 .with_context(|| "reading footer length")?,
58 ) as usize;
59 let footer_start = serialized
61 .len()
62 .checked_sub(FOOTER_SIZE + metadata_len)
63 .with_context(|| {
64 format!(
65 "Invalid footer length: {} (total size: {})",
66 metadata_len,
67 serialized.len()
68 )
69 })?;
70 let file_metadata_bytes = serialized.slice(footer_start..length_offset);
72 Ok(file_metadata_bytes)
73}