micromegas_analytics/dfext/json_table_provider.rs
1use anyhow::Result;
2use datafusion::catalog::TableProvider;
3use datafusion::datasource::file_format::json::JsonFormat;
4use datafusion::datasource::listing::{
5 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
6};
7use datafusion::execution::context::SessionContext;
8use futures::StreamExt;
9use object_store::ObjectStore;
10use std::sync::Arc;
11
12/// Verifies that files exist at the specified URL
13///
14/// This function checks if files exist by first attempting to get metadata using
15/// `head()`, and if that fails (e.g., for directory patterns), it falls back to
16/// listing files at the prefix.
17///
18/// # Arguments
19///
20/// * `object_store` - The object store to query
21/// * `prefix` - The path/prefix to check for files
22/// * `url` - The original URL (used for error messages)
23///
24/// # Returns
25///
26/// Returns `Ok(())` if files exist, or an error if no files are found.
27pub(crate) async fn verify_files_exist(
28 object_store: &Arc<dyn ObjectStore>,
29 prefix: &object_store::path::Path,
30 url: &str,
31) -> Result<()> {
32 // Try to get metadata for the file to verify it exists
33 let head_result = object_store.head(prefix).await;
34 if head_result.is_err() {
35 // If head fails, try listing - could be a directory/prefix
36 let mut list_stream = object_store.list(Some(prefix));
37 let first_file = list_stream.next().await;
38 if first_file.is_none() {
39 anyhow::bail!("No files found at URL: {}", url);
40 }
41 }
42 Ok(())
43}
44
45/// Creates a TableProvider for a JSON file with pre-computed schema
46///
47/// This function infers the schema once and returns a TableProvider that can be
48/// registered in multiple SessionContexts without re-inferring the schema.
49///
50/// DataFusion supports **JSONL (newline-delimited JSON)** format, where each line
51/// contains a complete JSON object.
52///
53/// # Arguments
54///
55/// * `url` - URL to the JSON file (e.g., "file:///path/to/data.json" or "s3://bucket/data.json")
56///
57/// # Returns
58///
59/// Returns an `Arc<dyn TableProvider>` that can be registered using
60/// `SessionContext::register_table()`.
61///
62/// # Example
63///
64/// ```rust,no_run
65/// use anyhow::Result;
66/// use datafusion::execution::context::SessionContext;
67/// use micromegas_analytics::dfext::json_table_provider::json_table_provider;
68///
69/// #[tokio::main]
70/// async fn main() -> Result<()> {
71/// let ctx = SessionContext::new();
72/// // Create table provider with pre-computed schema (done once at startup)
73/// let table = json_table_provider(&ctx, "file:///path/to/data.json").await?;
74///
75/// // Register in session context (fast, no schema inference)
76/// ctx.register_table("my_table", table)?;
77///
78/// Ok(())
79/// }
80/// ```
81///
82/// # Performance
83///
84/// Schema inference happens once during this function call. The returned
85/// TableProvider caches the schema, making subsequent registrations in
86/// different SessionContexts very fast.
87pub async fn json_table_provider(
88 ctx: &SessionContext,
89 url: &str,
90) -> Result<Arc<dyn TableProvider>> {
91 let file_format = Arc::new(JsonFormat::default());
92 let listing_options = ListingOptions::new(file_format);
93 let table_url = ListingTableUrl::parse(url)?;
94
95 // Verify that files exist at the specified URL
96 let object_store = ctx.state().runtime_env().object_store(&table_url)?;
97 verify_files_exist(&object_store, table_url.prefix(), url).await?;
98
99 let mut config = ListingTableConfig::new(table_url).with_listing_options(listing_options);
100 config = config.infer_schema(&ctx.state()).await?;
101 let listing_table = ListingTable::try_new(config)?;
102 Ok(Arc::new(listing_table))
103}