micromegas_ingestion/
remote_data_lake.rs1use crate::data_lake_connection::DataLakeConnection;
2use crate::sql_migration::LATEST_DATA_LAKE_SCHEMA_VERSION;
3use crate::sql_migration::execute_migration;
4use crate::sql_migration::read_data_lake_schema_version;
5use anyhow::{Context, Result};
6use micromegas_telemetry::blob_storage::BlobStorage;
7use micromegas_tracing::prelude::*;
8use std::sync::Arc;
9
10pub async fn acquire_lock(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>, key: i64) -> Result<()> {
12 sqlx::query("SELECT pg_advisory_xact_lock($1)")
13 .bind(key)
14 .execute(&mut **tr)
15 .await?;
16 Ok(())
17}
18
19pub async fn migrate_db(pool: sqlx::Pool<sqlx::Postgres>) -> Result<()> {
21 let mut tr = pool.begin().await?;
22 let mut current_version = read_data_lake_schema_version(&mut tr).await;
23 drop(tr);
24 info!("current data lake schema: {}", current_version);
25 if current_version != LATEST_DATA_LAKE_SCHEMA_VERSION {
26 let mut tr = pool.begin().await?;
27 acquire_lock(&mut tr, 0).await?;
28 current_version = read_data_lake_schema_version(&mut pool.begin().await?).await;
29 if LATEST_DATA_LAKE_SCHEMA_VERSION == current_version {
30 return Ok(());
31 }
32 if let Err(e) = execute_migration(pool.clone()).await {
33 error!("Error migrating database: {}", e);
34 return Err(e);
35 }
36 current_version = read_data_lake_schema_version(&mut tr).await;
37 }
38 assert_eq!(current_version, LATEST_DATA_LAKE_SCHEMA_VERSION);
39 Ok(())
40}
41
42pub async fn connect_to_remote_data_lake(
44 db_uri: &str,
45 object_store_url: &str,
46) -> Result<DataLakeConnection> {
47 info!("connecting to blob storage");
48 let blob_storage = Arc::new(
49 BlobStorage::connect(object_store_url).with_context(|| "connecting to blob storage")?,
50 );
51 let pool = sqlx::postgres::PgPoolOptions::new()
52 .connect(db_uri)
53 .await
54 .with_context(|| String::from("Connecting to telemetry database"))?;
55 migrate_db(pool.clone()).await?;
56 Ok(DataLakeConnection::new(pool, blob_storage))
57}