micromegas_analytics/lakehouse/
partition_source_data.rs1use super::blocks_view::blocks_file_schema_hash;
2use super::lakehouse_context::LakehouseContext;
3use super::partition_cache::PartitionCache;
4use crate::dfext::{
5 string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
6};
7use crate::metadata::{ProcessMetadata, StreamMetadata};
8use crate::properties::properties_column_accessor::properties_column_by_name;
9use crate::time::TimeRange;
10use crate::{
11 dfext::typed_column::typed_column,
12 lakehouse::{blocks_view::blocks_view_schema, query::query_partitions},
13};
14use anyhow::{Context, Result};
15use async_stream::try_stream;
16use async_trait::async_trait;
17use chrono::DateTime;
18use datafusion::functions_aggregate::{count::count_all, expr_fn::sum, min_max::max};
19use datafusion::{
20 arrow::array::{
21 Array, BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray,
22 TimestampNanosecondArray,
23 },
24 prelude::*,
25};
26use futures::{StreamExt, stream::BoxStream};
27use micromegas_telemetry::types::block::BlockMetadata;
28use std::fmt::Debug;
29use std::sync::Arc;
30use uuid::Uuid;
31
32#[derive(Debug)]
34pub struct PartitionSourceBlock {
35 pub block: BlockMetadata,
36 pub stream: Arc<StreamMetadata>,
37 pub process: Arc<ProcessMetadata>,
38 pub format: String,
42}
43
44#[async_trait]
46pub trait PartitionBlocksSource: Sync + Send + Debug {
47 fn is_empty(&self) -> bool;
49 fn get_nb_blocks(&self) -> i64;
51 fn get_max_payload_size(&self) -> i64;
53 fn get_source_data_hash(&self) -> Vec<u8>;
55 async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>>;
57}
58
59#[derive(Debug)]
61pub struct SourceDataBlocksInMemory {
62 pub blocks: Vec<Arc<PartitionSourceBlock>>,
63 pub block_ids_hash: Vec<u8>,
64}
65
66#[async_trait]
67impl PartitionBlocksSource for SourceDataBlocksInMemory {
68 fn is_empty(&self) -> bool {
69 self.blocks.is_empty()
70 }
71
72 fn get_nb_blocks(&self) -> i64 {
73 self.blocks.len() as i64
74 }
75
76 fn get_max_payload_size(&self) -> i64 {
77 let mut max_size = self.blocks[0].block.payload_size;
78 for block in &self.blocks {
79 max_size = max_size.max(block.block.payload_size);
80 }
81 max_size
82 }
83
84 fn get_source_data_hash(&self) -> Vec<u8> {
85 self.block_ids_hash.clone()
86 }
87
88 async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
89 let stream = futures::stream::iter(self.blocks.clone()).map(Ok);
90 stream.boxed()
91 }
92}
93
94#[derive(Debug)]
96pub struct SourceDataBlocks {
97 pub blocks_dataframe: DataFrame,
98 pub object_count: i64,
99 pub block_count: i64,
100 pub max_payload: i64,
101}
102
/// Partition source implementation that decodes blocks from the rows of the
/// blocks dataframe, one `PartitionSourceBlock` per row.
#[async_trait]
impl PartitionBlocksSource for SourceDataBlocks {
    /// Emptiness is judged on the aggregated object count, so a selection whose
    /// blocks hold zero objects in total is also reported as empty.
    fn is_empty(&self) -> bool {
        self.object_count == 0
    }

    /// Returns the block count computed when the source was built.
    fn get_nb_blocks(&self) -> i64 {
        self.block_count
    }

    /// Returns the maximum payload size computed when the source was built.
    fn get_max_payload_size(&self) -> i64 {
        self.max_payload
    }

    /// Uses the little-endian bytes of `object_count` as the source data hash
    /// (the inverse operation is `hash_to_object_count`).
    fn get_source_data_hash(&self) -> Vec<u8> {
        self.object_count.to_le_bytes().to_vec()
    }

    /// Executes the blocks dataframe and yields one `PartitionSourceBlock` per row.
    ///
    /// Query execution failures, missing/mistyped columns, invalid UUIDs and CBOR
    /// decoding failures are yielded as `Err` items and end the stream.
    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
        // The dataframe is cloned and moved into the stream so it does not borrow `self`.
        let df = self.blocks_dataframe.clone();
        Box::pin(try_stream! {
            let mut stream = df.execute_stream().await?;
            while let Some(res) = stream.next().await {
                let b = res.with_context(|| "fetching blocks query results")?;
                // Column accessors are resolved once per record batch, then indexed per row.
                let block_id_column = string_column_by_name(&b, "block_id")?;
                let stream_id_column = string_column_by_name(&b, "stream_id")?;
                let process_id_column = string_column_by_name(&b, "process_id")?;
                let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
                let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
                let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
                let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
                let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
                let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
                let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
                let block_insert_time_column: &TimestampNanosecondArray =
                    typed_column_by_name(&b, "insert_time")?;
                // Stream columns (joined into the blocks view with a "streams." prefix).
                let dependencies_metadata_column: &BinaryArray =
                    typed_column_by_name(&b, "streams.dependencies_metadata")?;
                let objects_metadata_column: &BinaryArray =
                    typed_column_by_name(&b, "streams.objects_metadata")?;
                let stream_tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "streams.tags")?;
                let stream_properties_accessor = properties_column_by_name(&b, "streams.properties")?;
                let stream_format_column = string_column_by_name(&b, "streams.format")?;

                // Process columns (joined into the blocks view with a "processes." prefix).
                let process_start_time_column: &TimestampNanosecondArray =
                    typed_column_by_name(&b, "processes.start_time")?;
                let process_start_ticks_column: &Int64Array =
                    typed_column_by_name(&b, "processes.start_ticks")?;
                let process_tsc_freq_column: &Int64Array =
                    typed_column_by_name(&b, "processes.tsc_frequency")?;
                let process_exe_column = string_column_by_name(&b, "processes.exe")?;
                let process_username_column = string_column_by_name(&b, "processes.username")?;
                let process_realname_column = string_column_by_name(&b, "processes.realname")?;
                let process_computer_column = string_column_by_name(&b, "processes.computer")?;
                let process_distro_column = string_column_by_name(&b, "processes.distro")?;
                let process_cpu_column = string_column_by_name(&b, "processes.cpu_brand")?;
                let process_parent_column = string_column_by_name(&b, "processes.parent_process_id")?;
                let process_properties_accessor = properties_column_by_name(&b, "processes.properties")?;
                for ir in 0..b.num_rows() {
                    let block_insert_time = block_insert_time_column.value(ir);
                    let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)?;
                    let process_id = Uuid::parse_str(process_id_column.value(ir)?)?;
                    let block = BlockMetadata {
                        block_id: Uuid::parse_str(block_id_column.value(ir)?)?,
                        stream_id,
                        process_id,
                        begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(ir)),
                        end_time: DateTime::from_timestamp_nanos(end_time_column.value(ir)),
                        begin_ticks: begin_ticks_column.value(ir),
                        end_ticks: end_ticks_column.value(ir),
                        nb_objects: nb_objects_column.value(ir),
                        payload_size: payload_size_column.value(ir),
                        object_offset: object_offset_column.value(ir),
                        insert_time: DateTime::from_timestamp_nanos(block_insert_time),
                    };

                    let dependencies_metadata = dependencies_metadata_column.value(ir);
                    let objects_metadata = objects_metadata_column.value(ir);
                    // The tags list is expected to hold strings; null entries become "".
                    let stream_tags = stream_tags_column
                        .value(ir)
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .with_context(|| "casting stream_tags")?
                        .iter()
                        .map(|item| String::from(item.unwrap_or_default()))
                        .collect();
                    let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
                    // Both metadata blobs are decoded with ciborium (CBOR).
                    let stream = StreamMetadata {
                        process_id,
                        stream_id,
                        dependencies_metadata: ciborium::from_reader(dependencies_metadata)
                            .with_context(|| "decoding dependencies_metadata")?,
                        objects_metadata: ciborium::from_reader(objects_metadata)
                            .with_context(|| "decoding objects_metadata")?,
                        tags: stream_tags,
                        properties: Arc::new(stream_properties_jsonb),
                    };
                    let process_properties_jsonb = process_properties_accessor.jsonb_value(ir)?;
                    // An empty parent_process_id string means the process has no parent.
                    let parent_value = process_parent_column.value(ir)?;
                    let parent_process_id = if parent_value.is_empty() {
                        None
                    } else {
                        Some(Uuid::parse_str(parent_value).with_context(|| "parsing parent process_id")?)
                    };

                    let process = ProcessMetadata {
                        process_id,
                        exe: process_exe_column.value(ir)?.into(),
                        username: process_username_column.value(ir)?.into(),
                        realname: process_realname_column.value(ir)?.into(),
                        computer: process_computer_column.value(ir)?.into(),
                        distro: process_distro_column.value(ir)?.into(),
                        cpu_brand: process_cpu_column.value(ir)?.into(),
                        tsc_frequency: process_tsc_freq_column.value(ir),
                        start_time: DateTime::from_timestamp_nanos(process_start_time_column.value(ir)),
                        start_ticks: process_start_ticks_column.value(ir),
                        parent_process_id,
                        properties: Arc::new(process_properties_jsonb),
                    };
                    let format = stream_format_column.value(ir)?.to_string();
                    yield Arc::new(PartitionSourceBlock {
                        block,
                        stream: Arc::new(stream),
                        process: Arc::new(process),
                        format,
                    });
                }
            }

        })
    }
}
235
236pub fn hash_to_object_count(hash: &[u8]) -> Result<i64> {
238 Ok(i64::from_le_bytes(
239 hash.try_into().with_context(|| "hash_to_object_count")?,
240 ))
241}
242
243pub async fn fetch_partition_source_data(
245 lakehouse: Arc<LakehouseContext>,
246 existing_partitions: Arc<PartitionCache>,
247 insert_range: TimeRange,
248 source_stream_tag: &str,
249) -> Result<SourceDataBlocks> {
250 let begin_rfc = insert_range.begin.to_rfc3339();
251 let end_rfc = insert_range.end.to_rfc3339();
252 let sql = format!(
253 r#"
254 SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
255 object_offset, payload_size, insert_time,
256 "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties", "streams.format",
257 "processes.start_time", "processes.start_ticks", "processes.tsc_frequency", "processes.exe",
258 "processes.username", "processes.realname", "processes.computer", "processes.distro", "processes.cpu_brand",
259 "processes.parent_process_id", "processes.properties"
260 FROM source
261 WHERE array_has( "streams.tags", '{source_stream_tag}' )
262 AND insert_time >= '{begin_rfc}'
263 AND insert_time < '{end_rfc}'
264 ;"#
265 );
266 let block_partitions = existing_partitions
267 .filter("blocks", "global", &blocks_file_schema_hash(), insert_range)
268 .partitions;
269 let reader_factory = lakehouse.reader_factory().clone();
270 let df = query_partitions(
271 lakehouse.runtime().clone(),
272 reader_factory,
273 lakehouse.lake().blob_storage.inner(),
274 Arc::new(blocks_view_schema()),
275 Arc::new(block_partitions),
276 &sql,
277 )
278 .await
279 .with_context(|| "blocks query")?;
280 let blocks_stats_df = df.clone().aggregate(
281 vec![],
282 vec![
283 sum(col("nb_objects")),
284 count_all(),
285 max(col("payload_size")),
286 ],
287 )?;
288 let blocks_stats_rbs = blocks_stats_df.collect().await?;
289 if blocks_stats_rbs.len() != 1 {
290 anyhow::bail!("nb_objects_rbs has size {}", blocks_stats_rbs.len());
291 }
292 if blocks_stats_rbs[0].num_rows() != 1 {
293 anyhow::bail!(
294 "nb_objects_rbs[0] has size {}",
295 blocks_stats_rbs[0].num_rows()
296 );
297 }
298 let sub_nb_objects_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 0)?;
299 let object_count = sub_nb_objects_column.value(0);
300 let block_count_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 1)?;
301 let block_count = block_count_column.value(0);
302 let max_payload_size_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 2)?;
303 let max_payload = max_payload_size_column.value(0);
304
305 Ok(SourceDataBlocks {
306 blocks_dataframe: df,
307 object_count,
308 block_count,
309 max_payload,
310 })
311}