// micromegas_ingestion/sql_migration.rs
use crate::sql_telemetry_db::create_tables;
2use anyhow::{Context, Result};
3use micromegas_tracing::prelude::*;
4use sqlx::Executor;
5use sqlx::Row;
6
/// Schema version this build expects; `execute_migration` upgrades the data
/// lake step by step and verifies it reaches exactly this version.
pub const LATEST_DATA_LAKE_SCHEMA_VERSION: i32 = 3;
9
10pub async fn read_data_lake_schema_version(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> i32 {
12 match sqlx::query(
13 "SELECT version
14 FROM migration;",
15 )
16 .fetch_one(&mut **tr)
17 .await
18 {
19 Ok(row) => row.get("version"),
20 Err(e) => {
21 info!(
22 "Error reading data lake schema version, assuming version 0: {}",
23 e
24 );
25 0
26 }
27 }
28}
29
30pub async fn upgrade_data_lake_schema_v2(
32 tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
33) -> Result<()> {
34 tr.execute("ALTER TABLE blocks ADD insert_time TIMESTAMPTZ;")
35 .await
36 .with_context(|| "adding column insert_time to blocks table")?;
37 tr.execute("UPDATE blocks SET insert_time=end_time WHERE insert_time is NULL;")
38 .await
39 .with_context(|| "use end_time as insert_time to backfill missing data")?;
40 tr.execute("CREATE INDEX block_begin_time on blocks(begin_time);")
41 .await
42 .with_context(|| "adding index block_begin_time")?;
43 tr.execute("CREATE INDEX block_end_time on blocks(end_time);")
44 .await
45 .with_context(|| "adding index block_end_time")?;
46 tr.execute("CREATE INDEX block_insert_time on blocks(insert_time);")
47 .await
48 .with_context(|| "adding index block_insert_time")?;
49 tr.execute("CREATE INDEX process_insert_time on processes(insert_time);")
50 .await
51 .with_context(|| "adding index process_insert_time")?;
52 tr.execute("UPDATE migration SET version=2;")
53 .await
54 .with_context(|| "Updating data lake schema version to 2")?;
55 Ok(())
56}
57
58pub async fn upgrade_data_lake_schema_v3(
61 tr: &mut sqlx::Transaction<'_, sqlx::Postgres>,
62) -> Result<()> {
63 tr.execute("DROP INDEX IF EXISTS process_id;")
64 .await
65 .with_context(|| "dropping old non-unique index process_id")?;
66 tr.execute("DROP INDEX IF EXISTS stream_id;")
67 .await
68 .with_context(|| "dropping old non-unique index stream_id")?;
69 tr.execute("DROP INDEX IF EXISTS block_id;")
70 .await
71 .with_context(|| "dropping old non-unique index block_id")?;
72 tr.execute("UPDATE migration SET version=3;")
73 .await
74 .with_context(|| "updating data lake schema version to 3")?;
75 Ok(())
76}
77
78async fn check_index_is_valid(pool: &sqlx::Pool<sqlx::Postgres>, index_name: &str) -> Result<bool> {
83 let row = sqlx::query(
84 "SELECT i.indisvalid
85 FROM pg_class c
86 JOIN pg_index i ON i.indexrelid = c.oid
87 WHERE c.relname = $1;",
88 )
89 .bind(index_name)
90 .fetch_optional(pool)
91 .await
92 .with_context(|| format!("querying pg_index for {index_name}"))?;
93
94 let row = row.with_context(|| format!("index {index_name} not found in pg_class"))?;
95 let is_valid: bool = row.get("indisvalid");
96
97 if !is_valid {
98 info!("index {index_name} is INVALID, dropping it");
99 sqlx::query(&format!("DROP INDEX IF EXISTS {index_name}"))
100 .execute(pool)
101 .await
102 .with_context(|| format!("dropping invalid index {index_name}"))?;
103 return Ok(false);
104 }
105
106 Ok(true)
107}
108
109async fn validate_unique_indexes(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<()> {
112 let index_names = [
113 "processes_process_id_unique",
114 "streams_stream_id_unique",
115 "blocks_block_id_unique",
116 ];
117
118 let mut invalid_indexes = Vec::new();
119 for name in &index_names {
120 if !check_index_is_valid(pool, name).await? {
121 invalid_indexes.push(*name);
122 }
123 }
124
125 if !invalid_indexes.is_empty() {
126 anyhow::bail!(
127 "invalid indexes detected and dropped: {}. The migration will be retried on next startup.",
128 invalid_indexes.join(", ")
129 );
130 }
131
132 Ok(())
133}
134
135pub async fn execute_migration(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
137 let mut current_version = read_data_lake_schema_version(&mut pool.begin().await?).await;
138 if 0 == current_version {
139 info!("creating v1 data_lake_schema");
140 let mut tr = pool.begin().await?;
141 create_tables(&mut tr).await?;
142 current_version = read_data_lake_schema_version(&mut tr).await;
143 tr.commit().await?;
144 }
145 if 1 == current_version {
146 info!("upgrading data_lake_schema to v2");
147 let mut tr = pool.begin().await?;
148 upgrade_data_lake_schema_v2(&mut tr).await?;
149 current_version = read_data_lake_schema_version(&mut tr).await;
150 tr.commit().await?;
151 }
152 if 2 == current_version {
153 info!("upgrading data_lake_schema to v3");
154 sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS processes_process_id_unique ON processes(process_id);")
158 .execute(&pool)
159 .await
160 .with_context(|| "creating unique index on processes(process_id)")?;
161 sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS streams_stream_id_unique ON streams(stream_id);")
162 .execute(&pool)
163 .await
164 .with_context(|| "creating unique index on streams(stream_id)")?;
165 sqlx::query("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS blocks_block_id_unique ON blocks(block_id);")
166 .execute(&pool)
167 .await
168 .with_context(|| "creating unique index on blocks(block_id)")?;
169
170 validate_unique_indexes(&pool).await?;
171
172 let mut tr = pool.begin().await?;
173 upgrade_data_lake_schema_v3(&mut tr).await?;
174 current_version = read_data_lake_schema_version(&mut tr).await;
175 tr.commit().await?;
176 }
177 assert_eq!(current_version, LATEST_DATA_LAKE_SCHEMA_VERSION);
178 Ok(())
179}