use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Field;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::Statistics;
use datafusion::common::internal_err;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::DisplayAs;
use datafusion::physical_plan::DisplayFormatType;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::PlanProperties;
use datafusion::physical_plan::execution_plan::Boundedness;
use datafusion::physical_plan::execution_plan::EmissionType;
use std::any::Any;
use std::sync::Arc;
use tokio::sync::mpsc;

use super::async_log_stream::AsyncLogStream;

25pub type TaskSpawner =
27 dyn FnOnce() -> mpsc::Receiver<(chrono::DateTime<chrono::Utc>, String)> + Sync + Send;
28
29pub struct TaskLogExecPlan {
31 schema: SchemaRef,
32 cache: PlanProperties,
33 spawner: std::sync::Mutex<Option<Box<TaskSpawner>>>,
34}
35
36impl DisplayAs for TaskLogExecPlan {
37 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
38 match t {
39 DisplayFormatType::Default
40 | DisplayFormatType::Verbose
41 | DisplayFormatType::TreeRender => {
42 write!(f, "TaskLogExecPlan")
43 }
44 }
45 }
46}
47
48impl std::fmt::Debug for TaskLogExecPlan {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 write!(f, "TaskLogExecPlan")
51 }
52}
53
54impl TaskLogExecPlan {
55 pub fn new(spawner: Box<TaskSpawner>) -> Self {
56 let schema = Arc::new(Schema::new(vec![
57 Field::new(
58 "time",
59 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
60 false,
61 ),
62 Field::new("msg", DataType::Utf8, false),
63 ]));
64
65 let cache = PlanProperties::new(
66 EquivalenceProperties::new(Arc::clone(&schema)),
67 Partitioning::RoundRobinBatch(1),
68 EmissionType::Incremental,
69 Boundedness::Unbounded {
70 requires_infinite_memory: false,
71 },
72 );
73
74 Self {
75 schema,
76 cache,
77 spawner: std::sync::Mutex::new(Some(spawner)),
78 }
79 }
80}
81
82impl ExecutionPlan for TaskLogExecPlan {
83 fn name(&self) -> &'static str {
84 "LogExecPlan"
85 }
86
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90
91 fn schema(&self) -> SchemaRef {
92 self.schema.clone()
93 }
94
95 fn properties(&self) -> &PlanProperties {
96 &self.cache
97 }
98
99 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
100 vec![]
101 }
102
103 fn with_new_children(
104 self: Arc<Self>,
105 children: Vec<Arc<dyn ExecutionPlan>>,
106 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
107 if children.is_empty() {
108 Ok(self)
109 } else {
110 internal_err!("Children cannot be replaced in LogExecPlan")
111 }
112 }
113
114 fn execute(
115 &self,
116 partition: usize,
117 _context: Arc<TaskContext>,
118 ) -> datafusion::error::Result<SendableRecordBatchStream> {
119 if partition >= 1 {
120 return internal_err!("Invalid partition {partition} for LogExecPlan");
121 }
122
123 let mut spawner = self.spawner.lock().map_err(|_| {
124 DataFusionError::Execution("Error locking mutex in LogExecPlan".to_owned())
125 })?;
126 if let Some(fun) = spawner.take() {
127 drop(spawner);
128 Ok(Box::pin(AsyncLogStream::new(self.schema.clone(), fun())))
129 } else {
130 internal_err!("Spawner already taken in LogExecPlan")
131 }
132 }
133
134 fn statistics(&self) -> datafusion::error::Result<Statistics> {
135 Ok(Statistics::new_unknown(&self.schema))
136 }
137}