micromegas_analytics/lakehouse/otel/
spans_view.rs

1//! Per-process JIT view of OTel spans.
2//!
3//! `otel_spans` mirrors the `AsyncEventsView` pattern (no batch path; JIT-only;
4//! materialized per-process) but the time-conversion plumbing is simpler: OTLP
5//! timestamps are absolute nanoseconds (`tsc_frequency = 1_000_000_000`), so we
6//! use the plain `find_process` flow instead of the latest-timing variant.
7//!
8//! Cross-process trace traversal (`WHERE trace_id = X` across services) is a
9//! documented v1 limitation — users supply the `process_id` to
10//! `view_instance('otel_spans', '<uuid>')` or UNION across multiple instances.
11
12use super::spans_block_processor::OtelSpansBlockProcessor;
13use super::spans_table::otel_spans_table_schema;
14use crate::lakehouse::{
15    batch_update::PartitionCreationStrategy,
16    block_partition_spec::{BlockProcessor, BlockProcessorMap},
17    blocks_view::BlocksView,
18    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
19    jit_partitions::{
20        JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
21        write_partition_from_blocks,
22    },
23    lakehouse_context::LakehouseContext,
24    partition_cache::PartitionCache,
25    view::{PartitionSpec, View, ViewMetadata},
26    view_factory::ViewMaker,
27};
28use crate::{
29    metadata::find_process,
30    time::{TimeRange, datetime_to_scalar},
31};
32use anyhow::{Context, Result};
33use async_trait::async_trait;
34use chrono::{DateTime, TimeDelta, Utc};
35use datafusion::{
36    arrow::datatypes::Schema,
37    logical_expr::{BinaryExpr, Expr, Operator, col},
38};
39use micromegas_ingestion::web_ingestion_service::FORMAT_OTLP_TRACES;
40use micromegas_tracing::prelude::*;
41use std::collections::HashMap;
42use std::sync::Arc;
43use uuid::Uuid;
44
45const VIEW_SET_NAME: &str = "otel_spans";
46const SCHEMA_VERSION: u8 = 1;
47lazy_static::lazy_static! {
48    static ref TIME_COLUMN: Arc<String> = Arc::new(String::from("start_time"));
49    static ref END_TIME_COLUMN: Arc<String> = Arc::new(String::from("end_time"));
50}
51
52/// `ViewMaker` for `OtelSpansView`. Per-process only — `"global"` is rejected.
53#[derive(Debug)]
54pub struct OtelSpansViewMaker {}
55
56impl ViewMaker for OtelSpansViewMaker {
57    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
58        Ok(Arc::new(OtelSpansView::new(view_instance_id)?))
59    }
60
61    fn get_schema_hash(&self) -> Vec<u8> {
62        vec![SCHEMA_VERSION]
63    }
64
65    fn get_schema(&self) -> Arc<Schema> {
66        Arc::new(otel_spans_table_schema())
67    }
68}
69
70#[derive(Debug)]
71pub struct OtelSpansView {
72    view_set_name: Arc<String>,
73    view_instance_id: Arc<String>,
74    process_id: sqlx::types::Uuid,
75}
76
77impl OtelSpansView {
78    pub fn new(view_instance_id: &str) -> Result<Self> {
79        if view_instance_id == "global" {
80            anyhow::bail!("OtelSpansView does not support global view access");
81        }
82        let process_id = Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?;
83        Ok(Self {
84            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
85            view_instance_id: Arc::new(view_instance_id.into()),
86            process_id,
87        })
88    }
89}
90
91#[async_trait]
92impl View for OtelSpansView {
93    fn get_view_set_name(&self) -> Arc<String> {
94        self.view_set_name.clone()
95    }
96
97    fn get_view_instance_id(&self) -> Arc<String> {
98        self.view_instance_id.clone()
99    }
100
101    async fn make_batch_partition_spec(
102        &self,
103        _lakehouse: Arc<LakehouseContext>,
104        _existing_partitions: Arc<PartitionCache>,
105        _insert_range: TimeRange,
106    ) -> Result<Arc<dyn PartitionSpec>> {
107        anyhow::bail!("OtelSpansView does not support batch partition specs");
108    }
109
110    fn get_file_schema_hash(&self) -> Vec<u8> {
111        vec![SCHEMA_VERSION]
112    }
113
114    fn get_file_schema(&self) -> Arc<Schema> {
115        Arc::new(otel_spans_table_schema())
116    }
117
118    #[span_fn]
119    async fn jit_update(
120        &self,
121        lakehouse: Arc<LakehouseContext>,
122        query_range: Option<TimeRange>,
123    ) -> Result<()> {
124        let process = Arc::new(
125            find_process(&lakehouse.lake().db_pool, &self.process_id)
126                .await
127                .with_context(|| "find_process")?,
128        );
129        let query_range =
130            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
131
132        let blocks_view = BlocksView::new()?;
133        let all_partitions = generate_process_jit_partitions(
134            &JitPartitionConfig::default(),
135            lakehouse.clone(),
136            &blocks_view,
137            &query_range,
138            process.clone(),
139            "trace",
140        )
141        .await
142        .with_context(|| "generate_process_jit_partitions")?;
143        let view_meta = ViewMetadata {
144            view_set_name: self.get_view_set_name(),
145            view_instance_id: self.get_view_instance_id(),
146            file_schema_hash: self.get_file_schema_hash(),
147        };
148
149        let mut block_processors: BlockProcessorMap = HashMap::new();
150        block_processors.insert(
151            FORMAT_OTLP_TRACES,
152            Arc::new(OtelSpansBlockProcessor {}) as Arc<dyn BlockProcessor>,
153        );
154        let block_processors = Arc::new(block_processors);
155
156        for part in all_partitions {
157            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
158                .await?
159            {
160                write_partition_from_blocks(
161                    lakehouse.lake().clone(),
162                    view_meta.clone(),
163                    self.get_file_schema(),
164                    part,
165                    block_processors.clone(),
166                )
167                .await
168                .with_context(|| "write_partition_from_blocks")?;
169            }
170        }
171        Ok(())
172    }
173
174    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
175        // Dual-bound (start_time <= end AND end_time >= begin) so spans whose
176        // duration crosses the query window — i.e. start before `begin` or end
177        // after `end` — are still included. A `start_time BETWEEN begin AND end`
178        // filter would silently drop long-running spans that overlap the window.
179        Ok(vec![
180            Expr::BinaryExpr(BinaryExpr::new(
181                col("start_time").into(),
182                Operator::LtEq,
183                Expr::Literal(datetime_to_scalar(end), None).into(),
184            )),
185            Expr::BinaryExpr(BinaryExpr::new(
186                col("end_time").into(),
187                Operator::GtEq,
188                Expr::Literal(datetime_to_scalar(begin), None).into(),
189            )),
190        ])
191    }
192
193    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
194        Arc::new(NamedColumnsTimeBounds::new(
195            TIME_COLUMN.clone(),
196            END_TIME_COLUMN.clone(),
197        ))
198    }
199
200    fn get_update_group(&self) -> Option<i32> {
201        None // process-specific JIT view; no scheduled updates
202    }
203
204    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
205        TimeDelta::hours(1)
206    }
207}