micromegas_analytics/lakehouse/
log_view.rs

1use super::{
2    batch_update::PartitionCreationStrategy,
3    block_partition_spec::{BlockPartitionSpec, BlockProcessor, BlockProcessorMap},
4    blocks_view::BlocksView,
5    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
6    jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
7    lakehouse_context::LakehouseContext,
8    log_block_processor::LogBlockProcessor,
9    otel::logs_block_processor::OtelLogsBlockProcessor,
10    partition_cache::PartitionCache,
11    partition_source_data::fetch_partition_source_data,
12    view::{PartitionSpec, View, ViewMetadata},
13    view_factory::ViewMaker,
14};
15use crate::{
16    lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
17    log_entries_table::log_table_schema,
18    metadata::find_process,
19    time::{TimeRange, datetime_to_scalar},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, TimeDelta, Utc};
24use datafusion::{
25    arrow::datatypes::Schema,
26    logical_expr::{Between, Expr, col},
27};
28use micromegas_ingestion::web_ingestion_service::{FORMAT_OTLP_LOGS, FORMAT_TRANSIT};
29use micromegas_tracing::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use uuid::Uuid;
33
/// Name of the view set every `LogView` instance belongs to.
const VIEW_SET_NAME: &str = "log_entries";

/// Version byte used as the schema hash of the view and of its partition files.
/// NOTE(review): presumably must be bumped whenever `log_table_schema()` changes.
const SCHEMA_VERSION: u8 = 5;

/// Name of the timestamp column; shared (one `Arc`) by every time-bounds helper.
static TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("time")));
39
40/// Block-processor map covering native transit + OTel log blocks.
41fn log_processors() -> Arc<BlockProcessorMap> {
42    let mut m: BlockProcessorMap = HashMap::new();
43    m.insert(
44        FORMAT_TRANSIT,
45        Arc::new(LogBlockProcessor {}) as Arc<dyn BlockProcessor>,
46    );
47    m.insert(
48        FORMAT_OTLP_LOGS,
49        Arc::new(OtelLogsBlockProcessor {}) as Arc<dyn BlockProcessor>,
50    );
51    Arc::new(m)
52}
53
54/// A `ViewMaker` for creating `LogView` instances.
55#[derive(Debug)]
56pub struct LogViewMaker {}
57
58impl ViewMaker for LogViewMaker {
59    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
60        Ok(Arc::new(LogView::new(view_instance_id)?))
61    }
62
63    fn get_schema_hash(&self) -> Vec<u8> {
64        vec![SCHEMA_VERSION]
65    }
66
67    fn get_schema(&self) -> Arc<Schema> {
68        Arc::new(log_table_schema())
69    }
70}
71
72/// A view of log entries.
73#[derive(Debug)]
74pub struct LogView {
75    view_set_name: Arc<String>,
76    view_instance_id: Arc<String>,
77    process_id: Option<sqlx::types::Uuid>,
78}
79
80impl LogView {
81    pub fn new(view_instance_id: &str) -> Result<Self> {
82        let process_id = if view_instance_id == "global" {
83            None
84        } else {
85            Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
86        };
87
88        Ok(Self {
89            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
90            view_instance_id: Arc::new(view_instance_id.into()),
91            process_id,
92        })
93    }
94}
95
96#[async_trait]
97impl View for LogView {
98    fn get_view_set_name(&self) -> Arc<String> {
99        self.view_set_name.clone()
100    }
101
102    fn get_view_instance_id(&self) -> Arc<String> {
103        self.view_instance_id.clone()
104    }
105
106    async fn make_batch_partition_spec(
107        &self,
108        lakehouse: Arc<LakehouseContext>,
109        existing_partitions: Arc<PartitionCache>,
110        insert_range: TimeRange,
111    ) -> Result<Arc<dyn PartitionSpec>> {
112        if *self.view_instance_id != "global" {
113            anyhow::bail!("not supported for jit queries... should it?");
114        }
115        let source_data = Arc::new(
116            fetch_partition_source_data(
117                lakehouse.clone(),
118                existing_partitions,
119                insert_range,
120                "log",
121            )
122            .await
123            .with_context(|| "fetch_partition_source_data")?,
124        );
125        Ok(Arc::new(BlockPartitionSpec {
126            view_metadata: ViewMetadata {
127                view_set_name: self.view_set_name.clone(),
128                view_instance_id: self.view_instance_id.clone(),
129                file_schema_hash: self.get_file_schema_hash(),
130            },
131            schema: self.get_file_schema(),
132            insert_range,
133            source_data,
134            block_processors: log_processors(),
135        }))
136    }
137
138    fn get_file_schema_hash(&self) -> Vec<u8> {
139        vec![SCHEMA_VERSION]
140    }
141
142    fn get_file_schema(&self) -> Arc<Schema> {
143        Arc::new(log_table_schema())
144    }
145
146    #[span_fn]
147    async fn jit_update(
148        &self,
149        lakehouse: Arc<LakehouseContext>,
150        query_range: Option<TimeRange>,
151    ) -> Result<()> {
152        if *self.view_instance_id == "global" {
153            // this view instance is updated using the deamon
154            return Ok(());
155        }
156        let process = Arc::new(
157            find_process(
158                &lakehouse.lake().db_pool,
159                &self
160                    .process_id
161                    .with_context(|| "getting a view's process_id")?,
162            )
163            .await
164            .with_context(|| "find_process")?,
165        );
166        let query_range =
167            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
168
169        let blocks_view = BlocksView::new()?;
170        let all_partitions = generate_process_jit_partitions(
171            &JitPartitionConfig::default(),
172            lakehouse.clone(),
173            &blocks_view,
174            &query_range,
175            process.clone(),
176            "log",
177        )
178        .await
179        .with_context(|| "generate_process_jit_partitions")?;
180        let view_meta = ViewMetadata {
181            view_set_name: self.get_view_set_name(),
182            view_instance_id: self.get_view_instance_id(),
183            file_schema_hash: self.get_file_schema_hash(),
184        };
185        let block_processors = log_processors();
186
187        for part in all_partitions {
188            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
189                .await?
190            {
191                write_partition_from_blocks(
192                    lakehouse.lake().clone(),
193                    view_meta.clone(),
194                    self.get_file_schema(),
195                    part,
196                    block_processors.clone(),
197                )
198                .await
199                .with_context(|| "write_partition_from_blocks")?;
200            }
201        }
202        Ok(())
203    }
204
205    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
206        Ok(vec![Expr::Between(Between::new(
207            col("time").into(),
208            false,
209            Expr::Literal(datetime_to_scalar(begin), None).into(),
210            Expr::Literal(datetime_to_scalar(end), None).into(),
211        ))])
212    }
213
214    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
215        Arc::new(NamedColumnsTimeBounds::new(
216            TIME_COLUMN.clone(),
217            TIME_COLUMN.clone(),
218        ))
219    }
220
221    fn get_update_group(&self) -> Option<i32> {
222        if *(self.get_view_instance_id()) == "global" {
223            Some(2000)
224        } else {
225            None
226        }
227    }
228
229    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
230        TimeDelta::hours(1)
231    }
232}