use crate::{
    lakehouse::blocks_view::BlocksView,
    metadata::find_process,
    metrics_table::metrics_table_schema,
    time::{TimeRange, datetime_to_scalar},
};

use super::{
    batch_update::PartitionCreationStrategy,
    block_partition_spec::BlockPartitionSpec,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{
        JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
        write_partition_from_blocks,
    },
    lakehouse_context::LakehouseContext,
    metrics_block_processor::MetricsBlockProcessor,
    partition_cache::PartitionCache,
    partition_source_data::fetch_partition_source_data,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    logical_expr::{Between, Expr, col},
};
use micromegas_tracing::info;
use micromegas_tracing::prelude::*;
use std::sync::{Arc, LazyLock};
use uuid::Uuid;
34
/// Name of the view set exposed by this module.
const VIEW_SET_NAME: &str = "measures";
/// Bump this when the file schema changes so stale partitions are rebuilt.
const SCHEMA_VERSION: u8 = 5;
/// Column providing both the min and max time bounds of the view
/// (see `get_time_bounds`). `LazyLock` is the std replacement for the
/// `lazy_static!` macro; call sites still deref to `Arc<String>`.
static TIME_COLUMN: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new(String::from("time")));
40
41#[derive(Debug)]
42pub struct MetricsViewMaker {}
43
44impl ViewMaker for MetricsViewMaker {
45 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
46 Ok(Arc::new(MetricsView::new(view_instance_id)?))
47 }
48
49 fn get_schema_hash(&self) -> Vec<u8> {
50 vec![SCHEMA_VERSION]
51 }
52
53 fn get_schema(&self) -> Arc<Schema> {
54 Arc::new(metrics_table_schema())
55 }
56}
57
58#[derive(Debug)]
59pub struct MetricsView {
60 view_set_name: Arc<String>,
61 view_instance_id: Arc<String>,
62 process_id: Option<sqlx::types::Uuid>,
63}
64
65impl MetricsView {
66 pub fn new(view_instance_id: &str) -> Result<Self> {
67 let process_id = if view_instance_id == "global" {
68 None
69 } else {
70 Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
71 };
72 Ok(Self {
73 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
74 view_instance_id: Arc::new(view_instance_id.into()),
75 process_id,
76 })
77 }
78}
79
80#[async_trait]
81impl View for MetricsView {
82 fn get_view_set_name(&self) -> Arc<String> {
83 self.view_set_name.clone()
84 }
85
86 fn get_view_instance_id(&self) -> Arc<String> {
87 self.view_instance_id.clone()
88 }
89
90 async fn make_batch_partition_spec(
91 &self,
92 lakehouse: Arc<LakehouseContext>,
93 existing_partitions: Arc<PartitionCache>,
94 insert_range: TimeRange,
95 ) -> Result<Arc<dyn PartitionSpec>> {
96 if *self.view_instance_id != "global" {
97 anyhow::bail!("not supported for jit queries... should it?");
98 }
99 let source_data = Arc::new(
100 fetch_partition_source_data(
101 lakehouse.clone(),
102 existing_partitions,
103 insert_range,
104 "metrics",
105 )
106 .await
107 .with_context(|| "fetch_partition_source_data")?,
108 );
109 Ok(Arc::new(BlockPartitionSpec {
110 view_metadata: ViewMetadata {
111 view_set_name: self.view_set_name.clone(),
112 view_instance_id: self.view_instance_id.clone(),
113 file_schema_hash: self.get_file_schema_hash(),
114 },
115 schema: self.get_file_schema(),
116 insert_range,
117 source_data,
118 block_processor: Arc::new(MetricsBlockProcessor {}),
119 }))
120 }
121
122 fn get_file_schema_hash(&self) -> Vec<u8> {
123 vec![SCHEMA_VERSION]
124 }
125
126 fn get_file_schema(&self) -> Arc<Schema> {
127 Arc::new(metrics_table_schema())
128 }
129
130 #[span_fn]
131 async fn jit_update(
132 &self,
133 lakehouse: Arc<LakehouseContext>,
134 query_range: Option<TimeRange>,
135 ) -> Result<()> {
136 if *self.view_instance_id == "global" {
137 return Ok(());
139 }
140 info!("find_process");
141 let process = Arc::new(
142 find_process(
143 &lakehouse.lake().db_pool,
144 &self
145 .process_id
146 .with_context(|| "getting a view's process_id")?,
147 )
148 .await
149 .with_context(|| "find_process")?,
150 );
151
152 let query_range =
154 query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
155
156 let blocks_view = BlocksView::new()?;
157 let all_partitions = generate_process_jit_partitions(
158 &JitPartitionConfig::default(),
159 lakehouse.clone(),
160 &blocks_view,
161 &query_range,
162 process.clone(),
163 "metrics",
164 )
165 .await
166 .with_context(|| "generate_process_jit_partitions")?;
167 let view_meta = ViewMetadata {
168 view_set_name: self.get_view_set_name(),
169 view_instance_id: self.get_view_instance_id(),
170 file_schema_hash: self.get_file_schema_hash(),
171 };
172
173 for part in all_partitions {
174 if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
175 .await?
176 {
177 write_partition_from_blocks(
178 lakehouse.lake().clone(),
179 view_meta.clone(),
180 self.get_file_schema(),
181 part,
182 Arc::new(MetricsBlockProcessor {}),
183 )
184 .await
185 .with_context(|| "write_partition_from_blocks")?;
186 }
187 }
188 Ok(())
189 }
190
191 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
192 Ok(vec![Expr::Between(Between::new(
193 col("time").into(),
194 false,
195 Expr::Literal(datetime_to_scalar(begin), None).into(),
196 Expr::Literal(datetime_to_scalar(end), None).into(),
197 ))])
198 }
199
200 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
201 Arc::new(NamedColumnsTimeBounds::new(
202 TIME_COLUMN.clone(),
203 TIME_COLUMN.clone(),
204 ))
205 }
206
207 fn get_update_group(&self) -> Option<i32> {
208 if *(self.get_view_instance_id()) == "global" {
209 Some(2000)
210 } else {
211 None
212 }
213 }
214
215 fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
216 TimeDelta::hours(1)
217 }
218}