micromegas_analytics/dfext/csv_table_provider.rs

1use anyhow::Result;
2use datafusion::catalog::TableProvider;
3use datafusion::datasource::file_format::csv::CsvFormat;
4use datafusion::datasource::listing::{
5    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
6};
7use datafusion::execution::context::SessionContext;
8use std::sync::Arc;
9
10use super::json_table_provider::verify_files_exist;
11
12/// Creates a TableProvider for a CSV file with pre-computed schema
13///
14/// This function infers the schema once and returns a TableProvider that can be
15/// registered in multiple SessionContexts without re-inferring the schema.
16///
17/// Assumes CSV files have a header row and use comma delimiters.
18///
19/// # Arguments
20///
21/// * `ctx` - A SessionContext used for schema inference and object store access
22/// * `url` - URL to the CSV file (e.g., "file:///path/to/data.csv" or "s3://bucket/data.csv")
23///
24/// # Returns
25///
26/// Returns an `Arc<dyn TableProvider>` that can be registered using
27/// `SessionContext::register_table()`.
28pub async fn csv_table_provider(ctx: &SessionContext, url: &str) -> Result<Arc<dyn TableProvider>> {
29    let file_format = Arc::new(CsvFormat::default());
30    let listing_options = ListingOptions::new(file_format);
31    let table_url = ListingTableUrl::parse(url)?;
32
33    // Verify that files exist at the specified URL
34    let object_store = ctx.state().runtime_env().object_store(&table_url)?;
35    verify_files_exist(&object_store, table_url.prefix(), url).await?;
36
37    let mut config = ListingTableConfig::new(table_url).with_listing_options(listing_options);
38    config = config.infer_schema(&ctx.state()).await?;
39    let listing_table = ListingTable::try_new(config)?;
40    Ok(Arc::new(listing_table))
41}