micromegas_analytics/lakehouse/otel/
spans_view.rs1use super::spans_block_processor::OtelSpansBlockProcessor;
13use super::spans_table::otel_spans_table_schema;
14use crate::lakehouse::{
15 batch_update::PartitionCreationStrategy,
16 block_partition_spec::{BlockProcessor, BlockProcessorMap},
17 blocks_view::BlocksView,
18 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
19 jit_partitions::{
20 JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
21 write_partition_from_blocks,
22 },
23 lakehouse_context::LakehouseContext,
24 partition_cache::PartitionCache,
25 view::{PartitionSpec, View, ViewMetadata},
26 view_factory::ViewMaker,
27};
28use crate::{
29 metadata::find_process,
30 time::{TimeRange, datetime_to_scalar},
31};
32use anyhow::{Context, Result};
33use async_trait::async_trait;
34use chrono::{DateTime, TimeDelta, Utc};
35use datafusion::{
36 arrow::datatypes::Schema,
37 logical_expr::{BinaryExpr, Expr, Operator, col},
38};
39use micromegas_ingestion::web_ingestion_service::FORMAT_OTLP_TRACES;
40use micromegas_tracing::prelude::*;
41use std::collections::HashMap;
42use std::sync::Arc;
43use uuid::Uuid;
44
45const VIEW_SET_NAME: &str = "otel_spans";
46const SCHEMA_VERSION: u8 = 1;
47lazy_static::lazy_static! {
48 static ref TIME_COLUMN: Arc<String> = Arc::new(String::from("start_time"));
49 static ref END_TIME_COLUMN: Arc<String> = Arc::new(String::from("end_time"));
50}
51
52#[derive(Debug)]
54pub struct OtelSpansViewMaker {}
55
56impl ViewMaker for OtelSpansViewMaker {
57 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
58 Ok(Arc::new(OtelSpansView::new(view_instance_id)?))
59 }
60
61 fn get_schema_hash(&self) -> Vec<u8> {
62 vec![SCHEMA_VERSION]
63 }
64
65 fn get_schema(&self) -> Arc<Schema> {
66 Arc::new(otel_spans_table_schema())
67 }
68}
69
70#[derive(Debug)]
71pub struct OtelSpansView {
72 view_set_name: Arc<String>,
73 view_instance_id: Arc<String>,
74 process_id: sqlx::types::Uuid,
75}
76
77impl OtelSpansView {
78 pub fn new(view_instance_id: &str) -> Result<Self> {
79 if view_instance_id == "global" {
80 anyhow::bail!("OtelSpansView does not support global view access");
81 }
82 let process_id = Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?;
83 Ok(Self {
84 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
85 view_instance_id: Arc::new(view_instance_id.into()),
86 process_id,
87 })
88 }
89}
90
91#[async_trait]
92impl View for OtelSpansView {
93 fn get_view_set_name(&self) -> Arc<String> {
94 self.view_set_name.clone()
95 }
96
97 fn get_view_instance_id(&self) -> Arc<String> {
98 self.view_instance_id.clone()
99 }
100
101 async fn make_batch_partition_spec(
102 &self,
103 _lakehouse: Arc<LakehouseContext>,
104 _existing_partitions: Arc<PartitionCache>,
105 _insert_range: TimeRange,
106 ) -> Result<Arc<dyn PartitionSpec>> {
107 anyhow::bail!("OtelSpansView does not support batch partition specs");
108 }
109
110 fn get_file_schema_hash(&self) -> Vec<u8> {
111 vec![SCHEMA_VERSION]
112 }
113
114 fn get_file_schema(&self) -> Arc<Schema> {
115 Arc::new(otel_spans_table_schema())
116 }
117
118 #[span_fn]
119 async fn jit_update(
120 &self,
121 lakehouse: Arc<LakehouseContext>,
122 query_range: Option<TimeRange>,
123 ) -> Result<()> {
124 let process = Arc::new(
125 find_process(&lakehouse.lake().db_pool, &self.process_id)
126 .await
127 .with_context(|| "find_process")?,
128 );
129 let query_range =
130 query_range.unwrap_or_else(|| TimeRange::new(process.start_time, chrono::Utc::now()));
131
132 let blocks_view = BlocksView::new()?;
133 let all_partitions = generate_process_jit_partitions(
134 &JitPartitionConfig::default(),
135 lakehouse.clone(),
136 &blocks_view,
137 &query_range,
138 process.clone(),
139 "trace",
140 )
141 .await
142 .with_context(|| "generate_process_jit_partitions")?;
143 let view_meta = ViewMetadata {
144 view_set_name: self.get_view_set_name(),
145 view_instance_id: self.get_view_instance_id(),
146 file_schema_hash: self.get_file_schema_hash(),
147 };
148
149 let mut block_processors: BlockProcessorMap = HashMap::new();
150 block_processors.insert(
151 FORMAT_OTLP_TRACES,
152 Arc::new(OtelSpansBlockProcessor {}) as Arc<dyn BlockProcessor>,
153 );
154 let block_processors = Arc::new(block_processors);
155
156 for part in all_partitions {
157 if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
158 .await?
159 {
160 write_partition_from_blocks(
161 lakehouse.lake().clone(),
162 view_meta.clone(),
163 self.get_file_schema(),
164 part,
165 block_processors.clone(),
166 )
167 .await
168 .with_context(|| "write_partition_from_blocks")?;
169 }
170 }
171 Ok(())
172 }
173
174 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
175 Ok(vec![
180 Expr::BinaryExpr(BinaryExpr::new(
181 col("start_time").into(),
182 Operator::LtEq,
183 Expr::Literal(datetime_to_scalar(end), None).into(),
184 )),
185 Expr::BinaryExpr(BinaryExpr::new(
186 col("end_time").into(),
187 Operator::GtEq,
188 Expr::Literal(datetime_to_scalar(begin), None).into(),
189 )),
190 ])
191 }
192
193 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
194 Arc::new(NamedColumnsTimeBounds::new(
195 TIME_COLUMN.clone(),
196 END_TIME_COLUMN.clone(),
197 ))
198 }
199
200 fn get_update_group(&self) -> Option<i32> {
201 None }
203
204 fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
205 TimeDelta::hours(1)
206 }
207}