micromegas_analytics/dfext/
csv_table_provider.rs1use anyhow::Result;
2use datafusion::catalog::TableProvider;
3use datafusion::datasource::file_format::csv::CsvFormat;
4use datafusion::datasource::listing::{
5 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
6};
7use datafusion::execution::context::SessionContext;
8use std::sync::Arc;
9
10use super::json_table_provider::verify_files_exist;
11
12pub async fn csv_table_provider(ctx: &SessionContext, url: &str) -> Result<Arc<dyn TableProvider>> {
29 let file_format = Arc::new(CsvFormat::default());
30 let listing_options = ListingOptions::new(file_format);
31 let table_url = ListingTableUrl::parse(url)?;
32
33 let object_store = ctx.state().runtime_env().object_store(&table_url)?;
35 verify_files_exist(&object_store, table_url.prefix(), url).await?;
36
37 let mut config = ListingTableConfig::new(table_url).with_listing_options(listing_options);
38 config = config.infer_schema(&ctx.state()).await?;
39 let listing_table = ListingTable::try_new(config)?;
40 Ok(Arc::new(listing_table))
41}