// micromegas_analytics/lakehouse/metrics_view.rs
use crate::{
    lakehouse::blocks_view::BlocksView,
    metadata::find_process,
    metrics_table::metrics_table_schema,
    time::{TimeRange, datetime_to_scalar},
};

use super::{
    batch_update::PartitionCreationStrategy,
    block_partition_spec::{BlockPartitionSpec, BlockProcessor, BlockProcessorMap},
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{
        JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
        write_partition_from_blocks,
    },
    lakehouse_context::LakehouseContext,
    metrics_block_processor::MetricsBlockProcessor,
    otel::metrics_block_processor::OtelMetricsBlockProcessor,
    partition_cache::PartitionCache,
    partition_source_data::fetch_partition_source_data,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    logical_expr::{Between, Expr, col},
};
use micromegas_ingestion::web_ingestion_service::{FORMAT_OTLP_METRICS, FORMAT_TRANSIT};
use micromegas_tracing::info;
use micromegas_tracing::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::LazyLock;
use uuid::Uuid;

/// Name of the view set served by `MetricsView` (stored in every instance's metadata).
const VIEW_SET_NAME: &str = "measures";
/// Single-byte version used as the (file) schema hash of this view set.
// NOTE(review): presumably bumped whenever `metrics_table_schema()` changes — confirm.
const SCHEMA_VERSION: u8 = 5;
/// Name of the timestamp column; shared (cheap `Arc` clone) by the view's time bounds.
static TIME_COLUMN: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new(String::from("time")));

44fn metrics_processors() -> Arc<BlockProcessorMap> {
46 let mut m: BlockProcessorMap = HashMap::new();
47 m.insert(
48 FORMAT_TRANSIT,
49 Arc::new(MetricsBlockProcessor {}) as Arc<dyn BlockProcessor>,
50 );
51 m.insert(
52 FORMAT_OTLP_METRICS,
53 Arc::new(OtelMetricsBlockProcessor {}) as Arc<dyn BlockProcessor>,
54 );
55 Arc::new(m)
56}
57
58#[derive(Debug)]
59pub struct MetricsViewMaker {}
60
61impl ViewMaker for MetricsViewMaker {
62 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
63 Ok(Arc::new(MetricsView::new(view_instance_id)?))
64 }
65
66 fn get_schema_hash(&self) -> Vec<u8> {
67 vec![SCHEMA_VERSION]
68 }
69
70 fn get_schema(&self) -> Arc<Schema> {
71 Arc::new(metrics_table_schema())
72 }
73}
74
75#[derive(Debug)]
76pub struct MetricsView {
77 view_set_name: Arc<String>,
78 view_instance_id: Arc<String>,
79 process_id: Option<sqlx::types::Uuid>,
80}
81
82impl MetricsView {
83 pub fn new(view_instance_id: &str) -> Result<Self> {
84 let process_id = if view_instance_id == "global" {
85 None
86 } else {
87 Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
88 };
89 Ok(Self {
90 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
91 view_instance_id: Arc::new(view_instance_id.into()),
92 process_id,
93 })
94 }
95}
96
97#[async_trait]
98impl View for MetricsView {
99 fn get_view_set_name(&self) -> Arc<String> {
100 self.view_set_name.clone()
101 }
102
103 fn get_view_instance_id(&self) -> Arc<String> {
104 self.view_instance_id.clone()
105 }
106
107 async fn make_batch_partition_spec(
108 &self,
109 lakehouse: Arc<LakehouseContext>,
110 existing_partitions: Arc<PartitionCache>,
111 insert_range: TimeRange,
112 ) -> Result<Arc<dyn PartitionSpec>> {
113 if *self.view_instance_id != "global" {
114 anyhow::bail!("not supported for jit queries... should it?");
115 }
116 let source_data = Arc::new(
117 fetch_partition_source_data(
118 lakehouse.clone(),
119 existing_partitions,
120 insert_range,
121 "metrics",
122 )
123 .await
124 .with_context(|| "fetch_partition_source_data")?,
125 );
126 Ok(Arc::new(BlockPartitionSpec {
127 view_metadata: ViewMetadata {
128 view_set_name: self.view_set_name.clone(),
129 view_instance_id: self.view_instance_id.clone(),
130 file_schema_hash: self.get_file_schema_hash(),
131 },
132 schema: self.get_file_schema(),
133 insert_range,
134 source_data,
135 block_processors: metrics_processors(),
136 }))
137 }
138
139 fn get_file_schema_hash(&self) -> Vec<u8> {
140 vec![SCHEMA_VERSION]
141 }
142
143 fn get_file_schema(&self) -> Arc<Schema> {
144 Arc::new(metrics_table_schema())
145 }
146
147 #[span_fn]
148 async fn jit_update(
149 &self,
150 lakehouse: Arc<LakehouseContext>,
151 query_range: Option<TimeRange>,
152 ) -> Result<()> {
153 if *self.view_instance_id == "global" {
154 return Ok(());
156 }
157 info!("find_process");
158 let process = Arc::new(
159 find_process(
160 &lakehouse.lake().db_pool,
161 &self
162 .process_id
163 .with_context(|| "getting a view's process_id")?,
164 )
165 .await
166 .with_context(|| "find_process")?,
167 );
168
169 let query_range =
171 query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
172
173 let blocks_view = BlocksView::new()?;
174 let all_partitions = generate_process_jit_partitions(
175 &JitPartitionConfig::default(),
176 lakehouse.clone(),
177 &blocks_view,
178 &query_range,
179 process.clone(),
180 "metrics",
181 )
182 .await
183 .with_context(|| "generate_process_jit_partitions")?;
184 let view_meta = ViewMetadata {
185 view_set_name: self.get_view_set_name(),
186 view_instance_id: self.get_view_instance_id(),
187 file_schema_hash: self.get_file_schema_hash(),
188 };
189 let block_processors = metrics_processors();
190
191 for part in all_partitions {
192 if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
193 .await?
194 {
195 write_partition_from_blocks(
196 lakehouse.lake().clone(),
197 view_meta.clone(),
198 self.get_file_schema(),
199 part,
200 block_processors.clone(),
201 )
202 .await
203 .with_context(|| "write_partition_from_blocks")?;
204 }
205 }
206 Ok(())
207 }
208
209 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
210 Ok(vec![Expr::Between(Between::new(
211 col("time").into(),
212 false,
213 Expr::Literal(datetime_to_scalar(begin), None).into(),
214 Expr::Literal(datetime_to_scalar(end), None).into(),
215 ))])
216 }
217
218 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
219 Arc::new(NamedColumnsTimeBounds::new(
220 TIME_COLUMN.clone(),
221 TIME_COLUMN.clone(),
222 ))
223 }
224
225 fn get_update_group(&self) -> Option<i32> {
226 if *(self.get_view_instance_id()) == "global" {
227 Some(2000)
228 } else {
229 None
230 }
231 }
232
233 fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
234 TimeDelta::hours(1)
235 }
236}