micromegas_ingestion/
remote_data_lake.rs

1use crate::data_lake_connection::DataLakeConnection;
2use crate::sql_migration::LATEST_DATA_LAKE_SCHEMA_VERSION;
3use crate::sql_migration::execute_migration;
4use crate::sql_migration::read_data_lake_schema_version;
5use anyhow::{Context, Result};
6use micromegas_telemetry::blob_storage::BlobStorage;
7use micromegas_tracing::prelude::*;
8use std::sync::Arc;
9
10/// Acquires a lock on the database to prevent concurrent migrations.
11pub async fn acquire_lock(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>, key: i64) -> Result<()> {
12    sqlx::query("SELECT pg_advisory_xact_lock($1)")
13        .bind(key)
14        .execute(&mut **tr)
15        .await?;
16    Ok(())
17}
18
19/// Migrates the database to the latest schema version.
20pub async fn migrate_db(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
21    let mut tr = pool.begin().await?;
22    let mut current_version = read_data_lake_schema_version(&mut tr).await;
23    drop(tr);
24    info!("current data lake schema: {}", current_version);
25    if current_version != LATEST_DATA_LAKE_SCHEMA_VERSION {
26        let mut tr = pool.begin().await?;
27        acquire_lock(&mut tr, 0).await?;
28        current_version = read_data_lake_schema_version(&mut pool.begin().await?).await;
29        if LATEST_DATA_LAKE_SCHEMA_VERSION == current_version {
30            return Ok(());
31        }
32        if let Err(e) = execute_migration(pool.clone()).await {
33            error!("Error migrating database: {}", e);
34            return Err(e);
35        }
36        current_version = read_data_lake_schema_version(&mut tr).await;
37    }
38    assert_eq!(current_version, LATEST_DATA_LAKE_SCHEMA_VERSION);
39    Ok(())
40}
41
42/// Connects to the remote data lake, migrating the database if necessary.
43pub async fn connect_to_remote_data_lake(
44    db_uri: &str,
45    object_store_url: &str,
46) -> Result<DataLakeConnection> {
47    info!("connecting to blob storage");
48    let blob_storage = Arc::new(
49        BlobStorage::connect(object_store_url).with_context(|| "connecting to blob storage")?,
50    );
51    let pool = sqlx::postgres::PgPoolOptions::new()
52        .connect(db_uri)
53        .await
54        .with_context(|| String::from("Connecting to telemetry database"))?;
55    migrate_db(pool.clone()).await?;
56    Ok(DataLakeConnection::new(pool, blob_storage))
57}