micromegas_analytics/dfext/
task_log_exec_plan.rs

1use datafusion::arrow::datatypes::DataType;
2use datafusion::arrow::datatypes::Field;
3use datafusion::arrow::datatypes::Schema;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::arrow::datatypes::TimeUnit;
6use datafusion::common::Statistics;
7use datafusion::common::internal_err;
8use datafusion::error::DataFusionError;
9use datafusion::execution::SendableRecordBatchStream;
10use datafusion::execution::TaskContext;
11use datafusion::physical_expr::EquivalenceProperties;
12use datafusion::physical_plan::DisplayAs;
13use datafusion::physical_plan::DisplayFormatType;
14use datafusion::physical_plan::ExecutionPlan;
15use datafusion::physical_plan::Partitioning;
16use datafusion::physical_plan::PlanProperties;
17use datafusion::physical_plan::execution_plan::Boundedness;
18use datafusion::physical_plan::execution_plan::EmissionType;
19use std::any::Any;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23use super::async_log_stream::AsyncLogStream;
24
25/// A type alias for a function that spawns a log message receiver.
26pub type TaskSpawner =
27    dyn FnOnce() -> mpsc::Receiver<(chrono::DateTime<chrono::Utc>, String)> + Sync + Send;
28
29/// An `ExecutionPlan` that provides a stream of log messages.
30pub struct TaskLogExecPlan {
31    schema: SchemaRef,
32    cache: PlanProperties,
33    spawner: std::sync::Mutex<Option<Box<TaskSpawner>>>,
34}
35
36impl DisplayAs for TaskLogExecPlan {
37    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
38        match t {
39            DisplayFormatType::Default
40            | DisplayFormatType::Verbose
41            | DisplayFormatType::TreeRender => {
42                write!(f, "TaskLogExecPlan")
43            }
44        }
45    }
46}
47
48impl std::fmt::Debug for TaskLogExecPlan {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        write!(f, "TaskLogExecPlan")
51    }
52}
53
54impl TaskLogExecPlan {
55    pub fn new(spawner: Box<TaskSpawner>) -> Self {
56        let schema = Arc::new(Schema::new(vec![
57            Field::new(
58                "time",
59                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
60                false,
61            ),
62            Field::new("msg", DataType::Utf8, false),
63        ]));
64
65        let cache = PlanProperties::new(
66            EquivalenceProperties::new(Arc::clone(&schema)),
67            Partitioning::RoundRobinBatch(1),
68            EmissionType::Incremental,
69            Boundedness::Unbounded {
70                requires_infinite_memory: false,
71            },
72        );
73
74        Self {
75            schema,
76            cache,
77            spawner: std::sync::Mutex::new(Some(spawner)),
78        }
79    }
80}
81
82impl ExecutionPlan for TaskLogExecPlan {
83    fn name(&self) -> &'static str {
84        "LogExecPlan"
85    }
86
87    fn as_any(&self) -> &dyn Any {
88        self
89    }
90
91    fn schema(&self) -> SchemaRef {
92        self.schema.clone()
93    }
94
95    fn properties(&self) -> &PlanProperties {
96        &self.cache
97    }
98
99    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
100        vec![]
101    }
102
103    fn with_new_children(
104        self: Arc<Self>,
105        children: Vec<Arc<dyn ExecutionPlan>>,
106    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
107        if children.is_empty() {
108            Ok(self)
109        } else {
110            internal_err!("Children cannot be replaced in LogExecPlan")
111        }
112    }
113
114    fn execute(
115        &self,
116        partition: usize,
117        _context: Arc<TaskContext>,
118    ) -> datafusion::error::Result<SendableRecordBatchStream> {
119        if partition >= 1 {
120            return internal_err!("Invalid partition {partition} for LogExecPlan");
121        }
122
123        let mut spawner = self.spawner.lock().map_err(|_| {
124            DataFusionError::Execution("Error locking mutex in LogExecPlan".to_owned())
125        })?;
126        if let Some(fun) = spawner.take() {
127            drop(spawner);
128            Ok(Box::pin(AsyncLogStream::new(self.schema.clone(), fun())))
129        } else {
130            internal_err!("Spawner already taken in LogExecPlan")
131        }
132    }
133
134    fn statistics(&self) -> datafusion::error::Result<Statistics> {
135        Ok(Statistics::new_unknown(&self.schema))
136    }
137}