// micromegas_analytics/lakehouse/partition_metadata.rs
use anyhow::{Context, Result};
use bytes::Bytes;
use micromegas_tracing::prelude::*;
use sqlx::{PgPool, Row};
use std::sync::Arc;

use super::metadata_cache::MetadataCache;
use crate::arrow_utils::parse_parquet_metadata;
use crate::lakehouse::metadata_compat;
use datafusion::parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
11
/// Removes the column-index / offset-index references from parquet metadata.
///
/// The offsets stored in the metadata point into the original parquet file;
/// after the metadata is persisted separately they would be dangling, so this
/// round-trips the metadata through its thrift representation, clears the four
/// index fields on every column chunk, and re-parses the result.
// NOTE(review): `#[allow(deprecated)]` presumably covers one of the
// parquet/thrift APIs used below (e.g. `decode_metadata`) — confirm which.
#[allow(deprecated)]
fn strip_column_index_info(metadata: ParquetMetaData) -> Result<ParquetMetaData> {
    use datafusion::parquet::file::metadata::ParquetMetaDataWriter;
    use parquet::format::FileMetaData as ThriftFileMetaData;
    use parquet::thrift::TSerializable;
    use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
    // Serialize the metadata into an in-memory buffer. The writer emits the
    // thrift-encoded FileMetaData followed by an 8-byte footer: a 4-byte
    // little-endian metadata length and a 4-byte magic (standard parquet footer).
    let mut buffer = Vec::new();
    let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
    writer.finish()?;
    // Read the metadata length from the first 4 bytes of the 8-byte footer.
    let metadata_len = u32::from_le_bytes([
        buffer[buffer.len() - 8],
        buffer[buffer.len() - 7],
        buffer[buffer.len() - 6],
        buffer[buffer.len() - 5],
    ]) as usize;
    // Slice out exactly the thrift-encoded FileMetaData (footer excluded).
    let file_metadata_start = buffer.len() - 8 - metadata_len;
    let file_metadata_bytes = &buffer[file_metadata_start..buffer.len() - 8];
    // Deserialize the raw bytes into the mutable thrift struct so the index
    // fields can be edited directly.
    let mut transport =
        thrift::transport::TBufferChannel::with_capacity(file_metadata_bytes.len(), 0);
    transport.set_readable_bytes(file_metadata_bytes);
    let mut protocol = TCompactInputProtocol::new(transport);
    let mut thrift_meta = ThriftFileMetaData::read_from_in_protocol(&mut protocol)
        .context("parsing thrift metadata to strip column index")?;
    // Clear the column-index and offset-index locations on every column chunk
    // of every row group; they reference byte ranges of the original file.
    for rg in thrift_meta.row_groups.iter_mut() {
        for col in rg.columns.iter_mut() {
            col.column_index_offset = None;
            col.column_index_length = None;
            col.offset_index_offset = None;
            col.offset_index_length = None;
        }
    }
    // Re-serialize the modified struct with the compact protocol. Capacity is
    // a heuristic upper bound; the stripped encoding is normally no larger.
    let mut modified_bytes: Vec<u8> = Vec::with_capacity(file_metadata_bytes.len() * 2);
    let mut out_protocol = TCompactOutputProtocol::new(&mut modified_bytes);
    thrift_meta
        .write_to_out_protocol(&mut out_protocol)
        .context("serializing modified thrift metadata")?;
    out_protocol.flush()?;
    // Decode back into the typed ParquetMetaData that callers consume.
    ParquetMetaDataReader::decode_metadata(&Bytes::copy_from_slice(&modified_bytes))
        .context("re-parsing metadata after stripping column index")
}
67
68#[span_fn]
76pub async fn load_partition_metadata(
77 pool: &PgPool,
78 file_path: &str,
79 cache: Option<&MetadataCache>,
80) -> Result<Arc<ParquetMetaData>> {
81 if let Some(cache) = cache
83 && let Some(metadata) = cache.get(file_path).await
84 {
85 debug!("cache hit for partition metadata path={file_path}");
86 return Ok(metadata);
87 }
88
89 let row = sqlx::query(
91 "SELECT metadata, partition_format_version
92 FROM partition_metadata
93 WHERE file_path = $1",
94 )
95 .bind(file_path)
96 .fetch_one(pool)
97 .await
98 .with_context(|| format!("loading metadata for file: {}", file_path))?;
99
100 let metadata_bytes: Vec<u8> = row.try_get("metadata")?;
101 let partition_format_version: i32 = row.try_get("partition_format_version")?;
102 let serialized_size = metadata_bytes.len() as u32;
103
104 debug!("fetched partition metadata path={file_path} size={serialized_size}");
105 let metadata = match partition_format_version {
107 1 => {
108 let num_rows_row =
110 sqlx::query("SELECT num_rows FROM lakehouse_partitions WHERE file_path = $1")
111 .bind(file_path)
112 .fetch_one(pool)
113 .await
114 .with_context(|| format!("loading num_rows for v1 partition: {}", file_path))?;
115 let num_rows: i64 = num_rows_row.try_get("num_rows")?;
116
117 metadata_compat::parse_legacy_and_upgrade(&metadata_bytes, num_rows)
118 .with_context(|| format!("parsing v1 metadata for file: {}", file_path))?
119 }
120 2 => {
121 parse_parquet_metadata(&metadata_bytes.into())
123 .with_context(|| format!("parsing v2 metadata for file: {}", file_path))?
124 }
125 _ => {
126 return Err(anyhow::anyhow!(
127 "unsupported partition_format_version {} for file: {}",
128 partition_format_version,
129 file_path
130 ));
131 }
132 };
133
134 let stripped = strip_column_index_info(metadata)
137 .with_context(|| format!("stripping column index for file: {}", file_path))?;
138 let result = Arc::new(stripped);
139
140 if let Some(cache) = cache {
142 cache
143 .insert(file_path.to_string(), result.clone(), serialized_size)
144 .await;
145 }
146
147 Ok(result)
148}
149
150#[span_fn]
153pub async fn delete_partition_metadata_batch(
154 tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
155 file_paths: &[String],
156) -> Result<()> {
157 if file_paths.is_empty() {
158 return Ok(());
159 }
160 let result = sqlx::query("DELETE FROM partition_metadata WHERE file_path = ANY($1)")
162 .bind(file_paths)
163 .execute(&mut **tr)
164 .await
165 .with_context(|| format!("deleting {} metadata entries", file_paths.len()))?;
166 debug!("deleted {} metadata entries", result.rows_affected());
167 Ok(())
168}