micromegas_analytics/lakehouse/view_factory.rs
1//! [`default_view_factory`] makes the default [`ViewFactory`], giving users access to view instances, grouped in sets.
2//!
3//! # View sets
4//!
//! A ViewFactory defines the available view sets and can instantiate view instances.
6//! All view instances in a set have the same schema.
7//! Some view instances are global (their view_instance_id is 'global').
8//! Global view instances are implicitly accessible to SQL queries.
9//! Non-global view instances are accessible using the table function `view_instance`. See [ViewInstanceTableFunction](super::view_instance_table_function::ViewInstanceTableFunction).
10//!
11//! ## log_entries
12//!
13//! | field | type | description |
14//! |------------- |-----------------------------|-----------------------------------------------------------|
15//! |process_id |Utf8 | unique id of the process, references the processes table |
16//! |exe |Utf8 | filename of the process |
17//! |username |Utf8 | username of the process |
18//! |computer |Utf8 | computer name of the process |
19//! |time |UTC Timestamp (nanoseconds) | time of the log entry event |
20//! |target |Utf8 | category or module name of the log entry |
//! |level |int32 | verbosity level (Fatal=1, Error=2, Warning=3, Info=4, Debug=5, Trace=6) |
22//! |msg |Utf8 | message |
23//!
24//! ### log_entries view instances
25//!
26//! The implicit use of the `log_entries` table corresponds to the 'global' instance, which contains the log entries of all the processes.
27//!
28//! Except the 'global' instance, the instance_id refers to any process_id. `view_instance('log_entries', process_id)` contains that process's log. Process-specific views are materialized just-in-time and can provide much better query performance compared to the 'global' instance.
29//!
30//! ## measures
31//!
32//! | field | type | description |
33//! |------------- |-----------------------------|-----------------------------------------------------------|
34//! |process_id |Utf8 | unique id of the process, references the processes table |
35//! |exe |Utf8 | filename of the process |
36//! |username |Utf8 | username of the process |
37//! |computer |Utf8 | computer name of the process |
38//! |time |UTC Timestamp (nanoseconds) | time of the measure event |
39//! |target |Utf8 | category or module name of the measure |
40//! |name |Utf8 | name of the measure |
41//! |unit |Utf8 | unit of measure |
42//! |value |Float64 | value measured |
43//!
44//!
45//! ### measures view instances
46//!
47//! The implicit use of the `measures` table corresponds to the 'global' instance, which contains the metrics of all the processes.
48//!
49//! Except the 'global' instance, the instance_id refers to any process_id. `view_instance('measures', process_id)` contains that process's metrics. Process-specific views are materialized just-in-time and can provide much better query performance compared to the 'global' instance.
50//!
51//! ## thread_spans
52//!
53//! | field | type | description |
54//! |------------- |-----------------------------|------------------------------------------------------------|
55//! |id |Int64 | span id, unique within this thread |
56//! |parent |Int64 | span id of the calling span |
57//! |depth |UInt32 | call stack depth |
58//! |hash |UInt32 | identifies a call site (name, filename, line) |
59//! |begin |UTC Timestamp (nanoseconds) | when the span started its execution |
60//! |end |UTC Timestamp (nanoseconds) | when the span finished its execution |
61//! |duration |Int64 (nanoseconds) | end-begin |
62//! |name |Utf8 | name of the span, usually a function name |
63//! |target |Utf8 | category or module name |
64//! |filename |Utf8 | name or path of the source file where the span is coded |
65//! |line |UInt32 | line number in the file where the span can be found |
66//!
67//! ### thread_spans view instances
68//!
//! There is no 'global' instance in the 'thread_spans' view set, so there is no implicit thread_spans table available.
70//! Users can call the table function `view_instance('thread_spans', stream_id)` to query the spans in the thread associated with the specified stream_id.
71//!
72//! ## async_events
73//!
74//! | field | type | description |
75//! |------------- |-----------------------------|-----------------------------------------------------------|
76//! |stream_id |Dictionary(Int16, Utf8) | identifier of the thread stream that emitted the event |
77//! |block_id |Dictionary(Int16, Utf8) | unique identifier of the event block |
78//! |time |UTC Timestamp (nanoseconds) | time when the async event occurred |
79//! |event_type |Dictionary(Int16, Utf8) | type of event: "begin" or "end" |
80//! |span_id |Int64 | unique async span identifier |
81//! |parent_span_id|Int64 | span id of the parent async span |
82//! |name |Dictionary(Int16, Utf8) | name of the async span, usually a function name |
83//! |filename |Dictionary(Int16, Utf8) | name or path of the source file where the span is coded |
84//! |target |Dictionary(Int16, Utf8) | category or module name |
85//! |line |UInt32 | line number in the file where the span can be found |
86//!
87//! ### async_events view instances
88//!
89//! There is no 'global' instance in the 'async_events' view set, there is therefore no implicit async_events table available.
90//! Users can call the table function `view_instance('async_events', process_id)` to query the async events in all thread streams associated with the specified process_id.
91//! Process-specific views are materialized just-in-time and can provide good query performance.
92//!
93//! The schema is optimized for high-frequency data. Process information (exe, username, computer, etc.) can be joined when needed:
94//! ```sql
95//! SELECT ae.*, p.exe, p.username, p.computer
96//! FROM view_instance('async_events', process_id) ae
97//! JOIN streams s ON ae.stream_id = s.stream_id
98//! JOIN processes p ON s.process_id = p.process_id
99//! ```
100//!
101//! ## net_spans
102//!
103//! | field | type | description |
104//! |------------------|-----------------------------|-------------------------------------------------------------------------|
105//! |process_id |Dictionary(Int16, Utf8) | process unique id |
106//! |stream_id |Dictionary(Int16, Utf8) | net stream unique id |
107//! |span_id |Int64 | unique span id within the stream |
108//! |parent_span_id |Int64 | span id of the enclosing span (-1 sentinel at Connection roots) |
109//! |depth |UInt32 | tree depth (0 = Connection) |
110//! |kind |Dictionary(Int16, Utf8) | one of `connection` / `object` / `property` / `rpc` |
111//! |name |Dictionary(Int16, Utf8) | span name (connection / object / property / function name) |
112//! |connection_name |Dictionary(Int16, Utf8) | enclosing connection name (denormalized onto every row) |
113//! |is_outgoing |Boolean | direction inherited from the enclosing connection |
114//! |begin_bits |Int64 | cumulative bit offset within the parent span (0 at the Connection root) |
115//! |end_bits |Int64 | `begin_bits + bit_size` |
116//! |bit_size |Int64 | inclusive number of bits attributed to this span |
117//! |begin_time |UTC Timestamp (nanoseconds) | timestamp of the span's Begin event |
118//! |end_time |UTC Timestamp (nanoseconds) | timestamp of the span's End event (equals `begin_time` for properties) |
119//!
120//! ### net_spans view instances
121//!
122//! There is no 'global' instance in the 'net_spans' view set. Users call
123//! `view_instance('net_spans', process_id)` to query the network bandwidth spans emitted by a
124//! given process's net stream. Partitions are materialized just-in-time.
125//!
126//! ## processes
127//!
128//! | field | type | description |
129//! |------------- |-----------------------------|------------------------------------------------------------|
130//! |process_id |Utf8 | process unique id |
131//! |exe |Utf8 | filename of the process |
132//! |username |Utf8 | username of the process |
133//! |realname |Utf8 | real name of the user launching the process |
134//! |computer |Utf8 | name of the computer or vm |
135//! |distro |Utf8 | name of operating system |
136//! |cpu_brand |Utf8 | identifies the cpu |
//! |tsc_frequency |Int64 | number of ticks per second |
138//! |start_time |UTC Timestamp (nanoseconds) | when the process started (as reported by the instrumented process) |
139//! |start_ticks |Int64 | tick count associated with start_time |
//! |insert_time |UTC Timestamp (nanoseconds) | server-side timestamp when the process metadata was received |
141//! |parent_process_id |Utf8 | unique id of the parent process |
142//! |properties | Array of {key: utf8, value: utf8} | self-reported metadata by the process |
143//!
144//! There is only one instance in this view set and it is implicitly available.
145//!
146//! ## streams
147//!
148//! | field | type | description |
149//! |------------- |-----------------------------|------------------------------------------------------------|
150//! |stream_id |Utf8 | stream unique id |
151//! |process_id |Utf8 | process unique id |
152//! |dependencies_metadata|Binary | memory layout of the event dependencies |
153//! |objects_metadata|Binary | memory layout of the events |
154//! |tags | Array of utf8 | Purpose of the stream, can contain "log", "metrics" or "cpu" |
155//! |properties | Array of {key: utf8, value: utf8} | self-reported stream metadata by the process |
//! |insert_time |UTC Timestamp (nanoseconds) | server-side timestamp when the stream metadata was received |
157//!
158//! There is only one instance in this view set and it is implicitly available.
159//!
160//! ## blocks
161//!
162//! | field | type | description |
163//! |------------- |-----------------------------|------------------------------------------------------------|
164//! |block_id |Utf8 | block unique id |
165//! |stream_id |Utf8 | stream unique id |
166//! |process_id |Utf8 | process unique id |
167//! |begin_time |UTC Timestamp (nanoseconds) | system time marking the beginning of this event batch |
168//! |begin_ticks |Int64 | tick count associated with begin_time |
169//! |end_time |UTC Timestamp (nanoseconds) | system time marking the ending of this event batch |
170//! |end_ticks |Int64 | tick count associated with end_time |
171//! |nb_objects |Int32 | number of events in this batch |
172//! |object_offset |Int64 | number of events preceding this batch |
173//! |payload_size |Int64 | number of bytes of the binary payload |
174//! |insert_time |UTC Timestamp (nanoseconds) | server-side timestamp when the block was received |
175//! |streams.dependencies_metadata|Binary | memory layout of the event dependencies |
176//! |streams.objects_metadata|Binary | memory layout of the events |
177//! |streams.tags | Array of utf8 | Purpose of the stream, can contain "log", "metrics" or "cpu" |
178//! |streams.properties | Array of {key: utf8, value: utf8} | self-reported stream metadata by the process |
179//! |processes.start_time |UTC Timestamp (nanoseconds) | when the process started (as reported by the instrumented process) |
180//! |processes.start_ticks |Int64 | tick count associated with start_time |
//! |processes.tsc_frequency |Int64 | number of ticks per second |
182//! |processes.exe |Utf8 | filename of the process |
183//! |processes.username |Utf8 | username of the process |
184//! |processes.realname |Utf8 | real name of the user launching the process |
185//! |processes.computer |Utf8 | name of the computer or vm |
186//! |processes.distro |Utf8 | name of operating system |
187//! |processes.cpu_brand |Utf8 | identifies the cpu |
188//!
189//! There is only one instance in this view set and it is implicitly available.
190//!
191//!
192//!
193use super::blocks_view::BlocksView;
194use super::log_stats_view::make_log_stats_view;
195use super::processes_view::make_processes_view;
196use super::streams_view::make_streams_view;
197use super::{
198 async_events_view::AsyncEventsViewMaker, log_view::LogViewMaker,
199 metrics_view::MetricsViewMaker, net_spans_view::NetSpansViewMaker,
200 otel::spans_view::OtelSpansViewMaker, thread_spans_view::ThreadSpansViewMaker, view::View,
201};
202use anyhow::Result;
203use datafusion::arrow::datatypes::Schema;
204use datafusion::execution::runtime_env::RuntimeEnv;
205use micromegas_ingestion::data_lake_connection::DataLakeConnection;
206use std::fmt::Debug;
207use std::{collections::HashMap, sync::Arc};
208
209/// A trait for creating views.
210pub trait ViewMaker: Send + Sync + Debug {
211 /// Creates a new view with the given instance ID.
212 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>>;
213
214 /// Returns the schema hash for views created by this maker.
215 fn get_schema_hash(&self) -> Vec<u8>;
216
217 /// Returns the schema for views created by this maker.
218 fn get_schema(&self) -> Arc<Schema>;
219}
220
221/// A factory for creating and managing views.
222#[derive(Debug, Clone)]
223pub struct ViewFactory {
224 view_sets: HashMap<String, Arc<dyn ViewMaker>>,
225 global_views: Vec<Arc<dyn View>>,
226}
227
228impl ViewFactory {
229 pub fn new(global_views: Vec<Arc<dyn View>>) -> Self {
230 Self {
231 view_sets: HashMap::new(),
232 global_views,
233 }
234 }
235
236 pub fn get_global_views(&self) -> &[Arc<dyn View>] {
237 &self.global_views
238 }
239
240 pub fn get_global_view(&self, view_name: &str) -> Option<Arc<dyn View>> {
241 self.global_views
242 .iter()
243 .find(|v| *(v.get_view_set_name()) == view_name)
244 .cloned()
245 }
246
247 pub fn add_global_view(&mut self, view: Arc<dyn View>) {
248 self.global_views.push(view);
249 }
250
251 pub fn add_view_set(&mut self, view_set_name: String, maker: Arc<dyn ViewMaker>) {
252 self.view_sets.insert(view_set_name, maker);
253 }
254
255 pub fn get_view_sets(&self) -> &HashMap<String, Arc<dyn ViewMaker>> {
256 &self.view_sets
257 }
258
259 pub fn make_view(&self, view_set_name: &str, view_instance_id: &str) -> Result<Arc<dyn View>> {
260 if let Some(maker) = self.view_sets.get(view_set_name) {
261 maker.make_view(view_instance_id)
262 } else {
263 anyhow::bail!("view set {view_set_name} not found");
264 }
265 }
266}
267
268/// Creates the default `ViewFactory` with all built-in views.
269pub async fn default_view_factory(
270 runtime: Arc<RuntimeEnv>,
271 lake: Arc<DataLakeConnection>,
272) -> Result<ViewFactory> {
273 let blocks_view = Arc::new(BlocksView::new()?);
274 let processes_view = Arc::new(
275 make_processes_view(
276 runtime.clone(),
277 lake.clone(),
278 Arc::new(ViewFactory::new(vec![blocks_view.clone()])),
279 )
280 .await?,
281 );
282 let streams_view = Arc::new(
283 make_streams_view(
284 runtime.clone(),
285 lake.clone(),
286 Arc::new(ViewFactory::new(vec![blocks_view.clone()])),
287 )
288 .await?,
289 );
290 let log_view_maker = Arc::new(LogViewMaker {});
291 let metrics_view_maker = Arc::new(MetricsViewMaker {});
292
293 // Create base views first
294 let global_views = vec![
295 log_view_maker.make_view("global")?,
296 metrics_view_maker.make_view("global")?,
297 processes_view,
298 streams_view,
299 blocks_view,
300 ];
301 let mut factory = ViewFactory::new(global_views);
302 factory.add_view_set(String::from("log_entries"), log_view_maker.clone());
303 factory.add_view_set(String::from("measures"), metrics_view_maker);
304 factory.add_view_set(
305 String::from("thread_spans"),
306 Arc::new(ThreadSpansViewMaker {}),
307 );
308
309 // Create the factory as Arc to pass to other view makers
310 let factory_arc = Arc::new(factory);
311
312 // Create log_stats view with access to the complete factory (including log_entries)
313 let log_stats_view =
314 Arc::new(make_log_stats_view(runtime.clone(), lake.clone(), factory_arc.clone()).await?);
315
316 // Clone factory and add log_stats view
317 let mut updated_factory = (*factory_arc).clone();
318 updated_factory.add_global_view(log_stats_view);
319
320 // Add async_events view maker
321 updated_factory.add_view_set(
322 String::from("async_events"),
323 Arc::new(AsyncEventsViewMaker::new(Arc::new(updated_factory.clone()))),
324 );
325
326 // Add net_spans view maker
327 updated_factory.add_view_set(
328 String::from("net_spans"),
329 Arc::new(NetSpansViewMaker::new(Arc::new(updated_factory.clone()))),
330 );
331
332 // Add otel_spans view maker (per-process JIT only — no global instance)
333 updated_factory.add_view_set(String::from("otel_spans"), Arc::new(OtelSpansViewMaker {}));
334
335 Ok(updated_factory)
336}