use super::{
    batch_update::PartitionCreationStrategy,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    lakehouse_context::LakehouseContext,
    metadata_partition_spec::fetch_metadata_partition_spec,
    partition_cache::PartitionCache,
    view::{PartitionSpec, View, ViewMetadata},
};
use crate::time::{TimeRange, datetime_to_scalar};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::{DataType, Field, Schema, TimeUnit},
    logical_expr::{Expr, col},
    prelude::*,
};
use std::sync::{Arc, LazyLock};
19
/// Name of the view set this view belongs to.
const VIEW_SET_NAME: &str = "blocks";
/// Instance id of the single, global blocks view.
const VIEW_INSTANCE_ID: &str = "global";

// Column names shared between `make_time_filter` and `get_time_bounds` so the
// two cannot drift apart. `std::sync::LazyLock` replaces the `lazy_static!`
// macro with its standard-library equivalent; call sites are unaffected
// (`.clone()` still resolves through `Deref` to `Arc::clone`).
static BEGIN_TIME_COLUMN: LazyLock<Arc<String>> =
    LazyLock::new(|| Arc::new(String::from("begin_time")));
static INSERT_TIME_COLUMN: LazyLock<Arc<String>> =
    LazyLock::new(|| Arc::new(String::from("insert_time")));
26
/// Global view over telemetry blocks, joined with their stream and process
/// metadata. There is a single instance of this view (`"global"`).
#[derive(Debug)]
pub struct BlocksView {
    // Always `VIEW_SET_NAME` ("blocks"); see `BlocksView::new`.
    view_set_name: Arc<String>,
    // Always `VIEW_INSTANCE_ID` ("global"); see `BlocksView::new`.
    view_instance_id: Arc<String>,
    // SQL producing the view's rows; parameterized by an insert-time window
    // (`$1` inclusive, `$2` exclusive).
    data_sql: Arc<String>,
}
34
35impl BlocksView {
36 pub fn new() -> Result<Self> {
37 let data_sql = Arc::new(String::from(
38 r#"SELECT block_id, streams.stream_id, processes.process_id, blocks.begin_time, blocks.begin_ticks, blocks.end_time, blocks.end_ticks, blocks.nb_objects, blocks.object_offset, blocks.payload_size, blocks.insert_time,
39 streams.dependencies_metadata, streams.objects_metadata, streams.tags, streams.properties, streams.insert_time as stream_insert_time,
40 processes.start_time, processes.start_ticks, processes.tsc_frequency, processes.exe, processes.username, processes.realname, processes.computer, processes.distro, processes.cpu_brand, processes.insert_time as process_insert_time, processes.parent_process_id, processes.properties as process_properties
41 FROM blocks, streams, processes
42 WHERE blocks.stream_id = streams.stream_id
43 AND blocks.process_id = processes.process_id
44 AND blocks.insert_time >= $1
45 AND blocks.insert_time < $2
46 ORDER BY blocks.insert_time, blocks.block_id
47 ;"#,
48 ));
49 Ok(Self {
50 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
51 view_instance_id: Arc::new(String::from(VIEW_INSTANCE_ID)),
52 data_sql,
53 })
54 }
55}
56
57#[async_trait]
58impl View for BlocksView {
59 fn get_view_set_name(&self) -> Arc<String> {
60 self.view_set_name.clone()
61 }
62
63 fn get_view_instance_id(&self) -> Arc<String> {
64 self.view_instance_id.clone()
65 }
66
67 async fn make_batch_partition_spec(
68 &self,
69 lakehouse: Arc<LakehouseContext>,
70 _existing_partitions: Arc<PartitionCache>,
71 insert_range: TimeRange,
72 ) -> Result<Arc<dyn PartitionSpec>> {
73 let view_meta = ViewMetadata {
74 view_set_name: self.get_view_set_name(),
75 view_instance_id: self.get_view_instance_id(),
76 file_schema_hash: self.get_file_schema_hash(),
77 };
78 let source_count_query = "
79 SELECT COUNT(*) as count
80 FROM blocks, streams, processes
81 WHERE blocks.stream_id = streams.stream_id
82 AND blocks.process_id = processes.process_id
83 AND blocks.insert_time >= $1
84 AND blocks.insert_time < $2
85 ;";
86 Ok(Arc::new(
87 fetch_metadata_partition_spec(
88 &lakehouse.lake().db_pool,
89 source_count_query,
90 self.data_sql.clone(),
91 view_meta,
92 self.get_file_schema(),
93 insert_range,
94 self.get_time_bounds(),
95 )
96 .await
97 .with_context(|| "fetch_metadata_partition_spec")?,
98 ))
99 }
100
101 fn get_file_schema_hash(&self) -> Vec<u8> {
102 blocks_file_schema_hash()
103 }
104
105 fn get_file_schema(&self) -> Arc<Schema> {
106 Arc::new(blocks_view_schema())
107 }
108
109 async fn jit_update(
110 &self,
111 _lakehouse: Arc<LakehouseContext>,
112 _query_range: Option<TimeRange>,
113 ) -> Result<()> {
114 if *self.view_instance_id == "global" {
115 return Ok(());
117 }
118 anyhow::bail!("not supported");
119 }
120
121 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
122 Ok(vec![
123 col("begin_time").lt_eq(lit(datetime_to_scalar(end))),
124 col("insert_time").gt_eq(lit(datetime_to_scalar(begin))),
125 ])
126 }
127
128 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
129 Arc::new(NamedColumnsTimeBounds::new(
131 BEGIN_TIME_COLUMN.clone(),
132 INSERT_TIME_COLUMN.clone(),
133 ))
134 }
135
136 fn get_update_group(&self) -> Option<i32> {
137 Some(1000)
138 }
139
140 fn get_max_partition_time_delta(&self, strategy: &PartitionCreationStrategy) -> TimeDelta {
141 match strategy {
142 PartitionCreationStrategy::Abort | PartitionCreationStrategy::CreateFromSource => {
143 TimeDelta::hours(1)
144 }
145 PartitionCreationStrategy::MergeExisting(_partitions) => TimeDelta::days(1),
146 }
147 }
148}
149
150pub fn blocks_view_schema() -> Schema {
152 Schema::new(vec![
153 Field::new("block_id", DataType::Utf8, false),
154 Field::new("stream_id", DataType::Utf8, false),
155 Field::new("process_id", DataType::Utf8, false),
156 Field::new(
157 "begin_time",
158 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
159 false,
160 ),
161 Field::new("begin_ticks", DataType::Int64, false),
162 Field::new(
163 "end_time",
164 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
165 false,
166 ),
167 Field::new("end_ticks", DataType::Int64, false),
168 Field::new("nb_objects", DataType::Int32, false),
169 Field::new("object_offset", DataType::Int64, false),
170 Field::new("payload_size", DataType::Int64, false),
171 Field::new(
172 "insert_time",
173 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
174 false,
175 ),
176 Field::new("streams.dependencies_metadata", DataType::Binary, false),
177 Field::new("streams.objects_metadata", DataType::Binary, false),
178 Field::new(
179 "streams.tags",
180 DataType::List(Arc::new(Field::new("tag", DataType::Utf8, false))),
181 true,
182 ),
183 Field::new(
184 "streams.properties",
185 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
186 false,
187 ),
188 Field::new(
189 "streams.insert_time",
190 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
191 false,
192 ),
193 Field::new(
194 "processes.start_time",
195 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
196 false,
197 ),
198 Field::new("processes.start_ticks", DataType::Int64, false),
199 Field::new("processes.tsc_frequency", DataType::Int64, false),
200 Field::new("processes.exe", DataType::Utf8, false),
201 Field::new("processes.username", DataType::Utf8, false),
202 Field::new("processes.realname", DataType::Utf8, false),
203 Field::new("processes.computer", DataType::Utf8, false),
204 Field::new("processes.distro", DataType::Utf8, false),
205 Field::new("processes.cpu_brand", DataType::Utf8, false),
206 Field::new(
207 "processes.insert_time",
208 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
209 false,
210 ),
211 Field::new("processes.parent_process_id", DataType::Utf8, false),
212 Field::new(
213 "processes.properties",
214 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
215 false,
216 ),
217 ])
218}
219
/// Version tag of the blocks file schema; bump whenever `blocks_view_schema`
/// changes so existing partitions are invalidated.
pub fn blocks_file_schema_hash() -> Vec<u8> {
    vec![2]
}