//! micromegas_analytics/lakehouse/migration.rs

1use crate::arrow_utils::parse_parquet_metadata;
2use anyhow::{Context, Result};
3use micromegas_ingestion::remote_data_lake::acquire_lock;
4use micromegas_tracing::prelude::*;
5use sqlx::Executor;
6use sqlx::Row;
7
/// Schema version produced by the most recent migration step in this module;
/// `migrate_lakehouse` upgrades any database reporting a lower version.
pub const LATEST_LAKEHOUSE_SCHEMA_VERSION: i32 = 5;
9
10async fn read_lakehouse_schema_version(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> i32 {
11    match sqlx::query(
12        "SELECT version
13         FROM lakehouse_migration;",
14    )
15    .fetch_one(&mut **tr)
16    .await
17    {
18        Ok(row) => row.get("version"),
19        Err(e) => {
20            info!(
21                "Error reading data lake schema version, assuming version 0: {}",
22                e
23            );
24            0
25        }
26    }
27}
28
29/// Migrates the lakehouse schema to the latest version.
30pub async fn migrate_lakehouse(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
31    let mut tr = pool.begin().await?;
32    let mut current_version = read_lakehouse_schema_version(&mut tr).await;
33    drop(tr);
34    info!("current lakehouse schema: {}", current_version);
35    if current_version != LATEST_LAKEHOUSE_SCHEMA_VERSION {
36        let mut tr = pool.begin().await?;
37        acquire_lock(&mut tr, 1).await?;
38        current_version = read_lakehouse_schema_version(&mut pool.begin().await?).await;
39        if LATEST_LAKEHOUSE_SCHEMA_VERSION == current_version {
40            return Ok(());
41        }
42        if let Err(e) = execute_lakehouse_migration(pool.clone()).await {
43            error!("Error migrating database: {}", e);
44            return Err(e);
45        }
46        current_version = read_lakehouse_schema_version(&mut tr).await;
47    }
48    assert_eq!(current_version, LATEST_LAKEHOUSE_SCHEMA_VERSION);
49    Ok(())
50}
51
52/// Executes the lakehouse migration steps.
53async fn execute_lakehouse_migration(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
54    let mut current_version = read_lakehouse_schema_version(&mut pool.begin().await?).await;
55    if 0 == current_version {
56        info!("creating v1 lakehouse_schema");
57        let mut tr = pool.begin().await?;
58        create_tables(&mut tr).await?;
59        current_version = read_lakehouse_schema_version(&mut tr).await;
60        tr.commit().await?;
61    }
62    if 1 == current_version {
63        info!("upgrade lakehouse schema to v2");
64        let mut tr = pool.begin().await?;
65        upgrade_v1_to_v2(&mut tr).await?;
66        current_version = read_lakehouse_schema_version(&mut tr).await;
67        tr.commit().await?;
68    }
69    if 2 == current_version {
70        info!("upgrade lakehouse schema to v3");
71        let mut tr = pool.begin().await?;
72        upgrade_v2_to_v3(&mut tr).await?;
73        current_version = read_lakehouse_schema_version(&mut tr).await;
74        tr.commit().await?;
75    }
76    if 3 == current_version {
77        info!("upgrade lakehouse schema to v4");
78        let mut tr = pool.begin().await?;
79        upgrade_v3_to_v4(&mut tr).await?;
80        current_version = read_lakehouse_schema_version(&mut tr).await;
81        tr.commit().await?;
82    }
83    if 4 == current_version {
84        info!("upgrade lakehouse schema to v5");
85        let mut tr = pool.begin().await?;
86        upgrade_v4_to_v5(&mut tr).await?;
87        current_version = read_lakehouse_schema_version(&mut tr).await;
88        tr.commit().await?;
89    }
90    assert_eq!(current_version, LATEST_LAKEHOUSE_SCHEMA_VERSION);
91    Ok(())
92}
93
94async fn create_partitions_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
95    // Every table in the set shares the same schema - that schema can change over time, in which case the partition has to be rebuilt.
96    // The instance id is a unique key to the table within the table set.
97    //    * Example 1: if there is a process_log table for each process, the instance id could be the process_id.
98    //      It would not clash with an instance of process_metrics table for the same process.
99    //    * Example 2:  if there is a table instance for each metric, the view_instance_id could be the name of the metric.
100
101    // source_data_hash can be used to detect that the partition is out of date (sha1 of the block_ids, for example)
102    tr.execute("
103         CREATE TABLE lakehouse_partitions(
104                  view_set_name VARCHAR(255),
105                  view_instance_id VARCHAR(255),
106                  begin_insert_time TIMESTAMPTZ,
107                  end_insert_time TIMESTAMPTZ,
108                  min_event_time TIMESTAMPTZ,
109                  max_event_time TIMESTAMPTZ,
110                  updated TIMESTAMPTZ,
111                  file_path VARCHAR(2047),
112                  file_size BIGINT,
113                  file_schema_hash bytea,
114                  source_data_hash bytea
115                  );
116         CREATE INDEX lh_part_begin_insert on lakehouse_partitions(view_set_name, view_instance_id, begin_insert_time);
117         CREATE INDEX lh_part_end_insert on lakehouse_partitions(view_set_name, view_instance_id, end_insert_time);
118         CREATE INDEX lh_part_min_time on lakehouse_partitions(view_set_name, view_instance_id, min_event_time);
119         CREATE INDEX lh_part_max_time on lakehouse_partitions(view_set_name, view_instance_id, max_event_time);
120")
121        .await
122        .with_context(|| "Creating table blocks and its indices")?;
123    Ok(())
124}
125
/// Creates the `temporary_files` table (v1 schema).
///
/// Files listed here carry an expiration timestamp; the index on `expiration`
/// supports scanning for expired entries.
async fn create_temp_files_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
    // partitions that are out of date can still be referenced until they expire
    // Multi-statement batch (table + index) — executed via tr.execute so both
    // statements run in one simple-protocol round-trip.
    tr.execute(
        "
         CREATE TABLE temporary_files(
                  file_path VARCHAR(2047),
                  file_size BIGINT,
                  expiration TIMESTAMPTZ );
         CREATE INDEX temporary_files_expiration on temporary_files(expiration);
",
    )
    .await
    .with_context(|| "Creating temporary_files table")?;
    Ok(())
}
141
142async fn create_migration_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
143    sqlx::query("CREATE table lakehouse_migration(version integer);")
144        .execute(&mut **tr)
145        .await
146        .with_context(|| "Creating table lakehouse_migration")?;
147    sqlx::query("INSERT INTO lakehouse_migration VALUES(1);")
148        .execute(&mut **tr)
149        .await
150        .with_context(|| "Recording the initial lakehouse schema version")?;
151    Ok(())
152}
153
/// Creates all v1 lakehouse tables inside the caller's transaction.
///
/// The migration table comes last: it records schema version 1, so the
/// version row only exists alongside the other tables.
async fn create_tables(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
    create_partitions_table(tr).await?;
    create_temp_files_table(tr).await?;
    create_migration_table(tr).await?;
    Ok(())
}
160
161async fn upgrade_v1_to_v2(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
162    // file_metadata is meant to be the serialized ParquetMetaData
163    // which can be found in the footer of the file
164    tr.execute("ALTER TABLE lakehouse_partitions ADD file_metadata bytea;")
165        .await
166        .with_context(|| "adding column file_metadata to lakehouse_partitions table")?;
167    tr.execute("UPDATE lakehouse_migration SET version=2;")
168        .await
169        .with_context(|| "Updating lakehouse schema version to 2")?;
170    Ok(())
171}
172
/// Migrates the lakehouse schema from v2 to v3.
///
/// v3 adds a NOT NULL `num_rows` column (back-filled from each partition's
/// parquet metadata) and an index on `file_path`.
///
/// NOTE(review): `populate_num_rows_column` only fills rows where
/// `file_metadata IS NOT NULL`; if any v2 row stored a NULL `file_metadata`,
/// the SET NOT NULL below would fail — confirm v2 always populated metadata.
async fn upgrade_v2_to_v3(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
    // Add num_rows column to store row count separately from file_metadata
    tr.execute("ALTER TABLE lakehouse_partitions ADD num_rows BIGINT;")
        .await
        .with_context(|| "adding column num_rows to lakehouse_partitions table")?;

    // Add index on file_path for efficient on-demand metadata loading
    tr.execute("CREATE INDEX lakehouse_partitions_file_path ON lakehouse_partitions(file_path);")
        .await
        .with_context(|| "creating index on file_path")?;

    // Populate num_rows column for existing partitions
    populate_num_rows_column(tr)
        .await
        .with_context(|| "populating num_rows column")?;

    // Make num_rows NOT NULL after populating existing data
    tr.execute("ALTER TABLE lakehouse_partitions ALTER COLUMN num_rows SET NOT NULL;")
        .await
        .with_context(|| "setting num_rows column to NOT NULL")?;

    tr.execute("UPDATE lakehouse_migration SET version=3;")
        .await
        .with_context(|| "Updating lakehouse schema version to 3")?;
    Ok(())
}
199
/// Back-fills `num_rows` for existing partitions by parsing each row's
/// serialized parquet metadata, in batches of 1000 to bound memory use.
///
/// Runs entirely inside the caller's transaction. Partitions whose metadata
/// cannot be parsed get `num_rows = 0` as a fallback so they stop matching
/// the `num_rows IS NULL` filter.
///
/// NOTE(review): if an individual UPDATE fails, the row keeps matching the
/// selection filter and would be re-fetched on the next loop iteration; in
/// Postgres a failed statement also aborts the enclosing transaction, so the
/// subsequent fetch would error out via `?` rather than loop forever —
/// confirm this best-effort error handling is intentional.
async fn populate_num_rows_column(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
    info!("populating num_rows column for existing partitions");

    let mut total_count = 0;
    let batch_size = 1000;

    loop {
        // Fetch partitions in batches to avoid loading all metadata into memory
        let rows = sqlx::query("SELECT file_path, file_metadata FROM lakehouse_partitions WHERE file_metadata IS NOT NULL AND num_rows IS NULL LIMIT $1")
            .bind(batch_size)
            .fetch_all(&mut **tr)
            .await?;

        if rows.is_empty() {
            break;
        }

        let mut batch_count = 0;
        for row in rows {
            let file_path: String = row.try_get("file_path")?;
            let file_metadata_buffer: Vec<u8> = row.try_get("file_metadata")?;

            // Parse metadata only for this partition
            match parse_parquet_metadata(&file_metadata_buffer.into()) {
                Ok(file_metadata) => {
                    let num_rows = file_metadata.file_metadata().num_rows();

                    // Update just this partition
                    if let Err(e) = sqlx::query(
                        "UPDATE lakehouse_partitions SET num_rows = $1 WHERE file_path = $2",
                    )
                    .bind(num_rows)
                    .bind(&file_path)
                    .execute(&mut **tr)
                    .await
                    {
                        error!(
                            "failed to update num_rows for partition {}: {}",
                            file_path, e
                        );
                        continue;
                    }

                    batch_count += 1;
                }
                Err(e) => {
                    error!(
                        "failed to parse metadata for partition {}: {}",
                        file_path, e
                    );
                    // For partitions with unparseable metadata, set num_rows to 0 as a fallback
                    if let Err(e2) = sqlx::query(
                        "UPDATE lakehouse_partitions SET num_rows = 0 WHERE file_path = $1",
                    )
                    .bind(&file_path)
                    .execute(&mut **tr)
                    .await
                    {
                        error!(
                            "failed to set fallback num_rows for partition {}: {}",
                            file_path, e2
                        );
                    }
                }
            }
        }

        total_count += batch_count;
        info!(
            "populated num_rows for {} partitions (total: {})",
            batch_count, total_count
        );
    }

    info!("populated num_rows for {} total partitions", total_count);
    Ok(())
}
277
278async fn upgrade_v3_to_v4(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
279    // Create dedicated partition_metadata table
280    tr.execute(
281        "CREATE TABLE partition_metadata(
282            file_path VARCHAR(2047) PRIMARY KEY,
283            metadata bytea NOT NULL,
284            insert_time TIMESTAMPTZ NOT NULL
285        );",
286    )
287    .await
288    .with_context(|| "creating partition_metadata table")?;
289
290    // Migrate existing metadata from lakehouse_partitions to partition_metadata
291    migrate_metadata_to_new_table(tr)
292        .await
293        .with_context(|| "migrating metadata to partition_metadata table")?;
294
295    // Drop the file_metadata column from lakehouse_partitions after successful migration
296    tr.execute("ALTER TABLE lakehouse_partitions DROP COLUMN file_metadata;")
297        .await
298        .with_context(|| "dropping file_metadata column from lakehouse_partitions")?;
299
300    tr.execute("UPDATE lakehouse_migration SET version=4;")
301        .await
302        .with_context(|| "Updating lakehouse schema version to 4")?;
303    Ok(())
304}
305
/// Copies metadata blobs from `lakehouse_partitions.file_metadata` into the
/// `partition_metadata` table, inside the caller's transaction.
///
/// File paths are listed first (cheap), then the large metadata blobs are
/// fetched in small batches via a dynamically-built `IN (...)` query so only
/// a handful of blobs are in memory at a time. `ON CONFLICT DO NOTHING`
/// makes a re-run of the migration safe for rows already copied.
async fn migrate_metadata_to_new_table(
    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<()> {
    info!("migrating metadata to partition_metadata table");

    // First, get all file paths that have metadata (small data)
    let file_paths: Vec<String> = sqlx::query_scalar(
        "SELECT file_path 
         FROM lakehouse_partitions 
         WHERE file_metadata IS NOT NULL
         ORDER BY file_path",
    )
    .fetch_all(&mut **tr)
    .await?;

    let total_to_migrate = file_paths.len();
    info!(
        "found {} partitions with metadata to migrate",
        total_to_migrate
    );

    let mut total_count = 0;
    let batch_size = 10; // Small batch size since metadata can be large

    // Process in batches to avoid loading too much metadata at once
    for chunk in file_paths.chunks(batch_size) {
        // Build a query to fetch just this batch
        // ($1..$n placeholders, one per path in the chunk)
        let placeholders: Vec<String> = (1..=chunk.len()).map(|i| format!("${}", i)).collect();
        let query_str = format!(
            "SELECT file_path, file_metadata, updated 
             FROM lakehouse_partitions 
             WHERE file_path IN ({})",
            placeholders.join(", ")
        );

        let mut query = sqlx::query(&query_str);
        for path in chunk {
            query = query.bind(path);
        }

        let rows = query.fetch_all(&mut **tr).await?;

        for row in rows {
            let file_path: String = row.try_get("file_path")?;
            let file_metadata: Vec<u8> = row.try_get("file_metadata")?;
            let updated: chrono::DateTime<chrono::Utc> = row.try_get("updated")?;

            // Insert into new partition_metadata table (with ON CONFLICT for migration safety)
            // Failures are logged and skipped (best-effort), not propagated.
            if let Err(e) = sqlx::query(
                "INSERT INTO partition_metadata (file_path, metadata, insert_time) 
                 VALUES ($1, $2, $3)
                 ON CONFLICT (file_path) DO NOTHING",
            )
            .bind(&file_path)
            .bind(&file_metadata)
            .bind(updated)
            .execute(&mut **tr)
            .await
            {
                error!(
                    "failed to migrate metadata for partition {}: {}",
                    file_path, e
                );
                continue;
            }

            total_count += 1;
        }

        // Progress log: every 100 migrated rows (chunks of 10 keep this
        // aligned) and once when the last entry is reached.
        if total_count % 100 == 0 || total_count == total_to_migrate {
            info!(
                "migrated {}/{} partition metadata entries",
                total_count, total_to_migrate
            );
        }
    }

    info!("migrated metadata for {} total partitions", total_count);
    Ok(())
}
386
387async fn upgrade_v4_to_v5(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
388    // Add partition_format_version column to lakehouse_partitions
389    // Default to 1 for existing partitions (Arrow 56.0 format)
390    tr.execute(
391        "ALTER TABLE lakehouse_partitions
392         ADD COLUMN partition_format_version INTEGER NOT NULL DEFAULT 1;",
393    )
394    .await
395    .with_context(|| "adding partition_format_version to lakehouse_partitions")?;
396    // Add partition_format_version column to partition_metadata
397    // Default to 1 for existing metadata (Arrow 56.0 format)
398    tr.execute(
399        "ALTER TABLE partition_metadata
400         ADD COLUMN partition_format_version INTEGER NOT NULL DEFAULT 1;",
401    )
402    .await
403    .with_context(|| "adding partition_format_version to partition_metadata")?;
404    tr.execute("UPDATE lakehouse_migration SET version=5;")
405        .await
406        .with_context(|| "Updating lakehouse schema version to 5")?;
407    info!("added partition_format_version columns to both tables");
408    Ok(())
409}