micromegas_analytics/lakehouse/
view.rs1use super::{
2 batch_update::PartitionCreationStrategy,
3 dataframe_time_bounds::DataFrameTimeBounds,
4 lakehouse_context::LakehouseContext,
5 materialized_view::MaterializedView,
6 merge::{PartitionMerger, QueryMerger},
7 partition::Partition,
8 partition_cache::PartitionCache,
9 session_configurator::NoOpSessionConfigurator,
10 view_factory::ViewFactory,
11};
12use crate::{response_writer::Logger, time::TimeRange};
13use anyhow::Result;
14use async_trait::async_trait;
15use chrono::{DateTime, TimeDelta, Utc};
16use datafusion::{
17 arrow::datatypes::Schema, execution::SendableRecordBatchStream, logical_expr::Expr, prelude::*,
18 sql::TableReference,
19};
20use micromegas_ingestion::data_lake_connection::DataLakeConnection;
21use std::fmt::Debug;
22use std::sync::Arc;
23
/// Describes a batch of data ready to be written as lakehouse partition(s).
#[async_trait]
pub trait PartitionSpec: Send + Sync + Debug {
    /// Returns true when this spec contains no data to write.
    fn is_empty(&self) -> bool;
    /// Hash identifying the source data behind this spec
    /// (presumably used to detect stale partitions — confirm with callers).
    fn get_source_data_hash(&self) -> Vec<u8>;
    /// Writes the partition data to the lake, reporting progress through `logger`.
    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()>;
}
34
/// Identifying metadata for a view: its set, its instance, and its schema hash.
#[derive(Debug, Clone)]
pub struct ViewMetadata {
    /// Name of the view set this view belongs to.
    pub view_set_name: Arc<String>,
    /// Identifier of this particular instance within the view set.
    pub view_instance_id: Arc<String>,
    /// Hash of the view's file schema.
    pub file_schema_hash: Vec<u8>,
}
42
43#[async_trait]
45pub trait View: std::fmt::Debug + Send + Sync {
46 fn get_view_set_name(&self) -> Arc<String>;
48
49 fn get_view_instance_id(&self) -> Arc<String>;
51
52 async fn make_batch_partition_spec(
55 &self,
56 lakehouse: Arc<LakehouseContext>,
57 existing_partitions: Arc<PartitionCache>,
58 insert_range: TimeRange,
59 ) -> Result<Arc<dyn PartitionSpec>>;
60
61 fn get_file_schema_hash(&self) -> Vec<u8>;
64
65 fn get_file_schema(&self) -> Arc<Schema>;
67
68 async fn jit_update(
70 &self,
71 lakehouse: Arc<LakehouseContext>,
72 query_range: Option<TimeRange>,
73 ) -> Result<()>;
74
75 fn make_time_filter(&self, _begin: DateTime<Utc>, _end: DateTime<Utc>) -> Result<Vec<Expr>>;
78
79 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds>;
81
82 async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> {
84 let view_set_name = self.get_view_set_name().to_string();
85 ctx.register_table(
86 TableReference::Bare {
87 table: view_set_name.into(),
88 },
89 Arc::new(table),
90 )?;
91 Ok(())
92 }
93
94 async fn merge_partitions(
95 &self,
96 lakehouse: Arc<LakehouseContext>,
97 partitions_to_merge: Arc<Vec<Partition>>,
98 partitions_all_views: Arc<PartitionCache>,
99 insert_range: TimeRange,
100 ) -> Result<SendableRecordBatchStream> {
101 let merge_query = Arc::new(String::from("SELECT * FROM source;"));
102 let empty_view_factory = Arc::new(ViewFactory::new(vec![]));
103 let merger = QueryMerger::new(
104 empty_view_factory,
105 Arc::new(NoOpSessionConfigurator),
106 self.get_file_schema(),
107 merge_query,
108 );
109 merger
110 .execute_merge_query(
111 lakehouse,
112 partitions_to_merge,
113 partitions_all_views,
114 insert_range,
115 )
116 .await
117 }
118
119 fn get_update_group(&self) -> Option<i32>;
121
122 fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
124 TimeDelta::days(1)
125 }
126}
127
128impl dyn View {
129 pub fn get_meta(&self) -> ViewMetadata {
130 ViewMetadata {
131 view_set_name: self.get_view_set_name(),
132 view_instance_id: self.get_view_instance_id(),
133 file_schema_hash: self.get_file_schema_hash(),
134 }
135 }
136}