//! DataFusion `TableProvider` for log streams
//! (micromegas_analytics/dfext/log_stream_table_provider.rs).

use super::task_log_exec_plan::TaskLogExecPlan;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::catalog::TableProvider;
use datafusion::datasource::TableType;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::prelude::Expr;
use std::any::Any;
use std::sync::Arc;
12
/// A DataFusion `TableProvider` that exposes a single log stream as a
/// queryable table, delegating execution to a [`TaskLogExecPlan`].
#[derive(Debug)]
pub struct LogStreamTableProvider {
    /// The underlying log stream execution plan; also supplies the table schema.
    pub log_stream: Arc<TaskLogExecPlan>,
}
19
20#[async_trait]
21impl TableProvider for LogStreamTableProvider {
22    fn as_any(&self) -> &dyn Any {
23        self
24    }
25
26    fn schema(&self) -> SchemaRef {
27        self.log_stream.schema()
28    }
29
30    fn table_type(&self) -> TableType {
31        TableType::Temporary
32    }
33
34    async fn scan(
35        &self,
36        _state: &dyn Session,
37        _projection: Option<&Vec<usize>>,
38        _filters: &[Expr],
39        limit: Option<usize>,
40    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
41        // Wrap the execution plan in a GlobalLimitExec if a limit is provided.
42        // DataFusion trusts us to apply the limit - if we ignore it, too many rows
43        // will be returned to the client.
44        let plan: Arc<dyn ExecutionPlan> = self.log_stream.clone();
45        if let Some(fetch) = limit {
46            Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch))))
47        } else {
48            Ok(plan)
49        }
50    }
51}