// micromegas_analytics/lakehouse/export_log_view.rs

use super::{
    batch_update::PartitionCreationStrategy,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    lakehouse_context::LakehouseContext,
    partition_cache::{NullPartitionProvider, PartitionCache},
    query::make_session_context,
    session_configurator::SessionConfigurator,
    view::{PartitionSpec, View},
    view_factory::ViewFactory,
};
use crate::{
    lakehouse::{sql_partition_spec::fetch_sql_partition_spec, view::ViewMetadata},
    record_batch_transformer::RecordBatchTransformer,
    time::{TimeRange, datetime_to_scalar},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::{
        array::{PrimitiveBuilder, RecordBatch, StringBuilder},
        datatypes::{DataType, Field, Int32Type, Schema, TimeUnit, TimestampNanosecondType},
    },
    execution::runtime_env::RuntimeEnv,
    prelude::*,
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::levels::Level;
use std::{
    hash::{DefaultHasher, Hash, Hasher},
    sync::Arc,
};

/// A builder for creating log entries for export.
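///
/// Example (sketch, not compiled here): collect a few entries and finish
/// them into a record batch.
/// ```ignore
/// let mut builder = ExportLogBuilder::new();
/// builder.append(Level::Info, "export started");
/// builder.append(Level::Error, "source query failed");
/// let batch = builder.finish()?;
/// ```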
pub struct ExportLogBuilder {
    times: PrimitiveBuilder<TimestampNanosecondType>,
    levels: PrimitiveBuilder<Int32Type>,
    msgs: StringBuilder,
}

impl ExportLogBuilder {
    #[expect(clippy::new_without_default)]
    pub fn new() -> Self {
        Self {
            times: PrimitiveBuilder::new(),
            levels: PrimitiveBuilder::new(),
            msgs: StringBuilder::new(),
        }
    }

    /// Appends a log entry stamped with the current UTC time.
    pub fn append(&mut self, level: Level, msg: &str) {
        let now = Utc::now();
        self.times
            .append_value(now.timestamp_nanos_opt().unwrap_or_default());
        self.levels.append_value(level as i32);
        self.msgs.append_value(msg);
    }

    /// Consumes the builder, producing a [`RecordBatch`] matching
    /// [`make_export_log_schema`].
    pub fn finish(mut self) -> Result<RecordBatch> {
        RecordBatch::try_new(
            make_export_log_schema(),
            vec![
                Arc::new(self.times.finish().with_timezone_utc()),
                Arc::new(self.levels.finish()),
                Arc::new(self.msgs.finish()),
            ],
        )
        .with_context(|| "building record batch")
    }
}

/// A view for exporting log data.
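///
/// `count_src_query` and `extract_query` may contain `{begin}` and `{end}`
/// placeholders; they are replaced with the RFC 3339 bounds of the relevant
/// time range before execution.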
#[derive(Debug)]
pub struct ExportLogView {
    view_set_name: Arc<String>,
    view_instance_id: Arc<String>,
    /// Name of the time column in the export log schema.
    time_column_name: Arc<String>,
    /// SQL counting the source rows available for an insert range.
    count_src_query: Arc<String>,
    /// SQL extracting the rows handed to the exporter.
    extract_query: Arc<String>,
    exporter: Arc<dyn RecordBatchTransformer>,
    log_schema: Arc<Schema>,
    view_factory: Arc<ViewFactory>,
    session_configurator: Arc<dyn SessionConfigurator>,
    update_group: Option<i32>,
    max_partition_delta_from_source: TimeDelta,
    max_partition_delta_from_merge: TimeDelta,
}

/// Creates the Arrow schema for the export log.
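///
/// Columns: `time` (non-null UTC nanosecond timestamp), `level` (non-null
/// i32, cast from [`micromegas_tracing::levels::Level`]), and `msg`
/// (non-null utf8).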
pub fn make_export_log_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new("level", DataType::Int32, false),
        Field::new("msg", DataType::Utf8, false),
    ]))
}

impl ExportLogView {
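    /// Creates the view, eagerly planning `extract_query` (with `{begin}` and
    /// `{end}` bound to the current time) so that an invalid query fails at
    /// construction rather than at update time.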
    #[expect(clippy::too_many_arguments)]
    pub async fn new(
        runtime: Arc<RuntimeEnv>,
        view_set_name: Arc<String>,
        count_src_query: Arc<String>,
        extract_query: Arc<String>,
        exporter: Arc<dyn RecordBatchTransformer>,
        lake: Arc<DataLakeConnection>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
        update_group: Option<i32>,
        max_partition_delta_from_source: TimeDelta,
        max_partition_delta_from_merge: TimeDelta,
    ) -> Result<Self> {
        let null_part_provider = Arc::new(NullPartitionProvider {});
        let lakehouse = Arc::new(LakehouseContext::new(lake.clone(), runtime.clone()));
        let ctx = make_session_context(
            lakehouse,
            null_part_provider,
            None,
            view_factory.clone(),
            session_configurator.clone(),
        )
        .await
        .with_context(|| "make_session_context")?;
        // Bind the placeholders to the current time and plan the query once,
        // rejecting an invalid extract query up front.
        let now_str = Utc::now().to_rfc3339();
        let sql = extract_query
            .replace("{begin}", &now_str)
            .replace("{end}", &now_str);
        let _extracted_df = ctx.sql(&sql).await?;
        Ok(Self {
            view_set_name,
            view_instance_id: Arc::new(String::from("global")),
            time_column_name: Arc::new(String::from("time")),
            count_src_query,
            extract_query,
            exporter,
            log_schema: make_export_log_schema(),
            view_factory,
            session_configurator,
            update_group,
            max_partition_delta_from_source,
            max_partition_delta_from_merge,
        })
    }
}

#[async_trait]
impl View for ExportLogView {
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

    async fn make_batch_partition_spec(
        &self,
        lakehouse: Arc<LakehouseContext>,
        existing_partitions: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        let view_meta = ViewMetadata {
            view_set_name: self.get_view_set_name(),
            view_instance_id: self.get_view_instance_id(),
            file_schema_hash: self.get_file_schema_hash(),
        };
        // Only partitions overlapping the insert range are visible to the queries.
        let partitions_in_range = Arc::new(existing_partitions.filter_insert_range(insert_range));
        let ctx = make_session_context(
            lakehouse,
            partitions_in_range.clone(),
            None,
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await
        .with_context(|| "make_session_context")?;
        // Substitute the insert range bounds into the {begin}/{end} placeholders.
        let count_src_sql = self
            .count_src_query
            .replace("{begin}", &insert_range.begin.to_rfc3339())
            .replace("{end}", &insert_range.end.to_rfc3339());
        let extract_sql = self
            .extract_query
            .replace("{begin}", &insert_range.begin.to_rfc3339())
            .replace("{end}", &insert_range.end.to_rfc3339());
        Ok(Arc::new(
            fetch_sql_partition_spec(
                ctx,
                self.exporter.clone(),
                self.get_time_bounds(),
                self.log_schema.clone(),
                count_src_sql,
                extract_sql,
                view_meta,
                insert_range,
            )
            .await
            .with_context(|| "fetch_sql_partition_spec")?,
        ))
    }

    fn get_file_schema_hash(&self) -> Vec<u8> {
        let mut hasher = DefaultHasher::new();
        self.log_schema.hash(&mut hasher);
        hasher.finish().to_le_bytes().to_vec()
    }

    fn get_file_schema(&self) -> Arc<Schema> {
        self.log_schema.clone()
    }

    async fn jit_update(
        &self,
        _lakehouse: Arc<LakehouseContext>,
        _query_range: Option<TimeRange>,
    ) -> Result<()> {
        // Nothing to materialize just-in-time for this view.
        Ok(())
    }

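    /// Builds filter expressions equivalent to
    /// `begin <= time_column <= end`.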
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![
            col(&**self.time_column_name).lt_eq(lit(datetime_to_scalar(end))),
            col(&**self.time_column_name).gt_eq(lit(datetime_to_scalar(begin))),
        ])
    }

    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        // Both the begin and end bounds are read from the single `time` column.
        Arc::new(NamedColumnsTimeBounds::new(
            self.time_column_name.clone(),
            self.time_column_name.clone(),
        ))
    }

    fn get_update_group(&self) -> Option<i32> {
        self.update_group
    }

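    /// The maximum partition time span depends on how the partition is
    /// created: partitions built from source data and partitions built by
    /// merging existing ones are bounded independently.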
    fn get_max_partition_time_delta(&self, strategy: &PartitionCreationStrategy) -> TimeDelta {
        match strategy {
            PartitionCreationStrategy::Abort | PartitionCreationStrategy::CreateFromSource => {
                self.max_partition_delta_from_source
            }
            PartitionCreationStrategy::MergeExisting(_partitions) => {
                self.max_partition_delta_from_merge
            }
        }
    }
}