micromegas_analytics/lakehouse/
metadata_compat.rs1use anyhow::{Context, Result};
2use bytes::Bytes;
3use datafusion::parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
4use micromegas_tracing::prelude::*;
5#[allow(deprecated)]
6use parquet::format::FileMetaData as ThriftFileMetaData;
7use parquet::thrift::TSerializable;
8use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
9
10#[allow(deprecated)]
18pub fn parse_legacy_and_upgrade(metadata_bytes: &[u8], num_rows: i64) -> Result<ParquetMetaData> {
19 let mut transport = thrift::transport::TBufferChannel::with_capacity(metadata_bytes.len(), 0);
21 transport.set_readable_bytes(metadata_bytes);
22 let mut protocol = TCompactInputProtocol::new(transport);
23 let mut thrift_meta = ThriftFileMetaData::read_from_in_protocol(&mut protocol)
24 .context("parsing legacy metadata with thrift")?;
25
26 if thrift_meta.num_rows == 0 {
28 trace!("injecting num_rows={} into legacy metadata", num_rows);
29 thrift_meta.num_rows = num_rows;
30 }
31
32 let mut corrected_bytes: Vec<u8> = Vec::with_capacity(metadata_bytes.len() * 2);
35 let mut out_protocol = TCompactOutputProtocol::new(&mut corrected_bytes);
36 thrift_meta
37 .write_to_out_protocol(&mut out_protocol)
38 .context("serializing corrected thrift metadata")?;
39 out_protocol.flush()?;
40
41 ParquetMetaDataReader::decode_metadata(&Bytes::copy_from_slice(&corrected_bytes))
43 .context("re-parsing with Arrow 57.0")
44}