micromegas_analytics/lakehouse/
static_tables_configurator.rs1use anyhow::{Context, Result};
2use datafusion::catalog::TableProvider;
3use datafusion::execution::context::{SessionConfig, SessionContext};
4use datafusion::execution::runtime_env::RuntimeEnv;
5use futures::StreamExt;
6use micromegas_tracing::prelude::*;
7use std::sync::Arc;
8
9use super::session_configurator::{NoOpSessionConfigurator, SessionConfigurator};
10use crate::dfext::csv_table_provider::csv_table_provider;
11use crate::dfext::json_table_provider::json_table_provider;
12
13#[derive(Debug)]
36pub struct StaticTablesConfigurator {
37 tables: Vec<(String, Arc<dyn TableProvider>)>,
38}
39
40impl StaticTablesConfigurator {
41 pub async fn from_env(
46 env_var: &str,
47 runtime: Arc<RuntimeEnv>,
48 ) -> Result<Arc<dyn SessionConfigurator>> {
49 let url = match std::env::var(env_var) {
50 Ok(url) => url,
51 Err(_) => {
52 warn!("{env_var} not set, static tables will not be available");
53 return Ok(Arc::new(NoOpSessionConfigurator));
54 }
55 };
56 let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
57 let configurator = Self::new(&ctx, &url)
58 .await
59 .with_context(|| format!("loading static tables from {url}"))?;
60 Ok(Arc::new(configurator))
61 }
62
63 pub async fn new(ctx: &SessionContext, url: &str) -> Result<Self> {
71 let parsed_url = url::Url::parse(url)?;
72 let (object_store, prefix) = object_store::parse_url_opts(
73 &parsed_url,
74 std::env::vars().map(|(k, v)| (k.to_lowercase(), v)),
75 )?;
76 let object_store = Arc::new(object_store);
77
78 ctx.register_object_store(&parsed_url, object_store.clone());
80
81 let mut tables = Vec::new();
82 let mut list_stream = object_store.list(Some(&prefix));
83
84 while let Some(result) = list_stream.next().await {
85 match result {
86 Ok(meta) => {
87 let path_str = meta.location.to_string();
88 let file_url = format!(
89 "{scheme}://{authority}/{path}",
90 scheme = parsed_url.scheme(),
91 authority = parsed_url.authority(),
92 path = path_str,
93 );
94
95 let file_name = meta.location.filename().unwrap_or_default();
96
97 let (stem, ext) = match file_name.rsplit_once('.') {
98 Some((s, e)) => (s, e.to_lowercase()),
99 None => {
100 debug!("skipping file without extension: {path_str}");
101 continue;
102 }
103 };
104
105 if stem.is_empty() {
106 warn!("skipping file with empty stem: {path_str}");
107 continue;
108 }
109
110 let table_name = stem.to_string();
111
112 let provider_result = match ext.as_str() {
113 "json" => json_table_provider(ctx, &file_url).await,
114 "csv" => csv_table_provider(ctx, &file_url).await,
115 _ => {
116 warn!("skipping file with unsupported extension: {path_str}");
117 continue;
118 }
119 };
120
121 match provider_result {
122 Ok(provider) => {
123 if ctx.table_provider(&table_name).await.is_ok() {
125 warn!(
126 "skipping static table '{table_name}': name already registered"
127 );
128 continue;
129 }
130 info!("discovered static table: {table_name} from {path_str}");
131 tables.push((table_name, provider));
132 }
133 Err(e) => {
134 warn!("failed to load static table from {path_str}: {e}");
135 }
136 }
137 }
138 Err(e) => {
139 warn!("error listing files under {url}: {e}");
140 }
141 }
142 }
143
144 info!(
145 "static tables configurator discovered {} tables from {url}",
146 tables.len()
147 );
148 Ok(Self { tables })
149 }
150}
151
152#[async_trait::async_trait]
153impl SessionConfigurator for StaticTablesConfigurator {
154 async fn configure(&self, ctx: &SessionContext) -> Result<()> {
155 for (name, provider) in &self.tables {
156 if let Err(e) = ctx.register_table(name, provider.clone()) {
157 warn!("failed to register static table '{name}': {e}");
158 }
159 }
160 Ok(())
161 }
162}