// micromegas_analytics/lakehouse/export_log_view.rs
use super::{
2 batch_update::PartitionCreationStrategy,
3 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4 lakehouse_context::LakehouseContext,
5 partition_cache::{NullPartitionProvider, PartitionCache},
6 query::make_session_context,
7 session_configurator::SessionConfigurator,
8 view::{PartitionSpec, View},
9 view_factory::ViewFactory,
10};
11use crate::{
12 lakehouse::{sql_partition_spec::fetch_sql_partition_spec, view::ViewMetadata},
13 record_batch_transformer::RecordBatchTransformer,
14 time::{TimeRange, datetime_to_scalar},
15};
16use anyhow::{Context, Result};
17use async_trait::async_trait;
18use chrono::{DateTime, TimeDelta, Utc};
19use datafusion::{
20 arrow::{
21 array::{PrimitiveBuilder, RecordBatch, StringBuilder},
22 datatypes::{DataType, Field, Int32Type, Schema, TimeUnit, TimestampNanosecondType},
23 },
24 execution::runtime_env::RuntimeEnv,
25 prelude::*,
26};
27use micromegas_ingestion::data_lake_connection::DataLakeConnection;
28use micromegas_tracing::levels::Level;
29use std::hash::Hash;
30use std::hash::Hasher;
31use std::{hash::DefaultHasher, sync::Arc};
32
/// Accumulates (time, level, msg) rows and turns them into an Arrow
/// `RecordBatch` whose layout matches [`make_export_log_schema`].
pub struct ExportLogBuilder {
    // Event timestamps, nanoseconds since the Unix epoch (tagged UTC in `finish`).
    times: PrimitiveBuilder<TimestampNanosecondType>,
    // Severity, stored as the integer value of `micromegas_tracing::levels::Level`.
    levels: PrimitiveBuilder<Int32Type>,
    // Log message text.
    msgs: StringBuilder,
}
39
40impl ExportLogBuilder {
41 #[expect(clippy::new_without_default)]
42 pub fn new() -> Self {
43 Self {
44 times: PrimitiveBuilder::new(),
45 levels: PrimitiveBuilder::new(),
46 msgs: StringBuilder::new(),
47 }
48 }
49
50 pub fn append(&mut self, level: Level, msg: &str) {
51 let now = Utc::now();
52 self.times
53 .append_value(now.timestamp_nanos_opt().unwrap_or_default());
54 self.levels.append_value(level as i32);
55 self.msgs.append_value(msg);
56 }
57
58 pub fn finish(mut self) -> Result<RecordBatch> {
59 RecordBatch::try_new(
60 make_export_log_schema(),
61 vec![
62 Arc::new(self.times.finish().with_timezone_utc()),
63 Arc::new(self.levels.finish()),
64 Arc::new(self.msgs.finish()),
65 ],
66 )
67 .with_context(|| "building record batch")
68 }
69}
70
/// A materialized view that extracts rows via templated SQL queries and hands
/// each batch to an exporter, producing an export log (time/level/msg).
#[derive(Debug)]
pub struct ExportLogView {
    // Name of the view set this view belongs to.
    view_set_name: Arc<String>,
    // Instance id; fixed to "global" by `ExportLogView::new`.
    view_instance_id: Arc<String>,
    // Column used for time filtering/bounds; fixed to "time" by `new`.
    time_column_name: Arc<String>,
    // SQL template with `{begin}`/`{end}` placeholders; used to count source
    // rows for a partition (passed to `fetch_sql_partition_spec`).
    count_src_query: Arc<String>,
    // SQL template with `{begin}`/`{end}` placeholders; used to extract the
    // rows of a partition. Validated once at construction time.
    extract_query: Arc<String>,
    // Transformer applied to the extracted record batches — presumably the
    // component that performs the actual export; confirm with implementors.
    exporter: Arc<dyn RecordBatchTransformer>,
    // Arrow schema of the produced partitions (see `make_export_log_schema`).
    log_schema: Arc<Schema>,
    // Used to build session contexts for running the templated queries.
    view_factory: Arc<ViewFactory>,
    // Applied to each session context created by this view.
    session_configurator: Arc<dyn SessionConfigurator>,
    // Batch-update scheduling group, if any (returned by `get_update_group`).
    update_group: Option<i32>,
    // Max partition span when creating partitions from source data.
    max_partition_delta_from_source: TimeDelta,
    // Max partition span when merging existing partitions.
    max_partition_delta_from_merge: TimeDelta,
}
87
88pub fn make_export_log_schema() -> Arc<Schema> {
90 Arc::new(Schema::new(vec![
91 Field::new(
92 "time",
93 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
94 false,
95 ),
96 Field::new("level", DataType::Int32, false),
97 Field::new("msg", DataType::Utf8, false),
98 ]))
99}
100
101impl ExportLogView {
102 #[expect(clippy::too_many_arguments)]
103 pub async fn new(
104 runtime: Arc<RuntimeEnv>,
105 view_set_name: Arc<String>,
106 count_src_query: Arc<String>,
107 extract_query: Arc<String>,
108 exporter: Arc<dyn RecordBatchTransformer>,
109 lake: Arc<DataLakeConnection>,
110 view_factory: Arc<ViewFactory>,
111 session_configurator: Arc<dyn SessionConfigurator>,
112 update_group: Option<i32>,
113 max_partition_delta_from_source: TimeDelta,
114 max_partition_delta_from_merge: TimeDelta,
115 ) -> Result<Self> {
116 let null_part_provider = Arc::new(NullPartitionProvider {});
117 let lakehouse = Arc::new(LakehouseContext::new(lake.clone(), runtime.clone()));
118 let ctx = make_session_context(
119 lakehouse,
120 null_part_provider,
121 None,
122 view_factory.clone(),
123 session_configurator.clone(),
124 )
125 .await
126 .with_context(|| "make_session_context")?;
127 let now_str = Utc::now().to_rfc3339();
128 let sql = extract_query
129 .replace("{begin}", &now_str)
130 .replace("{end}", &now_str);
131 let _extracted_df = ctx.sql(&sql).await?;
132 Ok(Self {
133 view_set_name,
134 view_instance_id: Arc::new(String::from("global")),
135 time_column_name: Arc::new(String::from("time")),
136 count_src_query,
137 extract_query,
138 exporter,
139 log_schema: make_export_log_schema(),
140 view_factory,
141 session_configurator,
142 update_group,
143 max_partition_delta_from_source,
144 max_partition_delta_from_merge,
145 })
146 }
147}
148
149#[async_trait]
150impl View for ExportLogView {
151 fn get_view_set_name(&self) -> Arc<String> {
152 self.view_set_name.clone()
153 }
154
155 fn get_view_instance_id(&self) -> Arc<String> {
156 self.view_instance_id.clone()
157 }
158
159 async fn make_batch_partition_spec(
160 &self,
161 lakehouse: Arc<LakehouseContext>,
162 existing_partitions: Arc<PartitionCache>,
163 insert_range: TimeRange,
164 ) -> Result<Arc<dyn PartitionSpec>> {
165 let view_meta = ViewMetadata {
166 view_set_name: self.get_view_set_name(),
167 view_instance_id: self.get_view_instance_id(),
168 file_schema_hash: self.get_file_schema_hash(),
169 };
170 let partitions_in_range = Arc::new(existing_partitions.filter_insert_range(insert_range));
171 let ctx = make_session_context(
172 lakehouse,
173 partitions_in_range.clone(),
174 None,
175 self.view_factory.clone(),
176 self.session_configurator.clone(),
177 )
178 .await
179 .with_context(|| "make_session_context")?;
180 let count_src_sql = self
181 .count_src_query
182 .replace("{begin}", &insert_range.begin.to_rfc3339())
183 .replace("{end}", &insert_range.end.to_rfc3339());
184 let extract_sql = self
185 .extract_query
186 .replace("{begin}", &insert_range.begin.to_rfc3339())
187 .replace("{end}", &insert_range.end.to_rfc3339());
188 Ok(Arc::new(
189 fetch_sql_partition_spec(
190 ctx,
191 self.exporter.clone(),
192 self.get_time_bounds(),
193 self.log_schema.clone(),
194 count_src_sql,
195 extract_sql,
196 view_meta,
197 insert_range,
198 )
199 .await
200 .with_context(|| "fetch_sql_partition_spec")?,
201 ))
202 }
203
204 fn get_file_schema_hash(&self) -> Vec<u8> {
205 let mut hasher = DefaultHasher::new();
206 self.log_schema.hash(&mut hasher);
207 hasher.finish().to_le_bytes().to_vec()
208 }
209
210 fn get_file_schema(&self) -> Arc<Schema> {
211 self.log_schema.clone()
212 }
213
214 async fn jit_update(
215 &self,
216 _lakehouse: Arc<LakehouseContext>,
217 _query_range: Option<TimeRange>,
218 ) -> Result<()> {
219 Ok(())
220 }
221
222 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
223 Ok(vec![
224 col(&**self.time_column_name).lt_eq(lit(datetime_to_scalar(end))),
225 col(&**self.time_column_name).gt_eq(lit(datetime_to_scalar(begin))),
226 ])
227 }
228
229 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
230 Arc::new(NamedColumnsTimeBounds::new(
231 self.time_column_name.clone(),
232 self.time_column_name.clone(),
233 ))
234 }
235
236 fn get_update_group(&self) -> Option<i32> {
237 self.update_group
238 }
239
240 fn get_max_partition_time_delta(&self, strategy: &PartitionCreationStrategy) -> TimeDelta {
241 match strategy {
242 PartitionCreationStrategy::Abort | PartitionCreationStrategy::CreateFromSource => {
243 self.max_partition_delta_from_source
244 }
245 PartitionCreationStrategy::MergeExisting(_partitions) => {
246 self.max_partition_delta_from_merge
247 }
248 }
249 }
250}