micromegas_analytics/lakehouse/view.rs

1use super::{
2    batch_update::PartitionCreationStrategy,
3    dataframe_time_bounds::DataFrameTimeBounds,
4    lakehouse_context::LakehouseContext,
5    materialized_view::MaterializedView,
6    merge::{PartitionMerger, QueryMerger},
7    partition::Partition,
8    partition_cache::PartitionCache,
9    session_configurator::NoOpSessionConfigurator,
10    view_factory::ViewFactory,
11};
12use crate::{response_writer::Logger, time::TimeRange};
13use anyhow::Result;
14use async_trait::async_trait;
15use chrono::{DateTime, TimeDelta, Utc};
16use datafusion::{
17    arrow::datatypes::Schema, execution::SendableRecordBatchStream, logical_expr::Expr, prelude::*,
18    sql::TableReference,
19};
20use micromegas_ingestion::data_lake_connection::DataLakeConnection;
21use std::fmt::Debug;
22use std::sync::Arc;
23
/// A trait for defining a partition specification.
///
/// A `PartitionSpec` describes what an up-to-date partition should contain
/// and knows how to materialize that content into the data lake.
#[async_trait]
pub trait PartitionSpec: Send + Sync + Debug {
    /// Returns true if the partition is empty.
    fn is_empty(&self) -> bool;
    /// Returns a hash of the source data.
    ///
    /// Used to compare against existing partitions to decide whether they
    /// are still up to date with the source they were built from.
    fn get_source_data_hash(&self) -> Vec<u8>;
    /// Writes the partition to the data lake.
    ///
    /// Progress and diagnostics are reported through `logger`.
    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()>;
}
34
/// Metadata about a view.
///
/// Identifies a view instance and the schema version of its partition files.
#[derive(Debug, Clone)]
pub struct ViewMetadata {
    // Name of the view set: the table name from the user's perspective.
    pub view_set_name: Arc<String>,
    // Instance within the view set; can be a process_id, a stream_id or "global".
    pub view_instance_id: Arc<String>,
    // Hash of the partition file schema, used to identify out-of-date partitions.
    pub file_schema_hash: Vec<u8>,
}
42
/// A trait for defining a view.
///
/// A view describes a queryable table backed by materialized partitions in
/// the lakehouse: how its partitions are created, validated, merged and
/// registered for querying.
#[async_trait]
pub trait View: std::fmt::Debug + Send + Sync {
    /// name of the table from the user's perspective
    fn get_view_set_name(&self) -> Arc<String>;

    /// get_view_instance_id can be a process_id, a stream_id or 'global'.
    fn get_view_instance_id(&self) -> Arc<String>;

    /// make_batch_partition_spec determines what should be found in an up to date partition.
    /// The resulting PartitionSpec can be used to validate existing partitions or create a new one.
    async fn make_batch_partition_spec(
        &self,
        lakehouse: Arc<LakehouseContext>,
        existing_partitions: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>>;

    /// get_file_schema_hash returns a hash (can be a version number, version string, etc.) that allows
    /// to identify out of date partitions.
    fn get_file_schema_hash(&self) -> Vec<u8>;

    /// get_file_schema returns the schema of the partition file in object storage
    fn get_file_schema(&self) -> Arc<Schema>;

    /// jit_update creates or updates process-specific partitions before a query
    async fn jit_update(
        &self,
        lakehouse: Arc<LakehouseContext>,
        query_range: Option<TimeRange>,
    ) -> Result<()>;

    /// make_time_filter returns a set of expressions that will filter out the rows of the partition
    /// outside the time range requested.
    fn make_time_filter(&self, _begin: DateTime<Utc>, _end: DateTime<Utc>) -> Result<Vec<Expr>>;

    /// A view must provide a way to compute the time bounds of a DataFrame corresponding to its schema.
    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds>;

    /// register the table in the SessionContext
    ///
    /// Default implementation registers `table` under a bare (schema-less)
    /// reference named after the view set.
    async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> {
        let view_set_name = self.get_view_set_name().to_string();
        ctx.register_table(
            TableReference::Bare {
                table: view_set_name.into(),
            },
            Arc::new(table),
        )?;
        Ok(())
    }

    /// Merges the given partitions into a single record batch stream.
    ///
    /// Default implementation is a pass-through merge (`SELECT * FROM source;`)
    /// over the partitions to merge; views needing custom merge semantics
    /// (e.g. aggregation) can override this method.
    async fn merge_partitions(
        &self,
        lakehouse: Arc<LakehouseContext>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<SendableRecordBatchStream> {
        let merge_query = Arc::new(String::from("SELECT * FROM source;"));
        // The merge query only references `source`, so no other views are needed.
        let empty_view_factory = Arc::new(ViewFactory::new(vec![]));
        let merger = QueryMerger::new(
            empty_view_factory,
            Arc::new(NoOpSessionConfigurator),
            self.get_file_schema(),
            merge_query,
        );
        merger
            .execute_merge_query(
                lakehouse,
                partitions_to_merge,
                partitions_all_views,
                insert_range,
            )
            .await
    }

    /// tells the daemon which view should be materialized and in what order
    fn get_update_group(&self) -> Option<i32>;

    /// allow the view to subdivide the requested partition
    ///
    /// Default caps partitions at one day regardless of the creation strategy.
    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
        TimeDelta::days(1)
    }
}
127
128impl dyn View {
129    pub fn get_meta(&self) -> ViewMetadata {
130        ViewMetadata {
131            view_set_name: self.get_view_set_name(),
132            view_instance_id: self.get_view_instance_id(),
133            file_schema_hash: self.get_file_schema_hash(),
134        }
135    }
136}