micromegas_analytics/lakehouse/blocks_view.rs

use super::{
    batch_update::PartitionCreationStrategy,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    lakehouse_context::LakehouseContext,
    metadata_partition_spec::fetch_metadata_partition_spec,
    partition_cache::PartitionCache,
    view::{PartitionSpec, View, ViewMetadata},
};
use crate::time::{TimeRange, datetime_to_scalar};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::{DataType, Field, Schema, TimeUnit},
    logical_expr::{Expr, col},
    prelude::*,
};
use std::sync::Arc;

const VIEW_SET_NAME: &str = "blocks";
const VIEW_INSTANCE_ID: &str = "global";
lazy_static::lazy_static! {
    static ref BEGIN_TIME_COLUMN: Arc<String> = Arc::new(String::from("begin_time"));
    static ref INSERT_TIME_COLUMN: Arc<String> = Arc::new(String::from("insert_time"));
}

/// A view of the `blocks` table, providing access to telemetry block metadata.
#[derive(Debug)]
pub struct BlocksView {
    view_set_name: Arc<String>,
    view_instance_id: Arc<String>,
    data_sql: Arc<String>,
}

impl BlocksView {
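    /// Creates the view, pre-building the SQL that joins `blocks` with their
    /// parent `streams` and `processes` rows; `$1`/`$2` are placeholders for
    /// the insert-time range bound when a partition is materialized.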
    pub fn new() -> Result<Self> {
        let data_sql = Arc::new(String::from(
            r#"SELECT block_id, streams.stream_id, processes.process_id, blocks.begin_time, blocks.begin_ticks, blocks.end_time, blocks.end_ticks, blocks.nb_objects, blocks.object_offset, blocks.payload_size, blocks.insert_time,
           streams.dependencies_metadata, streams.objects_metadata, streams.tags, streams.properties, streams.insert_time as stream_insert_time,
           processes.start_time, processes.start_ticks, processes.tsc_frequency, processes.exe, processes.username, processes.realname, processes.computer, processes.distro, processes.cpu_brand, processes.insert_time as process_insert_time, processes.parent_process_id, processes.properties as process_properties
         FROM blocks, streams, processes
         WHERE blocks.stream_id = streams.stream_id
         AND blocks.process_id = processes.process_id
         AND blocks.insert_time >= $1
         AND blocks.insert_time < $2
         ORDER BY blocks.insert_time, blocks.block_id
         ;"#,
        ));
        Ok(Self {
            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
            view_instance_id: Arc::new(String::from(VIEW_INSTANCE_ID)),
            data_sql,
        })
    }
}

#[async_trait]
impl View for BlocksView {
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

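    /// Builds the partition spec for a batch update over `insert_range`,
    /// pairing a source row-count query with the view's data SQL via
    /// `fetch_metadata_partition_spec`.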
    async fn make_batch_partition_spec(
        &self,
        lakehouse: Arc<LakehouseContext>,
        _existing_partitions: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        let view_meta = ViewMetadata {
            view_set_name: self.get_view_set_name(),
            view_instance_id: self.get_view_instance_id(),
            file_schema_hash: self.get_file_schema_hash(),
        };
        let source_count_query = "
             SELECT COUNT(*) as count
             FROM blocks, streams, processes
             WHERE blocks.stream_id = streams.stream_id
             AND blocks.process_id = processes.process_id
             AND blocks.insert_time >= $1
             AND blocks.insert_time < $2
             ;";
        Ok(Arc::new(
            fetch_metadata_partition_spec(
                &lakehouse.lake().db_pool,
                source_count_query,
                self.data_sql.clone(),
                view_meta,
                self.get_file_schema(),
                insert_range,
                self.get_time_bounds(),
            )
            .await
            .with_context(|| "fetch_metadata_partition_spec")?,
        ))
    }

    fn get_file_schema_hash(&self) -> Vec<u8> {
        blocks_file_schema_hash()
    }

    fn get_file_schema(&self) -> Arc<Schema> {
        Arc::new(blocks_view_schema())
    }

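    /// Just-in-time updates are a no-op for the global instance, which the
    /// daemon keeps current; any other instance id is rejected.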
    async fn jit_update(
        &self,
        _lakehouse: Arc<LakehouseContext>,
        _query_range: Option<TimeRange>,
    ) -> Result<()> {
        if *self.view_instance_id == "global" {
            // this view instance is updated by the daemon
            return Ok(());
        }
        anyhow::bail!("not supported");
    }

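    /// Partition-pruning filter: keep rows whose `begin_time` is not after the
    /// query range's end and whose `insert_time` is not before its begin.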
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![
            col("begin_time").lt_eq(lit(datetime_to_scalar(end))),
            col("insert_time").gt_eq(lit(datetime_to_scalar(begin))),
        ])
    }

    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        // TODO: make more robust by changing to [min(begin, insert), max(end, insert)]
        Arc::new(NamedColumnsTimeBounds::new(
            BEGIN_TIME_COLUMN.clone(),
            INSERT_TIME_COLUMN.clone(),
        ))
    }

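    /// The update group this view registers under; how groups are ordered is
    /// decided by the update daemon, not here.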
    fn get_update_group(&self) -> Option<i32> {
        Some(1000)
    }

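    /// Partition sizing: partitions created from source cover at most one hour
    /// of insert time, while merges of existing partitions may span a day.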
    fn get_max_partition_time_delta(&self, strategy: &PartitionCreationStrategy) -> TimeDelta {
        match strategy {
            PartitionCreationStrategy::Abort | PartitionCreationStrategy::CreateFromSource => {
                TimeDelta::hours(1)
            }
            PartitionCreationStrategy::MergeExisting(_partitions) => TimeDelta::days(1),
        }
    }
}

/// Returns the Arrow schema for the blocks view.
pub fn blocks_view_schema() -> Schema {
    Schema::new(vec![
        Field::new("block_id", DataType::Utf8, false),
        Field::new("stream_id", DataType::Utf8, false),
        Field::new("process_id", DataType::Utf8, false),
        Field::new(
            "begin_time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new("begin_ticks", DataType::Int64, false),
        Field::new(
            "end_time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new("end_ticks", DataType::Int64, false),
        Field::new("nb_objects", DataType::Int32, false),
        Field::new("object_offset", DataType::Int64, false),
        Field::new("payload_size", DataType::Int64, false),
        Field::new(
            "insert_time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new("streams.dependencies_metadata", DataType::Binary, false),
        Field::new("streams.objects_metadata", DataType::Binary, false),
        Field::new(
            "streams.tags",
            DataType::List(Arc::new(Field::new("tag", DataType::Utf8, false))),
            true,
        ),
        Field::new(
            "streams.properties",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
            false,
        ),
        Field::new(
            "streams.insert_time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new(
            "processes.start_time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new("processes.start_ticks", DataType::Int64, false),
        Field::new("processes.tsc_frequency", DataType::Int64, false),
        Field::new("processes.exe", DataType::Utf8, false),
        Field::new("processes.username", DataType::Utf8, false),
        Field::new("processes.realname", DataType::Utf8, false),
        Field::new("processes.computer", DataType::Utf8, false),
        Field::new("processes.distro", DataType::Utf8, false),
        Field::new("processes.cpu_brand", DataType::Utf8, false),
        Field::new(
            "processes.insert_time",
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
            false,
        ),
        Field::new("processes.parent_process_id", DataType::Utf8, false),
        Field::new(
            "processes.properties",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
            false,
        ),
    ])
}

/// Returns the file schema hash for the blocks view.
pub fn blocks_file_schema_hash() -> Vec<u8> {
    vec![2] // Bumped from vec![1] for JSONB migration
}
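
#[cfg(test)]
mod tests {
    // A minimal sanity-check sketch (not part of the original module): it only
    // exercises the view's static properties, so no database connection or
    // lakehouse context is needed.
    use super::*;

    #[test]
    fn blocks_view_static_properties() {
        let view = BlocksView::new().expect("BlocksView::new");
        assert_eq!(view.get_view_set_name().as_str(), "blocks");
        assert_eq!(view.get_view_instance_id().as_str(), "global");
        assert_eq!(view.get_file_schema_hash(), vec![2]);
        // 28 fields: 11 block-level, 5 streams.*, 12 processes.*
        assert_eq!(view.get_file_schema().fields().len(), 28);
    }
}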