micromegas_analytics/dfext/
log_stream_table_provider.rs1use super::task_log_exec_plan::TaskLogExecPlan;
2use async_trait::async_trait;
3use datafusion::arrow::datatypes::SchemaRef;
4use datafusion::catalog::Session;
5use datafusion::catalog::TableProvider;
6use datafusion::datasource::TableType;
7use datafusion::physical_plan::ExecutionPlan;
8use datafusion::physical_plan::limit::GlobalLimitExec;
9use datafusion::prelude::Expr;
10use std::any::Any;
11use std::sync::Arc;
12
13#[derive(Debug)]
15pub struct LogStreamTableProvider {
16 pub log_stream: Arc<TaskLogExecPlan>,
18}
19
20#[async_trait]
21impl TableProvider for LogStreamTableProvider {
22 fn as_any(&self) -> &dyn Any {
23 self
24 }
25
26 fn schema(&self) -> SchemaRef {
27 self.log_stream.schema()
28 }
29
30 fn table_type(&self) -> TableType {
31 TableType::Temporary
32 }
33
34 async fn scan(
35 &self,
36 _state: &dyn Session,
37 _projection: Option<&Vec<usize>>,
38 _filters: &[Expr],
39 limit: Option<usize>,
40 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
41 let plan: Arc<dyn ExecutionPlan> = self.log_stream.clone();
45 if let Some(fetch) = limit {
46 Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch))))
47 } else {
48 Ok(plan)
49 }
50 }
51}