micromegas_analytics/lakehouse/
streams_view.rs

1use super::{
2    session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3    view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11/// Creates a new `SqlBatchView` for streams.
12pub async fn make_streams_view(
13    runtime: Arc<RuntimeEnv>,
14    lake: Arc<DataLakeConnection>,
15    view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17    let count_src_query = Arc::new(String::from(
18        r#"
19        SELECT count(*) as count
20        FROM  blocks
21        WHERE insert_time >= '{begin}'
22        AND   insert_time < '{end}'
23        ;"#,
24    ));
25    let transform_query = Arc::new(String::from(
26        r#"
27SELECT stream_id,
28       first_value("process_id") as process_id,
29       first_value("streams.dependencies_metadata") as dependencies_metadata,
30       first_value("streams.objects_metadata") as objects_metadata,
31       first_value("streams.tags") as tags,
32       first_value("streams.properties") as properties,
33       first_value("streams.insert_time") as insert_time,
34       first_value("streams.format") as format,
35       max(insert_time) as last_update_time
36FROM blocks
37GROUP BY stream_id
38        ;"#,
39    ));
40    let merge_query = Arc::new(String::from(
41        r#"
42SELECT stream_id,
43       first_value(process_id) as process_id,
44       first_value(dependencies_metadata) as dependencies_metadata,
45       first_value(objects_metadata) as objects_metadata,
46       first_value(tags) as tags,
47       first_value(properties) as properties,
48       first_value(insert_time) as insert_time,
49       first_value(format) as format,
50       max(last_update_time) as last_update_time
51FROM {source}
52GROUP BY stream_id
53        ;"#,
54    ));
55    let min_time_column = Arc::new(String::from("insert_time"));
56    let max_time_column = Arc::new(String::from("last_update_time"));
57    SqlBatchView::new(
58        runtime,
59        Arc::new("streams".to_owned()),
60        min_time_column,
61        max_time_column,
62        count_src_query,
63        transform_query,
64        merge_query,
65        lake,
66        view_factory,
67        Arc::new(NoOpSessionConfigurator),
68        Some(2000),
69        TimeDelta::days(1), // from source
70        TimeDelta::days(1), // when merging
71        None,
72    )
73    .await
74}