micromegas_analytics/lakehouse/
metadata_compat.rs

1use anyhow::{Context, Result};
2use bytes::Bytes;
3use datafusion::parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
4use micromegas_tracing::prelude::*;
5#[allow(deprecated)]
6use parquet::format::FileMetaData as ThriftFileMetaData;
7use parquet::thrift::TSerializable;
8use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
9
10/// Parse legacy metadata (Arrow 56.0) and convert to new format (Arrow 57.0)
11///
12/// This function handles the migration from Arrow 56.0 to 57.0 by:
13/// 1. Parsing legacy metadata using the deprecated thrift API
14/// 2. Injecting the required `num_rows` field if missing or zero
15/// 3. Re-serializing with thrift to produce corrected bytes
16/// 4. Parsing with Arrow 57.0's standard parser
17#[allow(deprecated)]
18pub fn parse_legacy_and_upgrade(metadata_bytes: &[u8], num_rows: i64) -> Result<ParquetMetaData> {
19    // Parse with old thrift API
20    let mut transport = thrift::transport::TBufferChannel::with_capacity(metadata_bytes.len(), 0);
21    transport.set_readable_bytes(metadata_bytes);
22    let mut protocol = TCompactInputProtocol::new(transport);
23    let mut thrift_meta = ThriftFileMetaData::read_from_in_protocol(&mut protocol)
24        .context("parsing legacy metadata with thrift")?;
25
26    // Inject num_rows if missing or zero
27    if thrift_meta.num_rows == 0 {
28        trace!("injecting num_rows={} into legacy metadata", num_rows);
29        thrift_meta.num_rows = num_rows;
30    }
31
32    // Re-serialize with thrift (now has num_rows)
33    // Use Vec<u8> which auto-grows as needed
34    let mut corrected_bytes: Vec<u8> = Vec::with_capacity(metadata_bytes.len() * 2);
35    let mut out_protocol = TCompactOutputProtocol::new(&mut corrected_bytes);
36    thrift_meta
37        .write_to_out_protocol(&mut out_protocol)
38        .context("serializing corrected thrift metadata")?;
39    out_protocol.flush()?;
40
41    // Parse with Arrow 57.0 (should work now)
42    ParquetMetaDataReader::decode_metadata(&Bytes::copy_from_slice(&corrected_bytes))
43        .context("re-parsing with Arrow 57.0")
44}