micromegas_analytics/lakehouse/
log_view.rs1use super::{
2 batch_update::PartitionCreationStrategy,
3 block_partition_spec::{BlockPartitionSpec, BlockProcessor, BlockProcessorMap},
4 blocks_view::BlocksView,
5 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
6 jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
7 lakehouse_context::LakehouseContext,
8 log_block_processor::LogBlockProcessor,
9 otel::logs_block_processor::OtelLogsBlockProcessor,
10 partition_cache::PartitionCache,
11 partition_source_data::fetch_partition_source_data,
12 view::{PartitionSpec, View, ViewMetadata},
13 view_factory::ViewMaker,
14};
15use crate::{
16 lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
17 log_entries_table::log_table_schema,
18 metadata::find_process,
19 time::{TimeRange, datetime_to_scalar},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, TimeDelta, Utc};
24use datafusion::{
25 arrow::datatypes::Schema,
26 logical_expr::{Between, Expr, col},
27};
28use micromegas_ingestion::web_ingestion_service::{FORMAT_OTLP_LOGS, FORMAT_TRANSIT};
29use micromegas_tracing::prelude::*;
30use std::collections::HashMap;
31use std::sync::Arc;
32use uuid::Uuid;
33
/// Name of the view set served by the log views defined in this module.
const VIEW_SET_NAME: &str = "log_entries";
/// Version byte of the log table file schema. It is returned as the schema hash by both the
/// view maker and the view (presumably bumped whenever `log_table_schema` changes — confirm).
const SCHEMA_VERSION: u8 = 5;
/// Name of the column holding each log entry's timestamp.
///
/// Shared as an `Arc<String>` so callers can hand out cheap clones.
static TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("time")));
39
40fn log_processors() -> Arc<BlockProcessorMap> {
42 let mut m: BlockProcessorMap = HashMap::new();
43 m.insert(
44 FORMAT_TRANSIT,
45 Arc::new(LogBlockProcessor {}) as Arc<dyn BlockProcessor>,
46 );
47 m.insert(
48 FORMAT_OTLP_LOGS,
49 Arc::new(OtelLogsBlockProcessor {}) as Arc<dyn BlockProcessor>,
50 );
51 Arc::new(m)
52}
53
54#[derive(Debug)]
56pub struct LogViewMaker {}
57
58impl ViewMaker for LogViewMaker {
59 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
60 Ok(Arc::new(LogView::new(view_instance_id)?))
61 }
62
63 fn get_schema_hash(&self) -> Vec<u8> {
64 vec![SCHEMA_VERSION]
65 }
66
67 fn get_schema(&self) -> Arc<Schema> {
68 Arc::new(log_table_schema())
69 }
70}
71
72#[derive(Debug)]
74pub struct LogView {
75 view_set_name: Arc<String>,
76 view_instance_id: Arc<String>,
77 process_id: Option<sqlx::types::Uuid>,
78}
79
80impl LogView {
81 pub fn new(view_instance_id: &str) -> Result<Self> {
82 let process_id = if view_instance_id == "global" {
83 None
84 } else {
85 Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
86 };
87
88 Ok(Self {
89 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
90 view_instance_id: Arc::new(view_instance_id.into()),
91 process_id,
92 })
93 }
94}
95
96#[async_trait]
97impl View for LogView {
98 fn get_view_set_name(&self) -> Arc<String> {
99 self.view_set_name.clone()
100 }
101
102 fn get_view_instance_id(&self) -> Arc<String> {
103 self.view_instance_id.clone()
104 }
105
106 async fn make_batch_partition_spec(
107 &self,
108 lakehouse: Arc<LakehouseContext>,
109 existing_partitions: Arc<PartitionCache>,
110 insert_range: TimeRange,
111 ) -> Result<Arc<dyn PartitionSpec>> {
112 if *self.view_instance_id != "global" {
113 anyhow::bail!("not supported for jit queries... should it?");
114 }
115 let source_data = Arc::new(
116 fetch_partition_source_data(
117 lakehouse.clone(),
118 existing_partitions,
119 insert_range,
120 "log",
121 )
122 .await
123 .with_context(|| "fetch_partition_source_data")?,
124 );
125 Ok(Arc::new(BlockPartitionSpec {
126 view_metadata: ViewMetadata {
127 view_set_name: self.view_set_name.clone(),
128 view_instance_id: self.view_instance_id.clone(),
129 file_schema_hash: self.get_file_schema_hash(),
130 },
131 schema: self.get_file_schema(),
132 insert_range,
133 source_data,
134 block_processors: log_processors(),
135 }))
136 }
137
138 fn get_file_schema_hash(&self) -> Vec<u8> {
139 vec![SCHEMA_VERSION]
140 }
141
142 fn get_file_schema(&self) -> Arc<Schema> {
143 Arc::new(log_table_schema())
144 }
145
146 #[span_fn]
147 async fn jit_update(
148 &self,
149 lakehouse: Arc<LakehouseContext>,
150 query_range: Option<TimeRange>,
151 ) -> Result<()> {
152 if *self.view_instance_id == "global" {
153 return Ok(());
155 }
156 let process = Arc::new(
157 find_process(
158 &lakehouse.lake().db_pool,
159 &self
160 .process_id
161 .with_context(|| "getting a view's process_id")?,
162 )
163 .await
164 .with_context(|| "find_process")?,
165 );
166 let query_range =
167 query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
168
169 let blocks_view = BlocksView::new()?;
170 let all_partitions = generate_process_jit_partitions(
171 &JitPartitionConfig::default(),
172 lakehouse.clone(),
173 &blocks_view,
174 &query_range,
175 process.clone(),
176 "log",
177 )
178 .await
179 .with_context(|| "generate_process_jit_partitions")?;
180 let view_meta = ViewMetadata {
181 view_set_name: self.get_view_set_name(),
182 view_instance_id: self.get_view_instance_id(),
183 file_schema_hash: self.get_file_schema_hash(),
184 };
185 let block_processors = log_processors();
186
187 for part in all_partitions {
188 if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
189 .await?
190 {
191 write_partition_from_blocks(
192 lakehouse.lake().clone(),
193 view_meta.clone(),
194 self.get_file_schema(),
195 part,
196 block_processors.clone(),
197 )
198 .await
199 .with_context(|| "write_partition_from_blocks")?;
200 }
201 }
202 Ok(())
203 }
204
205 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
206 Ok(vec![Expr::Between(Between::new(
207 col("time").into(),
208 false,
209 Expr::Literal(datetime_to_scalar(begin), None).into(),
210 Expr::Literal(datetime_to_scalar(end), None).into(),
211 ))])
212 }
213
214 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
215 Arc::new(NamedColumnsTimeBounds::new(
216 TIME_COLUMN.clone(),
217 TIME_COLUMN.clone(),
218 ))
219 }
220
221 fn get_update_group(&self) -> Option<i32> {
222 if *(self.get_view_instance_id()) == "global" {
223 Some(2000)
224 } else {
225 None
226 }
227 }
228
229 fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
230 TimeDelta::hours(1)
231 }
232}