micromegas_analytics/lakehouse/
partition_source_data.rs

use super::blocks_view::blocks_file_schema_hash;
use super::lakehouse_context::LakehouseContext;
use super::partition_cache::PartitionCache;
use crate::dfext::{
    string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
};
use crate::metadata::{ProcessMetadata, StreamMetadata};
use crate::properties::properties_column_accessor::properties_column_by_name;
use crate::time::TimeRange;
use crate::{
    dfext::typed_column::typed_column,
    lakehouse::{blocks_view::blocks_view_schema, query::query_partitions},
};
use anyhow::{Context, Result};
use async_stream::try_stream;
use async_trait::async_trait;
use chrono::DateTime;
use datafusion::functions_aggregate::{count::count_all, expr_fn::sum, min_max::max};
use datafusion::{
    arrow::array::{
        Array, BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray,
        TimestampNanosecondArray,
    },
    prelude::*,
};
use futures::{StreamExt, stream::BoxStream};
use micromegas_telemetry::types::block::BlockMetadata;
use std::fmt::Debug;
use std::sync::Arc;
use uuid::Uuid;

/// Represents a single block of source data for a partition.
#[derive(Debug)]
pub struct PartitionSourceBlock {
    pub block: BlockMetadata,
    pub stream: Arc<StreamMetadata>,
    pub process: Arc<ProcessMetadata>,
}

/// A trait for providing blocks of source data for partitions.
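///
/// # Example
///
/// A minimal sketch of draining a source, assuming `source` is any
/// `PartitionBlocksSource` implementation obtained elsewhere:
///
/// ```ignore
/// use futures::StreamExt;
///
/// if !source.is_empty() {
///     let mut blocks = source.get_blocks_stream().await;
///     while let Some(block) = blocks.next().await {
///         let block = block?;
///         println!("block {} ({} bytes)", block.block.block_id, block.block.payload_size);
///     }
/// }
/// ```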
#[async_trait]
pub trait PartitionBlocksSource: Sync + Send + Debug {
    /// Returns true if there are no blocks in the source.
    fn is_empty(&self) -> bool;
    /// Returns the number of blocks in the source.
    fn get_nb_blocks(&self) -> i64;
    /// Returns the maximum payload size of the blocks in the source.
    fn get_max_payload_size(&self) -> i64;
    /// Returns a hash of the source data.
    fn get_source_data_hash(&self) -> Vec<u8>;
    /// Returns a stream of the source blocks.
    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>>;
}

/// A `PartitionBlocksSource` implementation that stores blocks in memory.
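///
/// # Example
///
/// A sketch of building an in-memory source; `blocks` and the precomputed
/// `block_ids_hash` are assumed to come from the caller:
///
/// ```ignore
/// let source = SourceDataBlocksInMemory {
///     blocks,
///     block_ids_hash,
/// };
/// assert_eq!(source.get_nb_blocks() as usize, source.blocks.len());
/// ```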
#[derive(Debug)]
pub struct SourceDataBlocksInMemory {
    pub blocks: Vec<Arc<PartitionSourceBlock>>,
    pub block_ids_hash: Vec<u8>,
}

#[async_trait]
impl PartitionBlocksSource for SourceDataBlocksInMemory {
    fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }

    fn get_nb_blocks(&self) -> i64 {
        self.blocks.len() as i64
    }

    fn get_max_payload_size(&self) -> i64 {
        // Iterate instead of indexing blocks[0], which would panic on an empty source.
        self.blocks
            .iter()
            .map(|block| block.block.payload_size)
            .max()
            .unwrap_or(0)
    }

    fn get_source_data_hash(&self) -> Vec<u8> {
        self.block_ids_hash.clone()
    }

    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
        let stream = futures::stream::iter(self.blocks.clone()).map(Ok);
        stream.boxed()
    }
}

/// A `PartitionBlocksSource` implementation that fetches blocks from a DataFusion `DataFrame`.
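///
/// The dataframe is expected to expose the blocks view columns joined with
/// their `streams.*` and `processes.*` metadata, as produced by
/// [`fetch_partition_source_data`].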
#[derive(Debug)]
pub struct SourceDataBlocks {
    pub blocks_dataframe: DataFrame,
    pub object_count: i64,
    pub block_count: i64,
    pub max_payload: i64,
}

#[async_trait]
impl PartitionBlocksSource for SourceDataBlocks {
    fn is_empty(&self) -> bool {
        self.object_count == 0
    }

    fn get_nb_blocks(&self) -> i64 {
        self.block_count
    }

    fn get_max_payload_size(&self) -> i64 {
        self.max_payload
    }

    fn get_source_data_hash(&self) -> Vec<u8> {
        self.object_count.to_le_bytes().to_vec()
    }

    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
        let df = self.blocks_dataframe.clone();
        Box::pin(try_stream! {
            let mut stream = df.execute_stream().await?;
            while let Some(res) = stream.next().await {
                let b = res.with_context(|| "fetching blocks query results")?;
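                // Typed accessors for the per-block columns and for the joined
                // "streams.*" / "processes.*" metadata columns.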
                let block_id_column = string_column_by_name(&b, "block_id")?;
                let stream_id_column = string_column_by_name(&b, "stream_id")?;
                let process_id_column = string_column_by_name(&b, "process_id")?;
                let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
                let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
                let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
                let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
                let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
                let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
                let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
                let block_insert_time_column: &TimestampNanosecondArray =
                    typed_column_by_name(&b, "insert_time")?;
                let dependencies_metadata_column: &BinaryArray =
                    typed_column_by_name(&b, "streams.dependencies_metadata")?;
                let objects_metadata_column: &BinaryArray =
                    typed_column_by_name(&b, "streams.objects_metadata")?;
                let stream_tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "streams.tags")?;
                let stream_properties_accessor = properties_column_by_name(&b, "streams.properties")?;

                let process_start_time_column: &TimestampNanosecondArray =
                    typed_column_by_name(&b, "processes.start_time")?;
                let process_start_ticks_column: &Int64Array =
                    typed_column_by_name(&b, "processes.start_ticks")?;
                let process_tsc_freq_column: &Int64Array =
                    typed_column_by_name(&b, "processes.tsc_frequency")?;
                let process_exe_column = string_column_by_name(&b, "processes.exe")?;
                let process_username_column = string_column_by_name(&b, "processes.username")?;
                let process_realname_column = string_column_by_name(&b, "processes.realname")?;
                let process_computer_column = string_column_by_name(&b, "processes.computer")?;
                let process_distro_column = string_column_by_name(&b, "processes.distro")?;
                let process_cpu_column = string_column_by_name(&b, "processes.cpu_brand")?;
                let process_parent_column = string_column_by_name(&b, "processes.parent_process_id")?;
                let process_properties_accessor = properties_column_by_name(&b, "processes.properties")?;
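                // Rebuild one PartitionSourceBlock per row from the flattened columns.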
                for ir in 0..b.num_rows() {
                    let block_insert_time = block_insert_time_column.value(ir);
                    let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)?;
                    let process_id = Uuid::parse_str(process_id_column.value(ir)?)?;
                    let block = BlockMetadata {
                        block_id: Uuid::parse_str(block_id_column.value(ir)?)?,
                        stream_id,
                        process_id,
                        begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(ir)),
                        end_time: DateTime::from_timestamp_nanos(end_time_column.value(ir)),
                        begin_ticks: begin_ticks_column.value(ir),
                        end_ticks: end_ticks_column.value(ir),
                        nb_objects: nb_objects_column.value(ir),
                        payload_size: payload_size_column.value(ir),
                        object_offset: object_offset_column.value(ir),
                        insert_time: DateTime::from_timestamp_nanos(block_insert_time),
                    };

                    let dependencies_metadata = dependencies_metadata_column.value(ir);
                    let objects_metadata = objects_metadata_column.value(ir);
                    let stream_tags = stream_tags_column
                        .value(ir)
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .with_context(|| "casting stream_tags")?
                        .iter()
                        .map(|item| String::from(item.unwrap_or_default()))
                        .collect();
                    let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
                    let stream = StreamMetadata {
                        process_id,
                        stream_id,
                        dependencies_metadata: ciborium::from_reader(dependencies_metadata)
                            .with_context(|| "decoding dependencies_metadata")?,
                        objects_metadata: ciborium::from_reader(objects_metadata)
                            .with_context(|| "decoding objects_metadata")?,
                        tags: stream_tags,
                        properties: Arc::new(stream_properties_jsonb),
                    };
                    let process_properties_jsonb = process_properties_accessor.jsonb_value(ir)?;
                    let parent_value = process_parent_column.value(ir)?;
                    let parent_process_id = if parent_value.is_empty() {
                        None
                    } else {
                        Some(Uuid::parse_str(parent_value).with_context(|| "parsing parent process_id")?)
                    };

                    let process = ProcessMetadata {
                        process_id,
                        exe: process_exe_column.value(ir)?.into(),
                        username: process_username_column.value(ir)?.into(),
                        realname: process_realname_column.value(ir)?.into(),
                        computer: process_computer_column.value(ir)?.into(),
                        distro: process_distro_column.value(ir)?.into(),
                        cpu_brand: process_cpu_column.value(ir)?.into(),
                        tsc_frequency: process_tsc_freq_column.value(ir),
                        start_time: DateTime::from_timestamp_nanos(process_start_time_column.value(ir)),
                        start_ticks: process_start_ticks_column.value(ir),
                        parent_process_id,
                        properties: Arc::new(process_properties_jsonb),
                    };
                    yield Arc::new(PartitionSourceBlock {
                        block,
                        stream: Arc::new(stream),
                        process: Arc::new(process),
                    });
                }
            }
        })
    }
}

/// Converts a source-data hash (expected to be a little-endian `i64`) back to an object count.
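///
/// # Example
///
/// ```ignore
/// let count: i64 = 42;
/// assert_eq!(hash_to_object_count(&count.to_le_bytes())?, 42);
/// ```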
pub fn hash_to_object_count(hash: &[u8]) -> Result<i64> {
    Ok(i64::from_le_bytes(
        hash.try_into().with_context(|| "hash_to_object_count")?,
    ))
}

/// Fetches partition source data from the data lake.
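///
/// # Example
///
/// A sketch of fetching the blocks carrying a given stream tag over an
/// insert-time range; the lakehouse context, partition cache, time range and
/// tag value are all assumed to come from the caller:
///
/// ```ignore
/// let source = fetch_partition_source_data(
///     lakehouse.clone(),
///     existing_partitions.clone(),
///     insert_range,
///     source_stream_tag,
/// )
/// .await?;
/// if !source.is_empty() {
///     let mut blocks = source.get_blocks_stream().await;
///     // ... consume each PartitionSourceBlock
/// }
/// ```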
pub async fn fetch_partition_source_data(
    lakehouse: Arc<LakehouseContext>,
    existing_partitions: Arc<PartitionCache>,
    insert_range: TimeRange,
    source_stream_tag: &str,
) -> Result<SourceDataBlocks> {
    let begin_rfc = insert_range.begin.to_rfc3339();
    let end_rfc = insert_range.end.to_rfc3339();
    let sql = format!(
        r#"
          SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
              object_offset, payload_size, insert_time,
              "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties",
              "processes.start_time", "processes.start_ticks", "processes.tsc_frequency", "processes.exe",
              "processes.username", "processes.realname", "processes.computer", "processes.distro", "processes.cpu_brand",
              "processes.parent_process_id", "processes.properties"
          FROM source
          WHERE array_has( "streams.tags", '{source_stream_tag}' )
          AND insert_time >= '{begin_rfc}'
          AND insert_time < '{end_rfc}'
          ;"#
    );
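    // Only query the existing "blocks" partitions that overlap the insert range.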
    let block_partitions = existing_partitions
        .filter("blocks", "global", &blocks_file_schema_hash(), insert_range)
        .partitions;
    let reader_factory = lakehouse.reader_factory().clone();
    let df = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        Arc::new(blocks_view_schema()),
        Arc::new(block_partitions),
        &sql,
    )
    .await
    .with_context(|| "blocks query")?;
    let blocks_stats_df = df.clone().aggregate(
        vec![],
        vec![
            sum(col("nb_objects")),
            count_all(),
            max(col("payload_size")),
        ],
    )?;
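    // The global aggregation yields a single batch with one row:
    // [sum(nb_objects), count(*), max(payload_size)].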
    let blocks_stats_rbs = blocks_stats_df.collect().await?;
    if blocks_stats_rbs.len() != 1 {
        anyhow::bail!("blocks_stats_rbs has size {}", blocks_stats_rbs.len());
    }
    if blocks_stats_rbs[0].num_rows() != 1 {
        anyhow::bail!(
            "blocks_stats_rbs[0] has size {}",
            blocks_stats_rbs[0].num_rows()
        );
    }
    let sum_nb_objects_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 0)?;
    let object_count = sum_nb_objects_column.value(0);
    let block_count_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 1)?;
    let block_count = block_count_column.value(0);
    let max_payload_size_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 2)?;
    let max_payload = max_payload_size_column.value(0);

    Ok(SourceDataBlocks {
        blocks_dataframe: df,
        object_count,
        block_count,
        max_payload,
    })
}