micromegas_ingestion/
sql_migration.rs

1use crate::sql_telemetry_db::create_tables;
2use anyhow::{Context, Result};
3use micromegas_tracing::prelude::*;
4use sqlx::Executor;
5use sqlx::Row;
6
/// Schema version the data lake database is expected to be at once
/// [`execute_migration`] has applied every upgrade step.
///
/// Bump this whenever a new `upgrade_data_lake_schema_vN` step is added.
pub const LATEST_DATA_LAKE_SCHEMA_VERSION: i32 = 4;
9
10/// Reads the current schema version from the database.
11pub async fn read_data_lake_schema_version(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> i32 {
12    match sqlx::query(
13        "SELECT version
14         FROM migration;",
15    )
16    .fetch_one(&mut **tr)
17    .await
18    {
19        Ok(row) => row.get("version"),
20        Err(e) => {
21            info!(
22                "Error reading data lake schema version, assuming version 0: {}",
23                e
24            );
25            0
26        }
27    }
28}
29
30/// Upgrades the data lake schema to version 2.
31pub async fn upgrade_data_lake_schema_v2(
32    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
33) -> Result<()> {
34    tr.execute("ALTER TABLE blocks ADD insert_time TIMESTAMPTZ;")
35        .await
36        .with_context(|| "adding column insert_time to blocks table")?;
37    tr.execute("UPDATE blocks SET insert_time=end_time WHERE insert_time is NULL;")
38        .await
39        .with_context(|| "use end_time as insert_time to backfill missing data")?;
40    tr.execute("CREATE INDEX block_begin_time on blocks(begin_time);")
41        .await
42        .with_context(|| "adding index block_begin_time")?;
43    tr.execute("CREATE INDEX block_end_time on blocks(end_time);")
44        .await
45        .with_context(|| "adding index block_end_time")?;
46    tr.execute("CREATE INDEX block_insert_time on blocks(insert_time);")
47        .await
48        .with_context(|| "adding index block_insert_time")?;
49    tr.execute("CREATE INDEX process_insert_time on processes(insert_time);")
50        .await
51        .with_context(|| "adding index process_insert_time")?;
52    tr.execute("UPDATE migration SET version=2;")
53        .await
54        .with_context(|| "Updating data lake schema version to 2")?;
55    Ok(())
56}
57
58/// Upgrades the data lake schema to version 3.
59/// Drops old non-unique indexes (superseded by the unique indexes created before this transaction).
60pub async fn upgrade_data_lake_schema_v3(
61    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
62) -> Result<()> {
63    tr.execute("DROP INDEX IF EXISTS process_id;")
64        .await
65        .with_context(|| "dropping old non-unique index process_id")?;
66    tr.execute("DROP INDEX IF EXISTS stream_id;")
67        .await
68        .with_context(|| "dropping old non-unique index stream_id")?;
69    tr.execute("DROP INDEX IF EXISTS block_id;")
70        .await
71        .with_context(|| "dropping old non-unique index block_id")?;
72    tr.execute("UPDATE migration SET version=3;")
73        .await
74        .with_context(|| "updating data lake schema version to 3")?;
75    Ok(())
76}
77
78/// Upgrades the data lake schema to version 4.
79/// Adds the `format` column to `streams` so OTLP and native blocks can be distinguished.
80pub async fn upgrade_data_lake_schema_v4(
81    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
82) -> Result<()> {
83    tr.execute("ALTER TABLE streams ADD COLUMN format TEXT NOT NULL DEFAULT 'micromegas-transit';")
84        .await
85        .with_context(|| "adding column format to streams table")?;
86    tr.execute("UPDATE migration SET version=4;")
87        .await
88        .with_context(|| "updating data lake schema version to 4")?;
89    Ok(())
90}
91
92/// Checks whether a specific index is valid in `pg_index`.
93/// If the index is invalid, drops it and returns `Ok(false)`.
94/// If valid, returns `Ok(true)`.
95/// Returns an error if the index does not exist.
96async fn check_index_is_valid(pool: &sqlx::Pool<sqlx::Postgres>, index_name: &str) -> Result<bool> {
97    let row = sqlx::query(
98        "SELECT i.indisvalid
99         FROM pg_class c
100         JOIN pg_index i ON i.indexrelid = c.oid
101         WHERE c.relname = $1;",
102    )
103    .bind(index_name)
104    .fetch_optional(pool)
105    .await
106    .with_context(|| format!("querying pg_index for {index_name}"))?;
107
108    let row = row.with_context(|| format!("index {index_name} not found in pg_class"))?;
109    let is_valid: bool = row.get("indisvalid");
110
111    if !is_valid {
112        info!("index {index_name} is INVALID, dropping it");
113        sqlx::query(&format!("DROP INDEX IF EXISTS {index_name}"))
114            .execute(pool)
115            .await
116            .with_context(|| format!("dropping invalid index {index_name}"))?;
117        return Ok(false);
118    }
119
120    Ok(true)
121}
122
123/// Validates that all three unique indexes created during the v2→v3 migration are valid.
124/// Drops any invalid indexes and returns an error so the migration can be retried.
125async fn validate_unique_indexes(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<()> {
126    let index_names = [
127        "processes_process_id_unique",
128        "streams_stream_id_unique",
129        "blocks_block_id_unique",
130    ];
131
132    let mut invalid_indexes = Vec::new();
133    for name in &index_names {
134        if !check_index_is_valid(pool, name).await? {
135            invalid_indexes.push(*name);
136        }
137    }
138
139    if !invalid_indexes.is_empty() {
140        anyhow::bail!(
141            "invalid indexes detected and dropped: {}. The migration will be retried on next startup.",
142            invalid_indexes.join(", ")
143        );
144    }
145
146    Ok(())
147}
148
149/// Executes the database migration.
150pub async fn execute_migration(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
151    let mut current_version = read_data_lake_schema_version(&mut pool.begin().await?).await;
152    if 0 == current_version {
153        info!("creating v1 data_lake_schema");
154        let mut tr = pool.begin().await?;
155        create_tables(&mut tr).await?;
156        current_version = read_data_lake_schema_version(&mut tr).await;
157        tr.commit().await?;
158    }
159    if 1 == current_version {
160        info!("upgrading data_lake_schema to v2");
161        let mut tr = pool.begin().await?;
162        upgrade_data_lake_schema_v2(&mut tr).await?;
163        current_version = read_data_lake_schema_version(&mut tr).await;
164        tr.commit().await?;
165    }
166    if 2 == current_version {
167        info!("upgrading data_lake_schema to v3");
168        // CREATE UNIQUE INDEX CONCURRENTLY cannot run inside a transaction.
169        // Run these outside any transaction, then do the rest in a transaction.
170        // IF NOT EXISTS makes this idempotent and safe for retries.
171        sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS processes_process_id_unique ON processes(process_id);")
172            .execute(&pool)
173            .await
174            .with_context(|| "creating unique index on processes(process_id)")?;
175        sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS streams_stream_id_unique ON streams(stream_id);")
176            .execute(&pool)
177            .await
178            .with_context(|| "creating unique index on streams(stream_id)")?;
179        sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS blocks_block_id_unique ON blocks(block_id);")
180            .execute(&pool)
181            .await
182            .with_context(|| "creating unique index on blocks(block_id)")?;
183
184        validate_unique_indexes(&pool).await?;
185
186        let mut tr = pool.begin().await?;
187        upgrade_data_lake_schema_v3(&mut tr).await?;
188        current_version = read_data_lake_schema_version(&mut tr).await;
189        tr.commit().await?;
190    }
191    if 3 == current_version {
192        info!("upgrading data_lake_schema to v4");
193        let mut tr = pool.begin().await?;
194        upgrade_data_lake_schema_v4(&mut tr).await?;
195        current_version = read_data_lake_schema_version(&mut tr).await;
196        tr.commit().await?;
197    }
198    assert_eq!(current_version, LATEST_DATA_LAKE_SCHEMA_VERSION);
199    Ok(())
200}