// micromegas_analytics/lakehouse/migration.rs
use crate::arrow_utils::parse_parquet_metadata;
use anyhow::{Context, Result};
use micromegas_ingestion::remote_data_lake::acquire_lock;
use micromegas_tracing::prelude::*;
use sqlx::Executor;
use sqlx::Row;

pub const LATEST_LAKEHOUSE_SCHEMA_VERSION: i32 = 5;

10async fn read_lakehouse_schema_version(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> i32 {
11 match sqlx::query(
12 "SELECT version
13 FROM lakehouse_migration;",
14 )
15 .fetch_one(&mut **tr)
16 .await
17 {
18 Ok(row) => row.get("version"),
19 Err(e) => {
20 info!(
21 "Error reading data lake schema version, assuming version 0: {}",
22 e
23 );
24 0
25 }
26 }
27}
28
29pub async fn migrate_lakehouse(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
31 let mut tr = pool.begin().await?;
32 let mut current_version = read_lakehouse_schema_version(&mut tr).await;
33 drop(tr);
34 info!("current lakehouse schema: {}", current_version);
35 if current_version != LATEST_LAKEHOUSE_SCHEMA_VERSION {
36 let mut tr = pool.begin().await?;
37 acquire_lock(&mut tr, 1).await?;
38 current_version = read_lakehouse_schema_version(&mut pool.begin().await?).await;
39 if LATEST_LAKEHOUSE_SCHEMA_VERSION == current_version {
40 return Ok(());
41 }
42 if let Err(e) = execute_lakehouse_migration(pool.clone()).await {
43 error!("Error migrating database: {}", e);
44 return Err(e);
45 }
46 current_version = read_lakehouse_schema_version(&mut tr).await;
47 }
48 assert_eq!(current_version, LATEST_LAKEHOUSE_SCHEMA_VERSION);
49 Ok(())
50}
51
52async fn execute_lakehouse_migration(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
54 let mut current_version = read_lakehouse_schema_version(&mut pool.begin().await?).await;
55 if 0 == current_version {
56 info!("creating v1 lakehouse_schema");
57 let mut tr = pool.begin().await?;
58 create_tables(&mut tr).await?;
59 current_version = read_lakehouse_schema_version(&mut tr).await;
60 tr.commit().await?;
61 }
62 if 1 == current_version {
63 info!("upgrade lakehouse schema to v2");
64 let mut tr = pool.begin().await?;
65 upgrade_v1_to_v2(&mut tr).await?;
66 current_version = read_lakehouse_schema_version(&mut tr).await;
67 tr.commit().await?;
68 }
69 if 2 == current_version {
70 info!("upgrade lakehouse schema to v3");
71 let mut tr = pool.begin().await?;
72 upgrade_v2_to_v3(&mut tr).await?;
73 current_version = read_lakehouse_schema_version(&mut tr).await;
74 tr.commit().await?;
75 }
76 if 3 == current_version {
77 info!("upgrade lakehouse schema to v4");
78 let mut tr = pool.begin().await?;
79 upgrade_v3_to_v4(&mut tr).await?;
80 current_version = read_lakehouse_schema_version(&mut tr).await;
81 tr.commit().await?;
82 }
83 if 4 == current_version {
84 info!("upgrade lakehouse schema to v5");
85 let mut tr = pool.begin().await?;
86 upgrade_v4_to_v5(&mut tr).await?;
87 current_version = read_lakehouse_schema_version(&mut tr).await;
88 tr.commit().await?;
89 }
90 assert_eq!(current_version, LATEST_LAKEHOUSE_SCHEMA_VERSION);
91 Ok(())
92}
93
94async fn create_partitions_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
95 tr.execute("
103 CREATE TABLE lakehouse_partitions(
104 view_set_name VARCHAR(255),
105 view_instance_id VARCHAR(255),
106 begin_insert_time TIMESTAMPTZ,
107 end_insert_time TIMESTAMPTZ,
108 min_event_time TIMESTAMPTZ,
109 max_event_time TIMESTAMPTZ,
110 updated TIMESTAMPTZ,
111 file_path VARCHAR(2047),
112 file_size BIGINT,
113 file_schema_hash bytea,
114 source_data_hash bytea
115 );
116 CREATE INDEX lh_part_begin_insert on lakehouse_partitions(view_set_name, view_instance_id, begin_insert_time);
117 CREATE INDEX lh_part_end_insert on lakehouse_partitions(view_set_name, view_instance_id, end_insert_time);
118 CREATE INDEX lh_part_min_time on lakehouse_partitions(view_set_name, view_instance_id, min_event_time);
119 CREATE INDEX lh_part_max_time on lakehouse_partitions(view_set_name, view_instance_id, max_event_time);
120")
121 .await
122 .with_context(|| "Creating table blocks and its indices")?;
123 Ok(())
124}
125
126async fn create_temp_files_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
127 tr.execute(
129 "
130 CREATE TABLE temporary_files(
131 file_path VARCHAR(2047),
132 file_size BIGINT,
133 expiration TIMESTAMPTZ );
134 CREATE INDEX temporary_files_expiration on temporary_files(expiration);
135",
136 )
137 .await
138 .with_context(|| "Creating temporary_files table")?;
139 Ok(())
140}
141
142async fn create_migration_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
143 sqlx::query("CREATE table lakehouse_migration(version integer);")
144 .execute(&mut **tr)
145 .await
146 .with_context(|| "Creating table lakehouse_migration")?;
147 sqlx::query("INSERT INTO lakehouse_migration VALUES(1);")
148 .execute(&mut **tr)
149 .await
150 .with_context(|| "Recording the initial lakehouse schema version")?;
151 Ok(())
152}
153
154async fn create_tables(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
155 create_partitions_table(tr).await?;
156 create_temp_files_table(tr).await?;
157 create_migration_table(tr).await?;
158 Ok(())
159}
160
161async fn upgrade_v1_to_v2(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
162 tr.execute("ALTER TABLE lakehouse_partitions ADD file_metadata bytea;")
165 .await
166 .with_context(|| "adding column file_metadata to lakehouse_partitions table")?;
167 tr.execute("UPDATE lakehouse_migration SET version=2;")
168 .await
169 .with_context(|| "Updating lakehouse schema version to 2")?;
170 Ok(())
171}
172
173async fn upgrade_v2_to_v3(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
174 tr.execute("ALTER TABLE lakehouse_partitions ADD num_rows BIGINT;")
176 .await
177 .with_context(|| "adding column num_rows to lakehouse_partitions table")?;
178
179 tr.execute("CREATE INDEX lakehouse_partitions_file_path ON lakehouse_partitions(file_path);")
181 .await
182 .with_context(|| "creating index on file_path")?;
183
184 populate_num_rows_column(tr)
186 .await
187 .with_context(|| "populating num_rows column")?;
188
189 tr.execute("ALTER TABLE lakehouse_partitions ALTER COLUMN num_rows SET NOT NULL;")
191 .await
192 .with_context(|| "setting num_rows column to NOT NULL")?;
193
194 tr.execute("UPDATE lakehouse_migration SET version=3;")
195 .await
196 .with_context(|| "Updating lakehouse schema version to 3")?;
197 Ok(())
198}
199
200async fn populate_num_rows_column(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
201 info!("populating num_rows column for existing partitions");
202
203 let mut total_count = 0;
204 let batch_size = 1000;
205
206 loop {
207 let rows = sqlx::query("SELECT file_path, file_metadata FROM lakehouse_partitions WHERE file_metadata IS NOT NULL AND num_rows IS NULL LIMIT $1")
209 .bind(batch_size)
210 .fetch_all(&mut **tr)
211 .await?;
212
213 if rows.is_empty() {
214 break;
215 }
216
217 let mut batch_count = 0;
218 for row in rows {
219 let file_path: String = row.try_get("file_path")?;
220 let file_metadata_buffer: Vec<u8> = row.try_get("file_metadata")?;
221
222 match parse_parquet_metadata(&file_metadata_buffer.into()) {
224 Ok(file_metadata) => {
225 let num_rows = file_metadata.file_metadata().num_rows();
226
227 if let Err(e) = sqlx::query(
229 "UPDATE lakehouse_partitions SET num_rows = $1 WHERE file_path = $2",
230 )
231 .bind(num_rows)
232 .bind(&file_path)
233 .execute(&mut **tr)
234 .await
235 {
236 error!(
237 "failed to update num_rows for partition {}: {}",
238 file_path, e
239 );
240 continue;
241 }
242
243 batch_count += 1;
244 }
245 Err(e) => {
246 error!(
247 "failed to parse metadata for partition {}: {}",
248 file_path, e
249 );
250 if let Err(e2) = sqlx::query(
252 "UPDATE lakehouse_partitions SET num_rows = 0 WHERE file_path = $1",
253 )
254 .bind(&file_path)
255 .execute(&mut **tr)
256 .await
257 {
258 error!(
259 "failed to set fallback num_rows for partition {}: {}",
260 file_path, e2
261 );
262 }
263 }
264 }
265 }
266
267 total_count += batch_count;
268 info!(
269 "populated num_rows for {} partitions (total: {})",
270 batch_count, total_count
271 );
272 }
273
274 info!("populated num_rows for {} total partitions", total_count);
275 Ok(())
276}
277
278async fn upgrade_v3_to_v4(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
279 tr.execute(
281 "CREATE TABLE partition_metadata(
282 file_path VARCHAR(2047) PRIMARY KEY,
283 metadata bytea NOT NULL,
284 insert_time TIMESTAMPTZ NOT NULL
285 );",
286 )
287 .await
288 .with_context(|| "creating partition_metadata table")?;
289
290 migrate_metadata_to_new_table(tr)
292 .await
293 .with_context(|| "migrating metadata to partition_metadata table")?;
294
295 tr.execute("ALTER TABLE lakehouse_partitions DROP COLUMN file_metadata;")
297 .await
298 .with_context(|| "dropping file_metadata column from lakehouse_partitions")?;
299
300 tr.execute("UPDATE lakehouse_migration SET version=4;")
301 .await
302 .with_context(|| "Updating lakehouse schema version to 4")?;
303 Ok(())
304}
305
306async fn migrate_metadata_to_new_table(
307 tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
308) -> Result<()> {
309 info!("migrating metadata to partition_metadata table");
310
311 let file_paths: Vec<String> = sqlx::query_scalar(
313 "SELECT file_path
314 FROM lakehouse_partitions
315 WHERE file_metadata IS NOT NULL
316 ORDER BY file_path",
317 )
318 .fetch_all(&mut **tr)
319 .await?;
320
321 let total_to_migrate = file_paths.len();
322 info!(
323 "found {} partitions with metadata to migrate",
324 total_to_migrate
325 );
326
327 let mut total_count = 0;
328 let batch_size = 10; for chunk in file_paths.chunks(batch_size) {
332 let placeholders: Vec<String> = (1..=chunk.len()).map(|i| format!("${}", i)).collect();
334 let query_str = format!(
335 "SELECT file_path, file_metadata, updated
336 FROM lakehouse_partitions
337 WHERE file_path IN ({})",
338 placeholders.join(", ")
339 );
340
341 let mut query = sqlx::query(&query_str);
342 for path in chunk {
343 query = query.bind(path);
344 }
345
346 let rows = query.fetch_all(&mut **tr).await?;
347
348 for row in rows {
349 let file_path: String = row.try_get("file_path")?;
350 let file_metadata: Vec<u8> = row.try_get("file_metadata")?;
351 let updated: chrono::DateTime<chrono::Utc> = row.try_get("updated")?;
352
353 if let Err(e) = sqlx::query(
355 "INSERT INTO partition_metadata (file_path, metadata, insert_time)
356 VALUES ($1, $2, $3)
357 ON CONFLICT (file_path) DO NOTHING",
358 )
359 .bind(&file_path)
360 .bind(&file_metadata)
361 .bind(updated)
362 .execute(&mut **tr)
363 .await
364 {
365 error!(
366 "failed to migrate metadata for partition {}: {}",
367 file_path, e
368 );
369 continue;
370 }
371
372 total_count += 1;
373 }
374
375 if total_count % 100 == 0 || total_count == total_to_migrate {
376 info!(
377 "migrated {}/{} partition metadata entries",
378 total_count, total_to_migrate
379 );
380 }
381 }
382
383 info!("migrated metadata for {} total partitions", total_count);
384 Ok(())
385}
386
387async fn upgrade_v4_to_v5(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
388 tr.execute(
391 "ALTER TABLE lakehouse_partitions
392 ADD COLUMN partition_format_version INTEGER NOT NULL DEFAULT 1;",
393 )
394 .await
395 .with_context(|| "adding partition_format_version to lakehouse_partitions")?;
396 tr.execute(
399 "ALTER TABLE partition_metadata
400 ADD COLUMN partition_format_version INTEGER NOT NULL DEFAULT 1;",
401 )
402 .await
403 .with_context(|| "adding partition_format_version to partition_metadata")?;
404 tr.execute("UPDATE lakehouse_migration SET version=5;")
405 .await
406 .with_context(|| "Updating lakehouse schema version to 5")?;
407 info!("added partition_format_version columns to both tables");
408 Ok(())
409}