micromegas_analytics/lakehouse/
partition_metadata.rs

1use anyhow::{Context, Result};
2use bytes::Bytes;
3use micromegas_tracing::prelude::*;
4use sqlx::{PgPool, Row};
5use std::sync::Arc;
6
7use super::metadata_cache::MetadataCache;
8use crate::arrow_utils::parse_parquet_metadata;
9use crate::lakehouse::metadata_compat;
10use datafusion::parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
11
12/// Strips column index information from Parquet metadata
13///
14/// This removes column_index_offset and column_index_length from ColumnChunk metadata
15/// to prevent DataFusion from trying to read legacy ColumnIndex structures that may
16/// have incomplete or malformed null_pages fields (required in Arrow 57.0+).
17///
18/// The approach: serialize metadata to thrift, modify it, then re-parse.
19#[allow(deprecated)]
20fn strip_column_index_info(metadata: ParquetMetaData) -> Result<ParquetMetaData> {
21    use datafusion::parquet::file::metadata::ParquetMetaDataWriter;
22    use parquet::format::FileMetaData as ThriftFileMetaData;
23    use parquet::thrift::TSerializable;
24    use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
25    // Serialize metadata using ParquetMetaDataWriter
26    let mut buffer = Vec::new();
27    let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
28    writer.finish()?;
29    // Extract FileMetaData portion (similar to serialize_parquet_metadata)
30    // Format: [Page Indexes][FileMetaData][Length][PAR1]
31    let metadata_len = u32::from_le_bytes([
32        buffer[buffer.len() - 8],
33        buffer[buffer.len() - 7],
34        buffer[buffer.len() - 6],
35        buffer[buffer.len() - 5],
36    ]) as usize;
37    let file_metadata_start = buffer.len() - 8 - metadata_len;
38    let file_metadata_bytes = &buffer[file_metadata_start..buffer.len() - 8];
39    // Parse FileMetaData with thrift
40    let mut transport =
41        thrift::transport::TBufferChannel::with_capacity(file_metadata_bytes.len(), 0);
42    transport.set_readable_bytes(file_metadata_bytes);
43    let mut protocol = TCompactInputProtocol::new(transport);
44    let mut thrift_meta = ThriftFileMetaData::read_from_in_protocol(&mut protocol)
45        .context("parsing thrift metadata to strip column index")?;
46    // Remove column index information from all row groups and columns
47    for rg in thrift_meta.row_groups.iter_mut() {
48        for col in rg.columns.iter_mut() {
49            col.column_index_offset = None;
50            col.column_index_length = None;
51            // Also remove offset index for consistency
52            col.offset_index_offset = None;
53            col.offset_index_length = None;
54        }
55    }
56    // Re-serialize - use Vec<u8> which auto-grows as needed
57    let mut modified_bytes: Vec<u8> = Vec::with_capacity(file_metadata_bytes.len() * 2);
58    let mut out_protocol = TCompactOutputProtocol::new(&mut modified_bytes);
59    thrift_meta
60        .write_to_out_protocol(&mut out_protocol)
61        .context("serializing modified thrift metadata")?;
62    out_protocol.flush()?;
63    // Parse back to ParquetMetaData
64    ParquetMetaDataReader::decode_metadata(&Bytes::copy_from_slice(&modified_bytes))
65        .context("re-parsing metadata after stripping column index")
66}
67
68/// Load partition metadata by file path from the dedicated metadata table
69///
70/// Dispatches to appropriate parser based on partition_format_version:
71/// - Version 1: Arrow 56.0 format, uses legacy parser with num_rows injection (requires additional query)
72/// - Version 2: Arrow 57.0 format, uses standard parser (fast path, no join)
73///
74/// If a cache is provided, checks it first and stores results after loading.
75#[span_fn]
76pub async fn load_partition_metadata(
77    pool: &PgPool,
78    file_path: &str,
79    cache: Option<&MetadataCache>,
80) -> Result<Arc<ParquetMetaData>> {
81    // Check cache first
82    if let Some(cache) = cache
83        && let Some(metadata) = cache.get(file_path).await
84    {
85        debug!("cache hit for partition metadata path={file_path}");
86        return Ok(metadata);
87    }
88
89    // Fast path: query only partition_metadata table (no join)
90    let row = sqlx::query(
91        "SELECT metadata, partition_format_version
92         FROM partition_metadata
93         WHERE file_path = $1",
94    )
95    .bind(file_path)
96    .fetch_one(pool)
97    .await
98    .with_context(|| format!("loading metadata for file: {}", file_path))?;
99
100    let metadata_bytes: Vec<u8> = row.try_get("metadata")?;
101    let partition_format_version: i32 = row.try_get("partition_format_version")?;
102    let serialized_size = metadata_bytes.len() as u32;
103
104    debug!("fetched partition metadata path={file_path} size={serialized_size}");
105    // Dispatch based on format version
106    let metadata = match partition_format_version {
107        1 => {
108            // Arrow 56.0 format - need num_rows from lakehouse_partitions for legacy parser
109            let num_rows_row =
110                sqlx::query("SELECT num_rows FROM lakehouse_partitions WHERE file_path = $1")
111                    .bind(file_path)
112                    .fetch_one(pool)
113                    .await
114                    .with_context(|| format!("loading num_rows for v1 partition: {}", file_path))?;
115            let num_rows: i64 = num_rows_row.try_get("num_rows")?;
116
117            metadata_compat::parse_legacy_and_upgrade(&metadata_bytes, num_rows)
118                .with_context(|| format!("parsing v1 metadata for file: {}", file_path))?
119        }
120        2 => {
121            // Arrow 57.0 format - use standard parser (no additional query needed)
122            parse_parquet_metadata(&metadata_bytes.into())
123                .with_context(|| format!("parsing v2 metadata for file: {}", file_path))?
124        }
125        _ => {
126            return Err(anyhow::anyhow!(
127                "unsupported partition_format_version {} for file: {}",
128                partition_format_version,
129                file_path
130            ));
131        }
132    };
133
134    // Remove column index information to prevent DataFusion from trying to read
135    // legacy ColumnIndex structures that may have incomplete null_pages fields
136    let stripped = strip_column_index_info(metadata)
137        .with_context(|| format!("stripping column index for file: {}", file_path))?;
138    let result = Arc::new(stripped);
139
140    // Store in cache
141    if let Some(cache) = cache {
142        cache
143            .insert(file_path.to_string(), result.clone(), serialized_size)
144            .await;
145    }
146
147    Ok(result)
148}
149
150/// Delete multiple partition metadata entries in a single transaction
151/// Uses PostgreSQL's ANY() with array to avoid placeholder limits
152#[span_fn]
153pub async fn delete_partition_metadata_batch(
154    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
155    file_paths: &[String],
156) -> Result<()> {
157    if file_paths.is_empty() {
158        return Ok(());
159    }
160    // Use PostgreSQL's ANY() with array - no placeholder limits
161    let result = sqlx::query("DELETE FROM partition_metadata WHERE file_path = ANY($1)")
162        .bind(file_paths)
163        .execute(&mut **tr)
164        .await
165        .with_context(|| format!("deleting {} metadata entries", file_paths.len()))?;
166    debug!("deleted {} metadata entries", result.rows_affected());
167    Ok(())
168}