micromegas_analytics/lakehouse/
view_factory.rs

1//! [`default_view_factory`] makes the default [`ViewFactory`], giving users access to view instances, grouped in sets.
2//!
3//! # View sets
4//!
//! A ViewFactory defines the available view sets and can instantiate view instances.
6//! All view instances in a set have the same schema.
7//! Some view instances are global (their view_instance_id is 'global').
8//! Global view instances are implicitly accessible to SQL queries.
9//! Non-global view instances are accessible using the table function `view_instance`. See [ViewInstanceTableFunction](super::view_instance_table_function::ViewInstanceTableFunction).
10//!
11//! ## log_entries
12//!
13//! | field        | type                        | description                                               |
14//! |------------- |-----------------------------|-----------------------------------------------------------|
15//! |process_id    |Utf8                         | unique id of the process, references the processes table  |
16//! |exe           |Utf8                         | filename of the process                                   |
17//! |username      |Utf8                         | username of the process                                   |
18//! |computer      |Utf8                         | computer name of the process                              |
19//! |time          |UTC Timestamp (nanoseconds)  | time of the log entry event                               |
20//! |target        |Utf8                         | category or module name of the log entry                  |
//! |level         |int32                        | verbosity level (Fatal=1, Error=2, Warning=3, Info=4, Debug=5, Trace=6) |
22//! |msg           |Utf8                         | message                                                   |
23//!
24//! ### log_entries view instances
25//!
26//! The implicit use of the `log_entries` table corresponds to the 'global' instance, which contains the log entries of all the processes.
27//!
28//! Except the 'global' instance, the instance_id refers to any process_id. `view_instance('log_entries', process_id)` contains that process's log. Process-specific views are materialized just-in-time and can provide much better query performance compared to the 'global' instance.
29//!
30//! ## measures
31//!
32//! | field        | type                        | description                                               |
33//! |------------- |-----------------------------|-----------------------------------------------------------|
34//! |process_id    |Utf8                         | unique id of the process, references the processes table  |
35//! |exe           |Utf8                         | filename of the process                                   |
36//! |username      |Utf8                         | username of the process                                   |
37//! |computer      |Utf8                         | computer name of the process                              |
38//! |time          |UTC Timestamp (nanoseconds)  | time of the measure event                                 |
39//! |target        |Utf8                         | category or module name of the measure                    |
40//! |name          |Utf8                         | name of the measure                                       |
41//! |unit          |Utf8                         | unit of measure                                           |
42//! |value         |Float64                      | value measured                                            |
43//!
44//!
45//! ### measures view instances
46//!
47//! The implicit use of the `measures` table corresponds to the 'global' instance, which contains the metrics of all the processes.
48//!
49//! Except the 'global' instance, the instance_id refers to any process_id. `view_instance('measures', process_id)` contains that process's metrics. Process-specific views are materialized just-in-time and can provide much better query performance compared to the 'global' instance.
50//!
51//! ## thread_spans
52//!
53//! | field        | type                        | description                                                |
54//! |------------- |-----------------------------|------------------------------------------------------------|
55//! |id            |Int64                        | span id, unique within this thread                         |
56//! |parent        |Int64                        | span id of the calling span                                |
57//! |depth         |UInt32                       | call stack depth                                           |
58//! |hash          |UInt32                       | identifies a call site (name, filename, line)              |
59//! |begin         |UTC Timestamp (nanoseconds)  | when the span started its execution                        |
60//! |end           |UTC Timestamp (nanoseconds)  | when the span finished its execution                       |
61//! |duration      |Int64 (nanoseconds)          | end-begin                                                  |
62//! |name          |Utf8                         | name of the span, usually a function name                  |
63//! |target        |Utf8                         | category or module name                                    |
64//! |filename      |Utf8                         | name or path of the source file where the span is coded    |
65//! |line          |UInt32                       | line number in the file where the span can be found        |
66//!
67//! ### thread_spans view instances
68//!
//! There is no 'global' instance in the 'thread_spans' view set; there is therefore no implicit thread_spans table available.
70//! Users can call the table function `view_instance('thread_spans', stream_id)` to query the spans in the thread associated with the specified stream_id.
71//!
72//! ## async_events
73//!
74//! | field        | type                        | description                                               |
75//! |------------- |-----------------------------|-----------------------------------------------------------|
76//! |stream_id     |Dictionary(Int16, Utf8)      | identifier of the thread stream that emitted the event   |
77//! |block_id      |Dictionary(Int16, Utf8)      | unique identifier of the event block                     |
78//! |time          |UTC Timestamp (nanoseconds)  | time when the async event occurred                       |
79//! |event_type    |Dictionary(Int16, Utf8)      | type of event: "begin" or "end"                          |
80//! |span_id       |Int64                        | unique async span identifier                             |
81//! |parent_span_id|Int64                        | span id of the parent async span                         |
82//! |name          |Dictionary(Int16, Utf8)      | name of the async span, usually a function name          |
83//! |filename      |Dictionary(Int16, Utf8)      | name or path of the source file where the span is coded  |
84//! |target        |Dictionary(Int16, Utf8)      | category or module name                                   |
85//! |line          |UInt32                       | line number in the file where the span can be found      |
86//!
87//! ### async_events view instances
88//!
89//! There is no 'global' instance in the 'async_events' view set, there is therefore no implicit async_events table available.
90//! Users can call the table function `view_instance('async_events', process_id)` to query the async events in all thread streams associated with the specified process_id.
91//! Process-specific views are materialized just-in-time and can provide good query performance.
92//!
93//! The schema is optimized for high-frequency data. Process information (exe, username, computer, etc.) can be joined when needed:
94//! ```sql
95//! SELECT ae.*, p.exe, p.username, p.computer
96//! FROM view_instance('async_events', process_id) ae
97//! JOIN streams s ON ae.stream_id = s.stream_id  
98//! JOIN processes p ON s.process_id = p.process_id
99//! ```
100//!
101//! ## processes
102//!
103//! | field        | type                        | description                                                |
104//! |------------- |-----------------------------|------------------------------------------------------------|
105//! |process_id    |Utf8                         | process unique id                                          |
106//! |exe           |Utf8                         | filename of the process                                    |
107//! |username      |Utf8                         | username of the process                                    |
108//! |realname      |Utf8                         | real name of the user launching the process                |
109//! |computer      |Utf8                         | name of the computer or vm                                 |
110//! |distro        |Utf8                         | name of operating system                                   |
111//! |cpu_brand     |Utf8                         | identifies the cpu                                         |
112//! |tsc frequency |Int64                        | number of ticks per second                                 |
113//! |start_time    |UTC Timestamp (nanoseconds)  | when the process started (as reported by the instrumented process) |
114//! |start_ticks   |Int64                        | tick count associated with start_time                      |
//! |insert_time   |UTC Timestamp (nanoseconds)  | server-side timestamp when the process metadata was received |
116//! |parent_process_id |Utf8                     | unique id of the parent process                            |
117//! |properties | Array of {key: utf8, value: utf8} | self-reported metadata by the process                   |
118//!
119//! There is only one instance in this view set and it is implicitly available.
120//!
121//! ## streams
122//!
123//! | field        | type                        | description                                                |
124//! |------------- |-----------------------------|------------------------------------------------------------|
125//! |stream_id     |Utf8                         | stream unique id                                           |
126//! |process_id    |Utf8                         | process unique id                                          |
127//! |dependencies_metadata|Binary                | memory layout of the event dependencies                    |
128//! |objects_metadata|Binary                     | memory layout of the events                                |
129//! |tags          | Array of utf8               | Purpose of the stream, can contain "log", "metrics" or "cpu" |
130//! |properties | Array of {key: utf8, value: utf8} | self-reported stream metadata by the process            |
//! |insert_time   |UTC Timestamp (nanoseconds)  | server-side timestamp when the stream metadata was received |
132//!
133//! There is only one instance in this view set and it is implicitly available.
134//!
135//! ## blocks
136//!
137//! | field        | type                        | description                                                |
138//! |------------- |-----------------------------|------------------------------------------------------------|
139//! |block_id      |Utf8                         | block unique id                                            |
140//! |stream_id     |Utf8                         | stream unique id                                           |
141//! |process_id    |Utf8                         | process unique id                                          |
142//! |begin_time    |UTC Timestamp (nanoseconds)  | system time marking the beginning of this event batch      |
143//! |begin_ticks   |Int64                        | tick count associated with begin_time                      |
144//! |end_time      |UTC Timestamp (nanoseconds)  | system time marking the ending of this event batch         |
145//! |end_ticks     |Int64                        | tick count associated with end_time                        |
146//! |nb_objects    |Int32                        | number of events in this batch                             |
147//! |object_offset |Int64                        | number of events preceding this batch                      |
148//! |payload_size  |Int64                        | number of bytes of the binary payload                      |
149//! |insert_time   |UTC Timestamp (nanoseconds)  | server-side timestamp when the block was received          |
150//! |streams.dependencies_metadata|Binary        | memory layout of the event dependencies                    |
151//! |streams.objects_metadata|Binary             | memory layout of the events                                |
152//! |streams.tags  | Array of utf8               | Purpose of the stream, can contain "log", "metrics" or "cpu" |
153//! |streams.properties | Array of {key: utf8, value: utf8} | self-reported stream metadata by the process            |
154//! |processes.start_time    |UTC Timestamp (nanoseconds)  | when the process started (as reported by the instrumented process) |
155//! |processes.start_ticks   |Int64                        | tick count associated with start_time                      |
156//! |processes.tsc frequency |Int64                        | number of ticks per second                                 |
157//! |processes.exe           |Utf8                         | filename of the process                                    |
158//! |processes.username      |Utf8                         | username of the process                                    |
159//! |processes.realname      |Utf8                         | real name of the user launching the process                |
160//! |processes.computer      |Utf8                         | name of the computer or vm                                 |
161//! |processes.distro        |Utf8                         | name of operating system                                   |
162//! |processes.cpu_brand     |Utf8                         | identifies the cpu                                         |
163//!
164//! There is only one instance in this view set and it is implicitly available.
165//!
166//!
167//!
168use super::blocks_view::BlocksView;
169use super::log_stats_view::make_log_stats_view;
170use super::processes_view::make_processes_view;
171use super::streams_view::make_streams_view;
172use super::{
173    async_events_view::AsyncEventsViewMaker, log_view::LogViewMaker,
174    metrics_view::MetricsViewMaker, thread_spans_view::ThreadSpansViewMaker, view::View,
175};
176use anyhow::Result;
177use datafusion::arrow::datatypes::Schema;
178use datafusion::execution::runtime_env::RuntimeEnv;
179use micromegas_ingestion::data_lake_connection::DataLakeConnection;
180use std::fmt::Debug;
181use std::{collections::HashMap, sync::Arc};
182
183/// A trait for creating views.
184pub trait ViewMaker: Send + Sync + Debug {
185    /// Creates a new view with the given instance ID.
186    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>>;
187
188    /// Returns the schema hash for views created by this maker.
189    fn get_schema_hash(&self) -> Vec<u8>;
190
191    /// Returns the schema for views created by this maker.
192    fn get_schema(&self) -> Arc<Schema>;
193}
194
195/// A factory for creating and managing views.
196#[derive(Debug, Clone)]
197pub struct ViewFactory {
198    view_sets: HashMap<String, Arc<dyn ViewMaker>>,
199    global_views: Vec<Arc<dyn View>>,
200}
201
202impl ViewFactory {
203    pub fn new(global_views: Vec<Arc<dyn View>>) -> Self {
204        Self {
205            view_sets: HashMap::new(),
206            global_views,
207        }
208    }
209
210    pub fn get_global_views(&self) -> &[Arc<dyn View>] {
211        &self.global_views
212    }
213
214    pub fn get_global_view(&self, view_name: &str) -> Option<Arc<dyn View>> {
215        self.global_views
216            .iter()
217            .find(|v| *(v.get_view_set_name()) == view_name)
218            .cloned()
219    }
220
221    pub fn add_global_view(&mut self, view: Arc<dyn View>) {
222        self.global_views.push(view);
223    }
224
225    pub fn add_view_set(&mut self, view_set_name: String, maker: Arc<dyn ViewMaker>) {
226        self.view_sets.insert(view_set_name, maker);
227    }
228
229    pub fn get_view_sets(&self) -> &HashMap<String, Arc<dyn ViewMaker>> {
230        &self.view_sets
231    }
232
233    pub fn make_view(&self, view_set_name: &str, view_instance_id: &str) -> Result<Arc<dyn View>> {
234        if let Some(maker) = self.view_sets.get(view_set_name) {
235            maker.make_view(view_instance_id)
236        } else {
237            anyhow::bail!("view set {view_set_name} not found");
238        }
239    }
240}
241
242/// Creates the default `ViewFactory` with all built-in views.
243pub async fn default_view_factory(
244    runtime: Arc<RuntimeEnv>,
245    lake: Arc<DataLakeConnection>,
246) -> Result<ViewFactory> {
247    let blocks_view = Arc::new(BlocksView::new()?);
248    let processes_view = Arc::new(
249        make_processes_view(
250            runtime.clone(),
251            lake.clone(),
252            Arc::new(ViewFactory::new(vec![blocks_view.clone()])),
253        )
254        .await?,
255    );
256    let streams_view = Arc::new(
257        make_streams_view(
258            runtime.clone(),
259            lake.clone(),
260            Arc::new(ViewFactory::new(vec![blocks_view.clone()])),
261        )
262        .await?,
263    );
264    let log_view_maker = Arc::new(LogViewMaker {});
265    let metrics_view_maker = Arc::new(MetricsViewMaker {});
266
267    // Create base views first
268    let global_views = vec![
269        log_view_maker.make_view("global")?,
270        metrics_view_maker.make_view("global")?,
271        processes_view,
272        streams_view,
273        blocks_view,
274    ];
275    let mut factory = ViewFactory::new(global_views);
276    factory.add_view_set(String::from("log_entries"), log_view_maker.clone());
277    factory.add_view_set(String::from("measures"), metrics_view_maker);
278    factory.add_view_set(
279        String::from("thread_spans"),
280        Arc::new(ThreadSpansViewMaker {}),
281    );
282
283    // Create the factory as Arc to pass to other view makers
284    let factory_arc = Arc::new(factory);
285
286    // Create log_stats view with access to the complete factory (including log_entries)
287    let log_stats_view =
288        Arc::new(make_log_stats_view(runtime.clone(), lake.clone(), factory_arc.clone()).await?);
289
290    // Clone factory and add log_stats view
291    let mut updated_factory = (*factory_arc).clone();
292    updated_factory.add_global_view(log_stats_view);
293
294    // Add async_events view maker
295    updated_factory.add_view_set(
296        String::from("async_events"),
297        Arc::new(AsyncEventsViewMaker::new(Arc::new(updated_factory.clone()))),
298    );
299
300    Ok(updated_factory)
301}