micromegas_analytics/lakehouse/
streams_view.rs

1use super::{
2    session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3    view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11/// Creates a new `SqlBatchView` for streams.
12pub async fn make_streams_view(
13    runtime: Arc<RuntimeEnv>,
14    lake: Arc<DataLakeConnection>,
15    view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17    let count_src_query = Arc::new(String::from(
18        r#"
19        SELECT count(*) as count
20        FROM  blocks
21        WHERE insert_time >= '{begin}'
22        AND   insert_time < '{end}'
23        ;"#,
24    ));
25    let transform_query = Arc::new(String::from(
26        r#"
27SELECT stream_id,
28       first_value("process_id") as process_id,
29       first_value("streams.dependencies_metadata") as dependencies_metadata,
30       first_value("streams.objects_metadata") as objects_metadata,
31       first_value("streams.tags") as tags,
32       first_value("streams.properties") as properties,
33       first_value("streams.insert_time") as insert_time,
34       max(insert_time) as last_update_time
35FROM blocks
36GROUP BY stream_id
37        ;"#,
38    ));
39    let merge_query = Arc::new(String::from(
40        r#"
41SELECT stream_id,
42       first_value(process_id) as process_id,
43       first_value(dependencies_metadata) as dependencies_metadata,
44       first_value(objects_metadata) as objects_metadata,
45       first_value(tags) as tags,
46       first_value(properties) as properties,
47       first_value(insert_time) as insert_time,
48       max(last_update_time) as last_update_time
49FROM {source}
50GROUP BY stream_id
51        ;"#,
52    ));
53    let min_time_column = Arc::new(String::from("insert_time"));
54    let max_time_column = Arc::new(String::from("last_update_time"));
55    SqlBatchView::new(
56        runtime,
57        Arc::new("streams".to_owned()),
58        min_time_column,
59        max_time_column,
60        count_src_query,
61        transform_query,
62        merge_query,
63        lake,
64        view_factory,
65        Arc::new(NoOpSessionConfigurator),
66        Some(2000),
67        TimeDelta::days(1), // from source
68        TimeDelta::days(1), // when merging
69        None,
70    )
71    .await
72}