// micromegas_analytics/lakehouse/static_tables_configurator.rs

use anyhow::{Context, Result};
use datafusion::catalog::TableProvider;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use futures::StreamExt;
use micromegas_tracing::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;

use super::session_configurator::{NoOpSessionConfigurator, SessionConfigurator};
use crate::dfext::csv_table_provider::csv_table_provider;
use crate::dfext::json_table_provider::json_table_provider;

13/// A SessionConfigurator that auto-discovers JSON and CSV files under an object store URL
14/// and registers each as a queryable DataFusion table.
15///
16/// Table names are derived from the filename stem (e.g., `event_schemas.json` → `event_schemas`).
17///
18/// # Example
19///
20/// ```rust,no_run
21/// use anyhow::Result;
22/// use datafusion::execution::context::SessionContext;
23/// use micromegas_analytics::lakehouse::static_tables_configurator::StaticTablesConfigurator;
24/// use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
25///
26/// #[tokio::main]
27/// async fn main() -> Result<()> {
28///     let ctx = SessionContext::new();
29///     let configurator = StaticTablesConfigurator::new(&ctx, "file:///data/tables/").await?;
30///     // Later, configure a session:
31///     configurator.configure(&ctx).await?;
32///     Ok(())
33/// }
34/// ```
35#[derive(Debug)]
36pub struct StaticTablesConfigurator {
37    tables: Vec<(String, Arc<dyn TableProvider>)>,
38}
39
40impl StaticTablesConfigurator {
41    /// Load static tables from the URL in the given environment variable.
42    ///
43    /// Returns `NoOpSessionConfigurator` when the variable is unset.
44    /// Errors if the variable is set but loading fails (preserves fail-fast behavior).
45    pub async fn from_env(
46        env_var: &str,
47        runtime: Arc<RuntimeEnv>,
48    ) -> Result<Arc<dyn SessionConfigurator>> {
49        let url = match std::env::var(env_var) {
50            Ok(url) => url,
51            Err(_) => {
52                warn!("{env_var} not set, static tables will not be available");
53                return Ok(Arc::new(NoOpSessionConfigurator));
54            }
55        };
56        let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
57        let configurator = Self::new(&ctx, &url)
58            .await
59            .with_context(|| format!("loading static tables from {url}"))?;
60        Ok(Arc::new(configurator))
61    }
62
63    /// Discovers JSON and CSV files under the given URL and creates table providers for each.
64    ///
65    /// Files with `.json` extensions are loaded as JSON tables.
66    /// Files with `.csv` extensions are loaded as CSV tables.
67    /// Other extensions are skipped with a warning.
68    ///
69    /// Errors loading individual files are logged but do not prevent other files from loading.
70    pub async fn new(ctx: &SessionContext, url: &str) -> Result<Self> {
71        let parsed_url = url::Url::parse(url)?;
72        let (object_store, prefix) = object_store::parse_url_opts(
73            &parsed_url,
74            std::env::vars().map(|(k, v)| (k.to_lowercase(), v)),
75        )?;
76        let object_store = Arc::new(object_store);
77
78        // Register the object store so table providers can access it
79        ctx.register_object_store(&parsed_url, object_store.clone());
80
81        let mut tables = Vec::new();
82        let mut list_stream = object_store.list(Some(&prefix));
83
84        while let Some(result) = list_stream.next().await {
85            match result {
86                Ok(meta) => {
87                    let path_str = meta.location.to_string();
88                    let file_url = format!(
89                        "{scheme}://{authority}/{path}",
90                        scheme = parsed_url.scheme(),
91                        authority = parsed_url.authority(),
92                        path = path_str,
93                    );
94
95                    let file_name = meta.location.filename().unwrap_or_default();
96
97                    let (stem, ext) = match file_name.rsplit_once('.') {
98                        Some((s, e)) => (s, e.to_lowercase()),
99                        None => {
100                            debug!("skipping file without extension: {path_str}");
101                            continue;
102                        }
103                    };
104
105                    if stem.is_empty() {
106                        warn!("skipping file with empty stem: {path_str}");
107                        continue;
108                    }
109
110                    let table_name = stem.to_string();
111
112                    let provider_result = match ext.as_str() {
113                        "json" => json_table_provider(ctx, &file_url).await,
114                        "csv" => csv_table_provider(ctx, &file_url).await,
115                        _ => {
116                            warn!("skipping file with unsupported extension: {path_str}");
117                            continue;
118                        }
119                    };
120
121                    match provider_result {
122                        Ok(provider) => {
123                            // Check for table name collisions with already-registered tables
124                            if ctx.table_provider(&table_name).await.is_ok() {
125                                warn!(
126                                    "skipping static table '{table_name}': name already registered"
127                                );
128                                continue;
129                            }
130                            info!("discovered static table: {table_name} from {path_str}");
131                            tables.push((table_name, provider));
132                        }
133                        Err(e) => {
134                            warn!("failed to load static table from {path_str}: {e}");
135                        }
136                    }
137                }
138                Err(e) => {
139                    warn!("error listing files under {url}: {e}");
140                }
141            }
142        }
143
144        info!(
145            "static tables configurator discovered {} tables from {url}",
146            tables.len()
147        );
148        Ok(Self { tables })
149    }
150}
151
152#[async_trait::async_trait]
153impl SessionConfigurator for StaticTablesConfigurator {
154    async fn configure(&self, ctx: &SessionContext) -> Result<()> {
155        for (name, provider) in &self.tables {
156            if let Err(e) = ctx.register_table(name, provider.clone()) {
157                warn!("failed to register static table '{name}': {e}");
158            }
159        }
160        Ok(())
161    }
162}