micromegas_analytics/lakehouse/partition_source_data.rs
use super::blocks_view::blocks_file_schema_hash;
use super::lakehouse_context::LakehouseContext;
use super::partition_cache::PartitionCache;
use crate::dfext::{
    string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
};
use crate::metadata::{ProcessMetadata, StreamMetadata};
use crate::properties::properties_column_accessor::properties_column_by_name;
use crate::time::TimeRange;
use crate::{
    dfext::typed_column::typed_column,
    lakehouse::{blocks_view::blocks_view_schema, query::query_partitions},
};
use anyhow::{Context, Result};
use async_stream::try_stream;
use async_trait::async_trait;
use chrono::DateTime;
use datafusion::functions_aggregate::{count::count_all, expr_fn::sum, min_max::max};
use datafusion::{
    arrow::array::{
        Array, BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray,
        TimestampNanosecondArray,
    },
    prelude::*,
};
use futures::{StreamExt, stream::BoxStream};
use micromegas_telemetry::types::block::BlockMetadata;
use std::fmt::Debug;
use std::sync::Arc;
use uuid::Uuid;

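/// A telemetry block bundled with the metadata of its parent stream and process.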
#[derive(Debug)]
pub struct PartitionSourceBlock {
    pub block: BlockMetadata,
    pub stream: Arc<StreamMetadata>,
    pub process: Arc<ProcessMetadata>,
}

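/// A source of telemetry blocks from which a lakehouse partition can be built.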
#[async_trait]
pub trait PartitionBlocksSource: Sync + Send + Debug {
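    /// Returns true if the source contains no blocks.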
    fn is_empty(&self) -> bool;
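    /// Returns the number of blocks in the source.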
    fn get_nb_blocks(&self) -> i64;
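    /// Returns the size in bytes of the largest block payload.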
    fn get_max_payload_size(&self) -> i64;
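    /// Returns a hash identifying the source data; implementations may hash block ids or encode an object count.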
    fn get_source_data_hash(&self) -> Vec<u8>;
    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>>;
}

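/// A `PartitionBlocksSource` holding all its blocks in memory.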
#[derive(Debug)]
pub struct SourceDataBlocksInMemory {
    pub blocks: Vec<Arc<PartitionSourceBlock>>,
    pub block_ids_hash: Vec<u8>,
}

#[async_trait]
impl PartitionBlocksSource for SourceDataBlocksInMemory {
    fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }

    fn get_nb_blocks(&self) -> i64 {
        self.blocks.len() as i64
    }

    fn get_max_payload_size(&self) -> i64 {
        // iterate instead of indexing blocks[0], which would panic on an empty source
        self.blocks
            .iter()
            .map(|block| block.block.payload_size)
            .max()
            .unwrap_or(0)
    }

    fn get_source_data_hash(&self) -> Vec<u8> {
        self.block_ids_hash.clone()
    }

    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
        let stream = futures::stream::iter(self.blocks.clone()).map(Ok);
        stream.boxed()
    }
}

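/// A `PartitionBlocksSource` backed by a query on the blocks view: blocks are
/// decoded on the fly as the underlying DataFrame is streamed.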
#[derive(Debug)]
pub struct SourceDataBlocks {
    pub blocks_dataframe: DataFrame,
    pub object_count: i64,
    pub block_count: i64,
    pub max_payload: i64,
}

#[async_trait]
impl PartitionBlocksSource for SourceDataBlocks {
    fn is_empty(&self) -> bool {
        self.object_count == 0
    }

    fn get_nb_blocks(&self) -> i64 {
        self.block_count
    }

    fn get_max_payload_size(&self) -> i64 {
        self.max_payload
    }

    fn get_source_data_hash(&self) -> Vec<u8> {
        self.object_count.to_le_bytes().to_vec()
    }

    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
        let df = self.blocks_dataframe.clone();
        Box::pin(try_stream! {
            let mut stream = df.execute_stream().await?;
            while let Some(res) = stream.next().await {
                let b = res.with_context(|| "fetching blocks query results")?;
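                // fetch typed accessors for every column of the batch before iterating on its rows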
                let block_id_column = string_column_by_name(&b, "block_id")?;
                let stream_id_column = string_column_by_name(&b, "stream_id")?;
                let process_id_column = string_column_by_name(&b, "process_id")?;
                let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
                let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
                let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
                let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
                let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
                let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
                let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
                let block_insert_time_column: &TimestampNanosecondArray =
                    typed_column_by_name(&b, "insert_time")?;
                let dependencies_metadata_column: &BinaryArray =
                    typed_column_by_name(&b, "streams.dependencies_metadata")?;
                let objects_metadata_column: &BinaryArray =
                    typed_column_by_name(&b, "streams.objects_metadata")?;
                let stream_tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "streams.tags")?;
                let stream_properties_accessor = properties_column_by_name(&b, "streams.properties")?;

                let process_start_time_column: &TimestampNanosecondArray =
                    typed_column_by_name(&b, "processes.start_time")?;
                let process_start_ticks_column: &Int64Array =
                    typed_column_by_name(&b, "processes.start_ticks")?;
                let process_tsc_freq_column: &Int64Array =
                    typed_column_by_name(&b, "processes.tsc_frequency")?;
                let process_exe_column = string_column_by_name(&b, "processes.exe")?;
                let process_username_column = string_column_by_name(&b, "processes.username")?;
                let process_realname_column = string_column_by_name(&b, "processes.realname")?;
                let process_computer_column = string_column_by_name(&b, "processes.computer")?;
                let process_distro_column = string_column_by_name(&b, "processes.distro")?;
                let process_cpu_column = string_column_by_name(&b, "processes.cpu_brand")?;
                let process_parent_column = string_column_by_name(&b, "processes.parent_process_id")?;
                let process_properties_accessor = properties_column_by_name(&b, "processes.properties")?;
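                // decode each row of the batch into a PartitionSourceBlock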
                for ir in 0..b.num_rows() {
                    let block_insert_time = block_insert_time_column.value(ir);
                    let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)?;
                    let process_id = Uuid::parse_str(process_id_column.value(ir)?)?;
                    let block = BlockMetadata {
                        block_id: Uuid::parse_str(block_id_column.value(ir)?)?,
                        stream_id,
                        process_id,
                        begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(ir)),
                        end_time: DateTime::from_timestamp_nanos(end_time_column.value(ir)),
                        begin_ticks: begin_ticks_column.value(ir),
                        end_ticks: end_ticks_column.value(ir),
                        nb_objects: nb_objects_column.value(ir),
                        payload_size: payload_size_column.value(ir),
                        object_offset: object_offset_column.value(ir),
                        insert_time: DateTime::from_timestamp_nanos(block_insert_time),
                    };

                    let dependencies_metadata = dependencies_metadata_column.value(ir);
                    let objects_metadata = objects_metadata_column.value(ir);
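                    // tags are stored as a list of strings; flatten them into a Vec<String>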
                    let stream_tags = stream_tags_column
                        .value(ir)
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .with_context(|| "casting stream_tags")?
                        .iter()
                        .map(|item| String::from(item.unwrap_or_default()))
                        .collect();
                    let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
                    let stream = StreamMetadata {
                        process_id,
                        stream_id,
                        dependencies_metadata: ciborium::from_reader(dependencies_metadata)
                            .with_context(|| "decoding dependencies_metadata")?,
                        objects_metadata: ciborium::from_reader(objects_metadata)
                            .with_context(|| "decoding objects_metadata")?,
                        tags: stream_tags,
                        properties: Arc::new(stream_properties_jsonb),
                    };
                    let process_properties_jsonb = process_properties_accessor.jsonb_value(ir)?;
                    let parent_value = process_parent_column.value(ir)?;
                    let parent_process_id = if parent_value.is_empty() {
                        None
                    } else {
                        Some(Uuid::parse_str(parent_value).with_context(|| "parsing parent process_id")?)
                    };

                    let process = ProcessMetadata {
                        process_id,
                        exe: process_exe_column.value(ir)?.into(),
                        username: process_username_column.value(ir)?.into(),
                        realname: process_realname_column.value(ir)?.into(),
                        computer: process_computer_column.value(ir)?.into(),
                        distro: process_distro_column.value(ir)?.into(),
                        cpu_brand: process_cpu_column.value(ir)?.into(),
                        tsc_frequency: process_tsc_freq_column.value(ir),
                        start_time: DateTime::from_timestamp_nanos(process_start_time_column.value(ir)),
                        start_ticks: process_start_ticks_column.value(ir),
                        parent_process_id,
                        properties: Arc::new(process_properties_jsonb),
                    };
                    yield Arc::new(PartitionSourceBlock {
                        block,
                        stream: Arc::new(stream),
                        process: Arc::new(process),
                    });
                }
            }
        })
    }
}

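/// Decodes a source data hash produced by `SourceDataBlocks::get_source_data_hash`
/// back into an object count (little-endian i64).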
pub fn hash_to_object_count(hash: &[u8]) -> Result<i64> {
    Ok(i64::from_le_bytes(
        hash.try_into().with_context(|| "hash_to_object_count")?,
    ))
}

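/// Fetches the source blocks for a partition from the `blocks` view.
///
/// Selects the blocks whose stream carries `source_stream_tag` and whose insert time
/// falls within `insert_range`, and aggregates their statistics (object count,
/// block count, max payload size).
///
/// Usage sketch; `ctx`, `cache` and `range` are hypothetical values from an
/// initialized lakehouse:
///
/// ```ignore
/// let source = fetch_partition_source_data(ctx, cache, range, "metrics").await?;
/// if !source.is_empty() {
///     let mut blocks = source.get_blocks_stream().await;
///     while let Some(block) = blocks.next().await {
///         let block = block?; // Arc<PartitionSourceBlock>
///         // ...
///     }
/// }
/// ```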
pub async fn fetch_partition_source_data(
    lakehouse: Arc<LakehouseContext>,
    existing_partitions: Arc<PartitionCache>,
    insert_range: TimeRange,
    source_stream_tag: &str,
) -> Result<SourceDataBlocks> {
    let begin_rfc = insert_range.begin.to_rfc3339();
    let end_rfc = insert_range.end.to_rfc3339();
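    // select the blocks (joined with their stream and process metadata) matching the tag and time window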
    let sql = format!(
        r#"
        SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
               object_offset, payload_size, insert_time,
               "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties",
               "processes.start_time", "processes.start_ticks", "processes.tsc_frequency", "processes.exe",
               "processes.username", "processes.realname", "processes.computer", "processes.distro", "processes.cpu_brand",
               "processes.parent_process_id", "processes.properties"
        FROM source
        WHERE array_has( "streams.tags", '{source_stream_tag}' )
        AND insert_time >= '{begin_rfc}'
        AND insert_time < '{end_rfc}'
        ;"#
    );
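    // query only the partitions of the global blocks view that overlap insert_range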
    let block_partitions = existing_partitions
        .filter("blocks", "global", &blocks_file_schema_hash(), insert_range)
        .partitions;
    let reader_factory = lakehouse.reader_factory().clone();
    let df = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        Arc::new(blocks_view_schema()),
        Arc::new(block_partitions),
        &sql,
    )
    .await
    .with_context(|| "blocks query")?;
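    // compute object count, block count and max payload size in a single aggregation pass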
    let blocks_stats_df = df.clone().aggregate(
        vec![],
        vec![
            sum(col("nb_objects")),
            count_all(),
            max(col("payload_size")),
        ],
    )?;
    let blocks_stats_rbs = blocks_stats_df.collect().await?;
    if blocks_stats_rbs.len() != 1 {
        anyhow::bail!(
            "expected a single record batch of block statistics, got {}",
            blocks_stats_rbs.len()
        );
    }
    if blocks_stats_rbs[0].num_rows() != 1 {
        anyhow::bail!(
            "expected a single row of block statistics, got {}",
            blocks_stats_rbs[0].num_rows()
        );
    }
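    // the aggregate columns come back in the order they were declared: sum, count, max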
    let sub_nb_objects_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 0)?;
    let object_count = sub_nb_objects_column.value(0);
    let block_count_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 1)?;
    let block_count = block_count_column.value(0);
    let max_payload_size_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 2)?;
    let max_payload = max_payload_size_column.value(0);

    Ok(SourceDataBlocks {
        blocks_dataframe: df,
        object_count,
        block_count,
        max_payload,
    })
}