// micromegas_analytics/lakehouse/metrics_view.rs

use crate::{
    lakehouse::blocks_view::BlocksView,
    metadata::find_process,
    metrics_table::metrics_table_schema,
    time::{TimeRange, datetime_to_scalar},
};

use super::{
    batch_update::PartitionCreationStrategy,
    block_partition_spec::BlockPartitionSpec,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{
        JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
        write_partition_from_blocks,
    },
    lakehouse_context::LakehouseContext,
    metrics_block_processor::MetricsBlockProcessor,
    partition_cache::PartitionCache,
    partition_source_data::fetch_partition_source_data,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    logical_expr::{Between, Expr, col},
};
use micromegas_tracing::info;
use micromegas_tracing::prelude::*;
use std::sync::{Arc, LazyLock};
use uuid::Uuid;
34
/// Name shared by every instance of this view set.
const VIEW_SET_NAME: &str = "measures";
/// Bumped whenever the file schema changes so stale partitions get rebuilt.
const SCHEMA_VERSION: u8 = 5;
/// Column providing both the begin and end time bounds of this view.
/// `LazyLock` (stable stdlib) replaces the former `lazy_static!` macro;
/// deref behavior at the use sites is identical.
static TIME_COLUMN: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new(String::from("time")));
40
/// Factory producing [`MetricsView`] instances for the "measures" view set.
#[derive(Debug)]
pub struct MetricsViewMaker {}
43
44impl ViewMaker for MetricsViewMaker {
45    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
46        Ok(Arc::new(MetricsView::new(view_instance_id)?))
47    }
48
49    fn get_schema_hash(&self) -> Vec<u8> {
50        vec![SCHEMA_VERSION]
51    }
52
53    fn get_schema(&self) -> Arc<Schema> {
54        Arc::new(metrics_table_schema())
55    }
56}
57
/// A view over measures: either the "global" instance (materialized in batch)
/// or a process-specific instance (materialized just-in-time at query time).
#[derive(Debug)]
pub struct MetricsView {
    // Always VIEW_SET_NAME ("measures"); see `MetricsView::new`.
    view_set_name: Arc<String>,
    // Either "global" or a process uuid string.
    view_instance_id: Arc<String>,
    // None for the "global" instance, otherwise the parsed process uuid.
    process_id: Option<sqlx::types::Uuid>,
}
64
65impl MetricsView {
66    pub fn new(view_instance_id: &str) -> Result<Self> {
67        let process_id = if view_instance_id == "global" {
68            None
69        } else {
70            Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
71        };
72        Ok(Self {
73            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
74            view_instance_id: Arc::new(view_instance_id.into()),
75            process_id,
76        })
77    }
78}
79
80#[async_trait]
81impl View for MetricsView {
82    fn get_view_set_name(&self) -> Arc<String> {
83        self.view_set_name.clone()
84    }
85
86    fn get_view_instance_id(&self) -> Arc<String> {
87        self.view_instance_id.clone()
88    }
89
90    async fn make_batch_partition_spec(
91        &self,
92        lakehouse: Arc<LakehouseContext>,
93        existing_partitions: Arc<PartitionCache>,
94        insert_range: TimeRange,
95    ) -> Result<Arc<dyn PartitionSpec>> {
96        if *self.view_instance_id != "global" {
97            anyhow::bail!("not supported for jit queries... should it?");
98        }
99        let source_data = Arc::new(
100            fetch_partition_source_data(
101                lakehouse.clone(),
102                existing_partitions,
103                insert_range,
104                "metrics",
105            )
106            .await
107            .with_context(|| "fetch_partition_source_data")?,
108        );
109        Ok(Arc::new(BlockPartitionSpec {
110            view_metadata: ViewMetadata {
111                view_set_name: self.view_set_name.clone(),
112                view_instance_id: self.view_instance_id.clone(),
113                file_schema_hash: self.get_file_schema_hash(),
114            },
115            schema: self.get_file_schema(),
116            insert_range,
117            source_data,
118            block_processor: Arc::new(MetricsBlockProcessor {}),
119        }))
120    }
121
122    fn get_file_schema_hash(&self) -> Vec<u8> {
123        vec![SCHEMA_VERSION]
124    }
125
126    fn get_file_schema(&self) -> Arc<Schema> {
127        Arc::new(metrics_table_schema())
128    }
129
130    #[span_fn]
131    async fn jit_update(
132        &self,
133        lakehouse: Arc<LakehouseContext>,
134        query_range: Option<TimeRange>,
135    ) -> Result<()> {
136        if *self.view_instance_id == "global" {
137            // this view instance is updated using the deamon
138            return Ok(());
139        }
140        info!("find_process");
141        let process = Arc::new(
142            find_process(
143                &lakehouse.lake().db_pool,
144                &self
145                    .process_id
146                    .with_context(|| "getting a view's process_id")?,
147            )
148            .await
149            .with_context(|| "find_process")?,
150        );
151
152        //todo: use last_update_time in process
153        let query_range =
154            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
155
156        let blocks_view = BlocksView::new()?;
157        let all_partitions = generate_process_jit_partitions(
158            &JitPartitionConfig::default(),
159            lakehouse.clone(),
160            &blocks_view,
161            &query_range,
162            process.clone(),
163            "metrics",
164        )
165        .await
166        .with_context(|| "generate_process_jit_partitions")?;
167        let view_meta = ViewMetadata {
168            view_set_name: self.get_view_set_name(),
169            view_instance_id: self.get_view_instance_id(),
170            file_schema_hash: self.get_file_schema_hash(),
171        };
172
173        for part in all_partitions {
174            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
175                .await?
176            {
177                write_partition_from_blocks(
178                    lakehouse.lake().clone(),
179                    view_meta.clone(),
180                    self.get_file_schema(),
181                    part,
182                    Arc::new(MetricsBlockProcessor {}),
183                )
184                .await
185                .with_context(|| "write_partition_from_blocks")?;
186            }
187        }
188        Ok(())
189    }
190
191    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
192        Ok(vec![Expr::Between(Between::new(
193            col("time").into(),
194            false,
195            Expr::Literal(datetime_to_scalar(begin), None).into(),
196            Expr::Literal(datetime_to_scalar(end), None).into(),
197        ))])
198    }
199
200    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
201        Arc::new(NamedColumnsTimeBounds::new(
202            TIME_COLUMN.clone(),
203            TIME_COLUMN.clone(),
204        ))
205    }
206
207    fn get_update_group(&self) -> Option<i32> {
208        if *(self.get_view_instance_id()) == "global" {
209            Some(2000)
210        } else {
211            None
212        }
213    }
214
215    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
216        TimeDelta::hours(1)
217    }
218}