// micromegas_analytics/lakehouse/log_view.rs

use super::{
    batch_update::PartitionCreationStrategy,
    block_partition_spec::BlockPartitionSpec,
    blocks_view::BlocksView,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
    lakehouse_context::LakehouseContext,
    log_block_processor::LogBlockProcessor,
    partition_cache::PartitionCache,
    partition_source_data::fetch_partition_source_data,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use crate::{
    lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
    log_entries_table::log_table_schema,
    metadata::find_process,
    time::{TimeRange, datetime_to_scalar},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    logical_expr::{Between, Expr, col},
};
use micromegas_tracing::prelude::*;
use std::sync::{Arc, LazyLock};
use uuid::Uuid;
30
/// Name of the view set this module implements.
const VIEW_SET_NAME: &str = "log_entries";
/// Bump whenever the file schema changes; doubles as the schema hash.
const SCHEMA_VERSION: u8 = 5;
/// Name of the time column, shared by both time-bound lookups.
// std::sync::LazyLock (stable since 1.80) replaces the lazy_static! macro.
static TIME_COLUMN: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new(String::from("time")));
36
/// A `ViewMaker` for creating `LogView` instances.
///
/// Stateless factory: one `LogView` is built per requested view instance id.
#[derive(Debug)]
pub struct LogViewMaker {}
40
41impl ViewMaker for LogViewMaker {
42    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
43        Ok(Arc::new(LogView::new(view_instance_id)?))
44    }
45
46    fn get_schema_hash(&self) -> Vec<u8> {
47        vec![SCHEMA_VERSION]
48    }
49
50    fn get_schema(&self) -> Arc<Schema> {
51        Arc::new(log_table_schema())
52    }
53}
54
/// A view of log entries.
///
/// The instance id is either "global" (kept up to date by the batch/daemon
/// path) or a process UUID (materialized just-in-time at query time).
#[derive(Debug)]
pub struct LogView {
    // Always VIEW_SET_NAME ("log_entries").
    view_set_name: Arc<String>,
    // "global" or the string form of a process id.
    view_instance_id: Arc<String>,
    // Parsed process id; None for the "global" instance.
    process_id: Option<sqlx::types::Uuid>,
}
62
63impl LogView {
64    pub fn new(view_instance_id: &str) -> Result<Self> {
65        let process_id = if view_instance_id == "global" {
66            None
67        } else {
68            Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?)
69        };
70
71        Ok(Self {
72            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
73            view_instance_id: Arc::new(view_instance_id.into()),
74            process_id,
75        })
76    }
77}
78
#[async_trait]
impl View for LogView {
    /// Returns the view set name ("log_entries").
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    /// Returns the instance id: "global" or a process UUID string.
    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

    /// Builds the partition spec used by the batch update path.
    ///
    /// Only supported for the "global" instance; per-process instances are
    /// materialized just-in-time instead (see `jit_update`).
    async fn make_batch_partition_spec(
        &self,
        lakehouse: Arc<LakehouseContext>,
        existing_partitions: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        if *self.view_instance_id != "global" {
            anyhow::bail!("not supported for jit queries... should it?");
        }
        // Gather the source blocks of the "log" stream for the insert range,
        // taking existing partitions into account.
        let source_data = Arc::new(
            fetch_partition_source_data(
                lakehouse.clone(),
                existing_partitions,
                insert_range,
                "log",
            )
            .await
            .with_context(|| "fetch_partition_source_data")?,
        );
        Ok(Arc::new(BlockPartitionSpec {
            view_metadata: ViewMetadata {
                view_set_name: self.view_set_name.clone(),
                view_instance_id: self.view_instance_id.clone(),
                file_schema_hash: self.get_file_schema_hash(),
            },
            schema: self.get_file_schema(),
            insert_range,
            source_data,
            block_processor: Arc::new(LogBlockProcessor {}),
        }))
    }

    /// Schema hash of the files written by this view (single version byte).
    fn get_file_schema_hash(&self) -> Vec<u8> {
        vec![SCHEMA_VERSION]
    }

    /// Arrow schema of the log entries table.
    fn get_file_schema(&self) -> Arc<Schema> {
        Arc::new(log_table_schema())
    }

    /// Just-in-time update for per-process instances: writes any missing or
    /// stale partitions covering `query_range` before a query runs.
    ///
    /// No-op for the "global" instance, which is updated by the daemon.
    /// When `query_range` is `None`, defaults to the process start time up
    /// to now.
    #[span_fn]
    async fn jit_update(
        &self,
        lakehouse: Arc<LakehouseContext>,
        query_range: Option<TimeRange>,
    ) -> Result<()> {
        if *self.view_instance_id == "global" {
            // this view instance is updated using the daemon
            return Ok(());
        }
        let process = Arc::new(
            find_process(
                &lakehouse.lake().db_pool,
                &self
                    .process_id
                    .with_context(|| "getting a view's process_id")?,
            )
            .await
            .with_context(|| "find_process")?,
        );
        // Default to the lifetime of the process so far.
        let query_range =
            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));

        let blocks_view = BlocksView::new()?;
        // Candidate partitions for this process's "log" stream over the range.
        let all_partitions = generate_process_jit_partitions(
            &JitPartitionConfig::default(),
            lakehouse.clone(),
            &blocks_view,
            &query_range,
            process.clone(),
            "log",
        )
        .await
        .with_context(|| "generate_process_jit_partitions")?;
        let view_meta = ViewMetadata {
            view_set_name: self.get_view_set_name(),
            view_instance_id: self.get_view_instance_id(),
            file_schema_hash: self.get_file_schema_hash(),
        };

        // Only (re)write partitions that are missing or out of date.
        for part in all_partitions {
            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
                .await?
            {
                write_partition_from_blocks(
                    lakehouse.lake().clone(),
                    view_meta.clone(),
                    self.get_file_schema(),
                    part,
                    Arc::new(LogBlockProcessor {}),
                )
                .await
                .with_context(|| "write_partition_from_blocks")?;
            }
        }
        Ok(())
    }

    /// Builds a `time BETWEEN begin AND end` DataFusion filter expression.
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![Expr::Between(Between::new(
            col("time").into(),
            false,
            Expr::Literal(datetime_to_scalar(begin), None).into(),
            Expr::Literal(datetime_to_scalar(end), None).into(),
        ))])
    }

    /// Both the min and max time bounds are read from the "time" column.
    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        Arc::new(NamedColumnsTimeBounds::new(
            TIME_COLUMN.clone(),
            TIME_COLUMN.clone(),
        ))
    }

    /// Only the "global" instance participates in scheduled batch updates.
    fn get_update_group(&self) -> Option<i32> {
        if *(self.get_view_instance_id()) == "global" {
            Some(2000)
        } else {
            None
        }
    }

    /// Partitions cover at most one hour, regardless of creation strategy.
    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
        TimeDelta::hours(1)
    }
}