// micromegas_analytics/lakehouse/metrics_view.rs

1use crate::{
2    lakehouse::blocks_view::BlocksView,
3    metadata::find_process,
4    metrics_table::metrics_table_schema,
5    time::{TimeRange, datetime_to_scalar},
6};
7
8use super::{
9    batch_update::PartitionCreationStrategy,
10    block_partition_spec::{BlockPartitionSpec, BlockProcessor, BlockProcessorMap},
11    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
12    jit_partitions::{
13        JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
14        write_partition_from_blocks,
15    },
16    lakehouse_context::LakehouseContext,
17    metrics_block_processor::MetricsBlockProcessor,
18    otel::metrics_block_processor::OtelMetricsBlockProcessor,
19    partition_cache::PartitionCache,
20    partition_source_data::fetch_partition_source_data,
21    view::{PartitionSpec, View, ViewMetadata},
22    view_factory::ViewMaker,
23};
24use anyhow::{Context, Result};
25use async_trait::async_trait;
26use chrono::{DateTime, TimeDelta, Utc};
27use datafusion::{
28    arrow::datatypes::Schema,
29    logical_expr::{Between, Expr, col},
30};
31use micromegas_ingestion::web_ingestion_service::{FORMAT_OTLP_METRICS, FORMAT_TRANSIT};
32use micromegas_tracing::info;
33use micromegas_tracing::prelude::*;
34use std::collections::HashMap;
35use std::sync::Arc;
36use uuid::Uuid;
37
/// Single-byte schema hash reported for this view's files and by its view maker.
const SCHEMA_VERSION: u8 = 5;
/// Name of the view set that metrics views (global and per-process) belong to.
const VIEW_SET_NAME: &str = "measures";
40lazy_static::lazy_static! {
41    static ref TIME_COLUMN: Arc<String> = Arc::new( String::from("time"));
42}
43
44/// Block-processor map covering native transit + OTel Sum/Gauge metric blocks.
45fn metrics_processors() -> Arc<BlockProcessorMap> {
46    let mut m: BlockProcessorMap = HashMap::new();
47    m.insert(
48        FORMAT_TRANSIT,
49        Arc::new(MetricsBlockProcessor {}) as Arc<dyn BlockProcessor>,
50    );
51    m.insert(
52        FORMAT_OTLP_METRICS,
53        Arc::new(OtelMetricsBlockProcessor {}) as Arc<dyn BlockProcessor>,
54    );
55    Arc::new(m)
56}
57
58#[derive(Debug)]
59pub struct MetricsViewMaker {}
60
61impl ViewMaker for MetricsViewMaker {
62    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
63        Ok(Arc::new(MetricsView::new(view_instance_id)?))
64    }
65
66    fn get_schema_hash(&self) -> Vec<u8> {
67        vec![SCHEMA_VERSION]
68    }
69
70    fn get_schema(&self) -> Arc<Schema> {
71        Arc::new(metrics_table_schema())
72    }
73}
74
75#[derive(Debug)]
76pub struct MetricsView {
77    view_set_name: Arc<String>,
78    view_instance_id: Arc<String>,
79    process_id: Option<sqlx::types::Uuid>,
80}
81
82impl MetricsView {
83    pub fn new(view_instance_id: &str) -> Result<Self> {
84        let process_id = if view_instance_id == "global" {
85            None
86        } else {
87            Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
88        };
89        Ok(Self {
90            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
91            view_instance_id: Arc::new(view_instance_id.into()),
92            process_id,
93        })
94    }
95}
96
97#[async_trait]
98impl View for MetricsView {
99    fn get_view_set_name(&self) -> Arc<String> {
100        self.view_set_name.clone()
101    }
102
103    fn get_view_instance_id(&self) -> Arc<String> {
104        self.view_instance_id.clone()
105    }
106
107    async fn make_batch_partition_spec(
108        &self,
109        lakehouse: Arc<LakehouseContext>,
110        existing_partitions: Arc<PartitionCache>,
111        insert_range: TimeRange,
112    ) -> Result<Arc<dyn PartitionSpec>> {
113        if *self.view_instance_id != "global" {
114            anyhow::bail!("not supported for jit queries... should it?");
115        }
116        let source_data = Arc::new(
117            fetch_partition_source_data(
118                lakehouse.clone(),
119                existing_partitions,
120                insert_range,
121                "metrics",
122            )
123            .await
124            .with_context(|| "fetch_partition_source_data")?,
125        );
126        Ok(Arc::new(BlockPartitionSpec {
127            view_metadata: ViewMetadata {
128                view_set_name: self.view_set_name.clone(),
129                view_instance_id: self.view_instance_id.clone(),
130                file_schema_hash: self.get_file_schema_hash(),
131            },
132            schema: self.get_file_schema(),
133            insert_range,
134            source_data,
135            block_processors: metrics_processors(),
136        }))
137    }
138
139    fn get_file_schema_hash(&self) -> Vec<u8> {
140        vec![SCHEMA_VERSION]
141    }
142
143    fn get_file_schema(&self) -> Arc<Schema> {
144        Arc::new(metrics_table_schema())
145    }
146
147    #[span_fn]
148    async fn jit_update(
149        &self,
150        lakehouse: Arc<LakehouseContext>,
151        query_range: Option<TimeRange>,
152    ) -> Result<()> {
153        if *self.view_instance_id == "global" {
154            // this view instance is updated using the deamon
155            return Ok(());
156        }
157        info!("find_process");
158        let process = Arc::new(
159            find_process(
160                &lakehouse.lake().db_pool,
161                &self
162                    .process_id
163                    .with_context(|| "getting a view's process_id")?,
164            )
165            .await
166            .with_context(|| "find_process")?,
167        );
168
169        //todo: use last_update_time in process
170        let query_range =
171            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
172
173        let blocks_view = BlocksView::new()?;
174        let all_partitions = generate_process_jit_partitions(
175            &JitPartitionConfig::default(),
176            lakehouse.clone(),
177            &blocks_view,
178            &query_range,
179            process.clone(),
180            "metrics",
181        )
182        .await
183        .with_context(|| "generate_process_jit_partitions")?;
184        let view_meta = ViewMetadata {
185            view_set_name: self.get_view_set_name(),
186            view_instance_id: self.get_view_instance_id(),
187            file_schema_hash: self.get_file_schema_hash(),
188        };
189        let block_processors = metrics_processors();
190
191        for part in all_partitions {
192            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
193                .await?
194            {
195                write_partition_from_blocks(
196                    lakehouse.lake().clone(),
197                    view_meta.clone(),
198                    self.get_file_schema(),
199                    part,
200                    block_processors.clone(),
201                )
202                .await
203                .with_context(|| "write_partition_from_blocks")?;
204            }
205        }
206        Ok(())
207    }
208
209    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
210        Ok(vec![Expr::Between(Between::new(
211            col("time").into(),
212            false,
213            Expr::Literal(datetime_to_scalar(begin), None).into(),
214            Expr::Literal(datetime_to_scalar(end), None).into(),
215        ))])
216    }
217
218    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
219        Arc::new(NamedColumnsTimeBounds::new(
220            TIME_COLUMN.clone(),
221            TIME_COLUMN.clone(),
222        ))
223    }
224
225    fn get_update_group(&self) -> Option<i32> {
226        if *(self.get_view_instance_id()) == "global" {
227            Some(2000)
228        } else {
229            None
230        }
231    }
232
233    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
234        TimeDelta::hours(1)
235    }
236}