micromegas_analytics/lakehouse/
log_stats_view.rs

1use super::{
2    session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3    view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11/// Creates a new `SqlBatchView` for log statistics aggregated by process, minute, level, and target.
12pub async fn make_log_stats_view(
13    runtime: Arc<RuntimeEnv>,
14    lake: Arc<DataLakeConnection>,
15    view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17    // Query to count source rows in the time range by summing nb_objects from log blocks only
18    let count_src_query = Arc::new(String::from(
19        r#"
20        SELECT sum(nb_objects) as count
21        FROM blocks
22        WHERE array_has("streams.tags", 'log')
23        AND insert_time >= '{begin}'
24        AND insert_time < '{end}'
25        ;"#,
26    ));
27
28    // Transform query to aggregate logs by time bin, process, level, and target
29    let transform_query = Arc::new(String::from(
30        r#"
31        SELECT date_bin('1 minute', time) as time_bin,
32               process_id,
33               level,
34               target,
35               count(*) as count
36        FROM log_entries
37        WHERE insert_time >= '{begin}'
38        AND insert_time < '{end}'
39        GROUP BY process_id, level, target, time_bin
40        ;"#,
41    ));
42
43    // Merge query to combine partitions
44    let merge_query = Arc::new(String::from(
45        r#"
46        SELECT time_bin,
47               process_id,
48               level,
49               target,
50               sum(count) as count
51        FROM {source}
52        GROUP BY process_id, level, target, time_bin
53        ;"#,
54    ));
55
56    let time_column = Arc::new(String::from("time_bin"));
57
58    SqlBatchView::new(
59        runtime,
60        Arc::new("log_stats".to_owned()),
61        time_column.clone(), // min_time_column
62        time_column,         // max_time_column
63        count_src_query,
64        transform_query,
65        merge_query,
66        lake,
67        view_factory,
68        Arc::new(NoOpSessionConfigurator),
69        Some(3000),         // update_group
70        TimeDelta::days(1), // source partition delta
71        TimeDelta::days(1), // merge partition delta
72        None,               // custom merger
73    )
74    .await
75}