micromegas_analytics/
arrow_utils.rs

1use anyhow::{Context, Result};
2use bytes::Bytes;
3use datafusion::parquet::file::metadata::ParquetMetaDataReader;
4use datafusion::{
5    arrow::{
6        array::{ListBuilder, StructBuilder, as_struct_array},
7        record_batch::RecordBatch,
8    },
9    parquet::file::metadata::ParquetMetaData,
10};
11use micromegas_tracing::prelude::*;
12
13/// Creates an empty record batch with an empty schema.
14pub fn make_empty_record_batch() -> RecordBatch {
15    let mut list_builder = ListBuilder::new(StructBuilder::from_fields([], 0));
16    let array = list_builder.finish();
17    as_struct_array(array.values()).into()
18}
19
20/// Parses Parquet metadata from a byte slice.
21#[span_fn]
22pub fn parse_parquet_metadata(bytes: &Bytes) -> Result<ParquetMetaData> {
23    ParquetMetaDataReader::decode_metadata(bytes).with_context(|| "parsing ParquetMetaData")
24}
25
26/// Serializes Parquet metadata to a byte slice.
27///
28/// This uses `ParquetMetaDataWriter` to serialize the metadata, then extracts
29/// just the FileMetaData portion that `decode_metadata()` expects.
30///
31/// ## Background
32/// `ParquetMetaDataWriter` outputs: \[Page Indexes\]\[FileMetaData\]\[Length\]\[PAR1\]
33/// But `decode_metadata()` expects just the raw FileMetaData thrift bytes.
34/// We extract the FileMetaData portion using the footer length field.
35pub fn serialize_parquet_metadata(pmd: &ParquetMetaData) -> Result<bytes::Bytes> {
36    use datafusion::parquet::file::metadata::ParquetMetaDataWriter;
37    // Serialize the full footer format
38    let mut buffer = Vec::new();
39    let md_writer = ParquetMetaDataWriter::new(&mut buffer, pmd);
40    md_writer
41        .finish()
42        .with_context(|| "serializing parquet metadata")?;
43    let serialized = bytes::Bytes::from(buffer);
44    // Extract just the FileMetaData portion using Parquet footer format
45    // The footer structure is: [...][FileMetaData][metadata_len: u32][magic: u32]
46    const FOOTER_SIZE: usize = 8; // 4 bytes for length + 4 bytes for PAR1 magic
47    const LENGTH_SIZE: usize = 4;
48    if serialized.len() < FOOTER_SIZE {
49        anyhow::bail!("Serialized metadata too small: {} bytes", serialized.len());
50    }
51    // Read the FileMetaData length from the footer
52    let length_offset = serialized.len() - FOOTER_SIZE;
53    let footer_len_bytes = &serialized[length_offset..length_offset + LENGTH_SIZE];
54    let metadata_len = u32::from_le_bytes(
55        footer_len_bytes
56            .try_into()
57            .with_context(|| "reading footer length")?,
58    ) as usize;
59    // Calculate where FileMetaData starts
60    let footer_start = serialized
61        .len()
62        .checked_sub(FOOTER_SIZE + metadata_len)
63        .with_context(|| {
64            format!(
65                "Invalid footer length: {} (total size: {})",
66                metadata_len,
67                serialized.len()
68            )
69        })?;
70    // Extract just the FileMetaData bytes (excluding page indexes and footer suffix)
71    let file_metadata_bytes = serialized.slice(footer_start..length_offset);
72    Ok(file_metadata_bytes)
73}