// micromegas_analytics/lakehouse/log_view.rs
use super::{
    batch_update::PartitionCreationStrategy,
    block_partition_spec::BlockPartitionSpec,
    blocks_view::BlocksView,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
    lakehouse_context::LakehouseContext,
    log_block_processor::LogBlockProcessor,
    partition_cache::PartitionCache,
    partition_source_data::fetch_partition_source_data,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use crate::{
    lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
    log_entries_table::log_table_schema,
    metadata::find_process,
    time::{TimeRange, datetime_to_scalar},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    logical_expr::{Between, Expr, col},
};
use micromegas_tracing::prelude::*;
use std::sync::Arc;
use uuid::Uuid;
30
/// Name of the view set served by this module's views.
const VIEW_SET_NAME: &str = "log_entries";
/// Version of the materialized file schema; also used as the schema hash.
const SCHEMA_VERSION: u8 = 5;
lazy_static::lazy_static! {
    /// Column name used for both the begin and end time bounds of the view.
    static ref TIME_COLUMN: Arc<String> = Arc::new( String::from("time"));
}
36
37#[derive(Debug)]
39pub struct LogViewMaker {}
40
41impl ViewMaker for LogViewMaker {
42 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
43 Ok(Arc::new(LogView::new(view_instance_id)?))
44 }
45
46 fn get_schema_hash(&self) -> Vec<u8> {
47 vec![SCHEMA_VERSION]
48 }
49
50 fn get_schema(&self) -> Arc<Schema> {
51 Arc::new(log_table_schema())
52 }
53}
54
55#[derive(Debug)]
57pub struct LogView {
58 view_set_name: Arc<String>,
59 view_instance_id: Arc<String>,
60 process_id: Option<sqlx::types::Uuid>,
61}
62
63impl LogView {
64 pub fn new(view_instance_id: &str) -> Result<Self> {
65 let process_id = if view_instance_id == "global" {
66 None
67 } else {
68 Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
69 };
70
71 Ok(Self {
72 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
73 view_instance_id: Arc::new(view_instance_id.into()),
74 process_id,
75 })
76 }
77}
78
79#[async_trait]
80impl View for LogView {
81 fn get_view_set_name(&self) -> Arc<String> {
82 self.view_set_name.clone()
83 }
84
85 fn get_view_instance_id(&self) -> Arc<String> {
86 self.view_instance_id.clone()
87 }
88
89 async fn make_batch_partition_spec(
90 &self,
91 lakehouse: Arc<LakehouseContext>,
92 existing_partitions: Arc<PartitionCache>,
93 insert_range: TimeRange,
94 ) -> Result<Arc<dyn PartitionSpec>> {
95 if *self.view_instance_id != "global" {
96 anyhow::bail!("not supported for jit queries... should it?");
97 }
98 let source_data = Arc::new(
99 fetch_partition_source_data(
100 lakehouse.clone(),
101 existing_partitions,
102 insert_range,
103 "log",
104 )
105 .await
106 .with_context(|| "fetch_partition_source_data")?,
107 );
108 Ok(Arc::new(BlockPartitionSpec {
109 view_metadata: ViewMetadata {
110 view_set_name: self.view_set_name.clone(),
111 view_instance_id: self.view_instance_id.clone(),
112 file_schema_hash: self.get_file_schema_hash(),
113 },
114 schema: self.get_file_schema(),
115 insert_range,
116 source_data,
117 block_processor: Arc::new(LogBlockProcessor {}),
118 }))
119 }
120
121 fn get_file_schema_hash(&self) -> Vec<u8> {
122 vec![SCHEMA_VERSION]
123 }
124
125 fn get_file_schema(&self) -> Arc<Schema> {
126 Arc::new(log_table_schema())
127 }
128
129 #[span_fn]
130 async fn jit_update(
131 &self,
132 lakehouse: Arc<LakehouseContext>,
133 query_range: Option<TimeRange>,
134 ) -> Result<()> {
135 if *self.view_instance_id == "global" {
136 return Ok(());
138 }
139 let process = Arc::new(
140 find_process(
141 &lakehouse.lake().db_pool,
142 &self
143 .process_id
144 .with_context(|| "getting a view's process_id")?,
145 )
146 .await
147 .with_context(|| "find_process")?,
148 );
149 let query_range =
150 query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
151
152 let blocks_view = BlocksView::new()?;
153 let all_partitions = generate_process_jit_partitions(
154 &JitPartitionConfig::default(),
155 lakehouse.clone(),
156 &blocks_view,
157 &query_range,
158 process.clone(),
159 "log",
160 )
161 .await
162 .with_context(|| "generate_process_jit_partitions")?;
163 let view_meta = ViewMetadata {
164 view_set_name: self.get_view_set_name(),
165 view_instance_id: self.get_view_instance_id(),
166 file_schema_hash: self.get_file_schema_hash(),
167 };
168
169 for part in all_partitions {
170 if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
171 .await?
172 {
173 write_partition_from_blocks(
174 lakehouse.lake().clone(),
175 view_meta.clone(),
176 self.get_file_schema(),
177 part,
178 Arc::new(LogBlockProcessor {}),
179 )
180 .await
181 .with_context(|| "write_partition_from_blocks")?;
182 }
183 }
184 Ok(())
185 }
186
187 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
188 Ok(vec![Expr::Between(Between::new(
189 col("time").into(),
190 false,
191 Expr::Literal(datetime_to_scalar(begin), None).into(),
192 Expr::Literal(datetime_to_scalar(end), None).into(),
193 ))])
194 }
195
196 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
197 Arc::new(NamedColumnsTimeBounds::new(
198 TIME_COLUMN.clone(),
199 TIME_COLUMN.clone(),
200 ))
201 }
202
203 fn get_update_group(&self) -> Option<i32> {
204 if *(self.get_view_instance_id()) == "global" {
205 Some(2000)
206 } else {
207 None
208 }
209 }
210
211 fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
212 TimeDelta::hours(1)
213 }
214}