// micromegas_analytics/lakehouse/log_stats_view.rs
use super::{
2 session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3 view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11pub async fn make_log_stats_view(
13 runtime: Arc<RuntimeEnv>,
14 lake: Arc<DataLakeConnection>,
15 view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17 let count_src_query = Arc::new(String::from(
19 r#"
20 SELECT sum(nb_objects) as count
21 FROM blocks
22 WHERE array_has("streams.tags", 'log')
23 AND insert_time >= '{begin}'
24 AND insert_time < '{end}'
25 ;"#,
26 ));
27
28 let transform_query = Arc::new(String::from(
30 r#"
31 SELECT date_bin('1 minute', time) as time_bin,
32 process_id,
33 level,
34 target,
35 count(*) as count
36 FROM log_entries
37 WHERE insert_time >= '{begin}'
38 AND insert_time < '{end}'
39 GROUP BY process_id, level, target, time_bin
40 ;"#,
41 ));
42
43 let merge_query = Arc::new(String::from(
45 r#"
46 SELECT time_bin,
47 process_id,
48 level,
49 target,
50 sum(count) as count
51 FROM {source}
52 GROUP BY process_id, level, target, time_bin
53 ;"#,
54 ));
55
56 let time_column = Arc::new(String::from("time_bin"));
57
58 SqlBatchView::new(
59 runtime,
60 Arc::new("log_stats".to_owned()),
61 time_column.clone(), time_column, count_src_query,
64 transform_query,
65 merge_query,
66 lake,
67 view_factory,
68 Arc::new(NoOpSessionConfigurator),
69 Some(3000), TimeDelta::days(1), TimeDelta::days(1), None, )
74 .await
75}