//! micromegas_ingestion/sql_migration.rs — data lake schema migration.

1use crate::sql_telemetry_db::create_tables;
2use anyhow::{Context, Result};
3use micromegas_tracing::prelude::*;
4use sqlx::Executor;
5use sqlx::Row;
6
/// The latest schema version for the data lake.
///
/// Bump this constant together with a new `upgrade_data_lake_schema_vN`
/// step wired into `execute_migration`; the final check in
/// `execute_migration` compares the stored version against this value.
pub const LATEST_DATA_LAKE_SCHEMA_VERSION: i32 = 3;
9
10/// Reads the current schema version from the database.
11pub async fn read_data_lake_schema_version(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> i32 {
12    match sqlx::query(
13        "SELECT version
14         FROM migration;",
15    )
16    .fetch_one(&mut **tr)
17    .await
18    {
19        Ok(row) => row.get("version"),
20        Err(e) => {
21            info!(
22                "Error reading data lake schema version, assuming version 0: {}",
23                e
24            );
25            0
26        }
27    }
28}
29
30/// Upgrades the data lake schema to version 2.
31pub async fn upgrade_data_lake_schema_v2(
32    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
33) -> Result<()> {
34    tr.execute("ALTER TABLE blocks ADD insert_time TIMESTAMPTZ;")
35        .await
36        .with_context(|| "adding column insert_time to blocks table")?;
37    tr.execute("UPDATE blocks SET insert_time=end_time WHERE insert_time is NULL;")
38        .await
39        .with_context(|| "use end_time as insert_time to backfill missing data")?;
40    tr.execute("CREATE INDEX block_begin_time on blocks(begin_time);")
41        .await
42        .with_context(|| "adding index block_begin_time")?;
43    tr.execute("CREATE INDEX block_end_time on blocks(end_time);")
44        .await
45        .with_context(|| "adding index block_end_time")?;
46    tr.execute("CREATE INDEX block_insert_time on blocks(insert_time);")
47        .await
48        .with_context(|| "adding index block_insert_time")?;
49    tr.execute("CREATE INDEX process_insert_time on processes(insert_time);")
50        .await
51        .with_context(|| "adding index process_insert_time")?;
52    tr.execute("UPDATE migration SET version=2;")
53        .await
54        .with_context(|| "Updating data lake schema version to 2")?;
55    Ok(())
56}
57
58/// Upgrades the data lake schema to version 3.
59/// Drops old non-unique indexes (superseded by the unique indexes created before this transaction).
60pub async fn upgrade_data_lake_schema_v3(
61    tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
62) -> Result<()> {
63    tr.execute("DROP INDEX IF EXISTS process_id;")
64        .await
65        .with_context(|| "dropping old non-unique index process_id")?;
66    tr.execute("DROP INDEX IF EXISTS stream_id;")
67        .await
68        .with_context(|| "dropping old non-unique index stream_id")?;
69    tr.execute("DROP INDEX IF EXISTS block_id;")
70        .await
71        .with_context(|| "dropping old non-unique index block_id")?;
72    tr.execute("UPDATE migration SET version=3;")
73        .await
74        .with_context(|| "updating data lake schema version to 3")?;
75    Ok(())
76}
77
78/// Checks whether a specific index is valid in `pg_index`.
79/// If the index is invalid, drops it and returns `Ok(false)`.
80/// If valid, returns `Ok(true)`.
81/// Returns an error if the index does not exist.
82async fn check_index_is_valid(pool: &sqlx::Pool<sqlx::Postgres>, index_name: &str) -> Result<bool> {
83    let row = sqlx::query(
84        "SELECT i.indisvalid
85         FROM pg_class c
86         JOIN pg_index i ON i.indexrelid = c.oid
87         WHERE c.relname = $1;",
88    )
89    .bind(index_name)
90    .fetch_optional(pool)
91    .await
92    .with_context(|| format!("querying pg_index for {index_name}"))?;
93
94    let row = row.with_context(|| format!("index {index_name} not found in pg_class"))?;
95    let is_valid: bool = row.get("indisvalid");
96
97    if !is_valid {
98        info!("index {index_name} is INVALID, dropping it");
99        sqlx::query(&format!("DROP INDEX IF EXISTS {index_name}"))
100            .execute(pool)
101            .await
102            .with_context(|| format!("dropping invalid index {index_name}"))?;
103        return Ok(false);
104    }
105
106    Ok(true)
107}
108
109/// Validates that all three unique indexes created during the v2→v3 migration are valid.
110/// Drops any invalid indexes and returns an error so the migration can be retried.
111async fn validate_unique_indexes(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<()> {
112    let index_names = [
113        "processes_process_id_unique",
114        "streams_stream_id_unique",
115        "blocks_block_id_unique",
116    ];
117
118    let mut invalid_indexes = Vec::new();
119    for name in &index_names {
120        if !check_index_is_valid(pool, name).await? {
121            invalid_indexes.push(*name);
122        }
123    }
124
125    if !invalid_indexes.is_empty() {
126        anyhow::bail!(
127            "invalid indexes detected and dropped: {}. The migration will be retried on next startup.",
128            invalid_indexes.join(", ")
129        );
130    }
131
132    Ok(())
133}
134
135/// Executes the database migration.
136pub async fn execute_migration(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
137    let mut current_version = read_data_lake_schema_version(&mut pool.begin().await?).await;
138    if 0 == current_version {
139        info!("creating v1 data_lake_schema");
140        let mut tr = pool.begin().await?;
141        create_tables(&mut tr).await?;
142        current_version = read_data_lake_schema_version(&mut tr).await;
143        tr.commit().await?;
144    }
145    if 1 == current_version {
146        info!("upgrading data_lake_schema to v2");
147        let mut tr = pool.begin().await?;
148        upgrade_data_lake_schema_v2(&mut tr).await?;
149        current_version = read_data_lake_schema_version(&mut tr).await;
150        tr.commit().await?;
151    }
152    if 2 == current_version {
153        info!("upgrading data_lake_schema to v3");
154        // CREATE UNIQUE INDEX CONCURRENTLY cannot run inside a transaction.
155        // Run these outside any transaction, then do the rest in a transaction.
156        // IF NOT EXISTS makes this idempotent and safe for retries.
157        sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS processes_process_id_unique ON processes(process_id);")
158            .execute(&pool)
159            .await
160            .with_context(|| "creating unique index on processes(process_id)")?;
161        sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS streams_stream_id_unique ON streams(stream_id);")
162            .execute(&pool)
163            .await
164            .with_context(|| "creating unique index on streams(stream_id)")?;
165        sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS blocks_block_id_unique ON blocks(block_id);")
166            .execute(&pool)
167            .await
168            .with_context(|| "creating unique index on blocks(block_id)")?;
169
170        validate_unique_indexes(&pool).await?;
171
172        let mut tr = pool.begin().await?;
173        upgrade_data_lake_schema_v3(&mut tr).await?;
174        current_version = read_data_lake_schema_version(&mut tr).await;
175        tr.commit().await?;
176    }
177    assert_eq!(current_version, LATEST_DATA_LAKE_SCHEMA_VERSION);
178    Ok(())
179}