micromegas_analytics/
time.rs1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, TimeDelta, Utc};
5use datafusion::scalar::ScalarValue;
6use micromegas_telemetry::types::block::BlockMetadata;
7use sqlx::Row;
8
9use crate::metadata::ProcessMetadata;
10
11const NANOS_PER_SEC: f64 = 1000.0 * 1000.0 * 1000.0;
12
13#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
15pub struct TimeRange {
16 pub begin: DateTime<Utc>,
17 pub end: DateTime<Utc>,
18}
19
20impl TimeRange {
21 pub fn new(begin: DateTime<Utc>, end: DateTime<Utc>) -> Self {
22 Self { begin, end }
23 }
24}
25
26pub async fn make_time_converter_from_db(
28 pool: &sqlx::Pool<sqlx::Postgres>,
29 process: &ProcessMetadata,
30) -> Result<ConvertTicks> {
31 if process.tsc_frequency > 0 {
32 return ConvertTicks::from_meta_data(
34 process.start_ticks,
35 process.start_time.timestamp_nanos_opt().unwrap_or_default(),
36 process.tsc_frequency,
37 );
38 }
39 let row = sqlx::query(
41 "SELECT end_time, end_ticks
42 FROM blocks
43 WHERE process_id = $1
44 ORDER BY end_time DESC
45 LIMIT 1",
46 )
47 .bind(process.process_id)
48 .fetch_one(pool)
49 .await
50 .with_context(|| "getting last block end time for tsc estimation")?;
51 let end_time: chrono::DateTime<chrono::Utc> = row.try_get("end_time")?;
52 let relative_end_ticks: i64 = row.try_get("end_ticks")?;
53 let delta_time = end_time - process.start_time;
54 let nb_seconds = delta_time.num_nanoseconds().unwrap_or_default() as f64 / 1_000_000_000.0;
55 let ticks_per_second = relative_end_ticks as f64 / nb_seconds;
56 ConvertTicks::from_meta_data(
57 process.start_ticks,
58 process.start_time.timestamp_nanos_opt().unwrap_or_default(),
59 ticks_per_second.round() as i64,
60 )
61}
62
63pub fn make_time_converter_from_block_meta(
65 process: &ProcessMetadata,
66 block: &BlockMetadata,
67) -> Result<ConvertTicks> {
68 if process.tsc_frequency > 0 {
69 return ConvertTicks::from_meta_data(
71 process.start_ticks,
72 process.start_time.timestamp_nanos_opt().unwrap_or_default(),
73 process.tsc_frequency,
74 );
75 }
76 let delta_time = block.end_time - process.start_time;
77 let nb_seconds = delta_time.num_nanoseconds().unwrap_or_default() as f64 / 1_000_000_000.0;
78 let ticks_per_second = block.end_ticks as f64 / nb_seconds;
79 ConvertTicks::from_meta_data(
80 process.start_ticks,
81 process.start_time.timestamp_nanos_opt().unwrap_or_default(),
82 ticks_per_second.round() as i64,
83 )
84}
85
86pub fn make_time_converter_from_latest_timing(
90 process: &ProcessMetadata,
91 last_block_end_ticks: i64,
92 last_block_end_time: chrono::DateTime<chrono::Utc>,
93) -> Result<ConvertTicks> {
94 if process.tsc_frequency > 0 {
95 return ConvertTicks::from_meta_data(
97 process.start_ticks,
98 process.start_time.timestamp_nanos_opt().unwrap_or_default(),
99 process.tsc_frequency,
100 );
101 }
102 let delta_time = last_block_end_time - process.start_time;
104 let nb_seconds = delta_time.num_nanoseconds().unwrap_or_default() as f64 / 1_000_000_000.0;
105 let ticks_per_second = last_block_end_ticks as f64 / nb_seconds;
106 ConvertTicks::from_meta_data(
107 process.start_ticks,
108 process.start_time.timestamp_nanos_opt().unwrap_or_default(),
109 ticks_per_second.round() as i64,
110 )
111}
112
113#[derive(Debug, Clone)]
115pub struct ConvertTicks {
116 tick_offset: i64,
117 process_start_ns: i64,
118 frequency: i64, inv_tsc_frequency_ns: f64,
120 inv_tsc_frequency_ms: f64,
121}
122
123impl ConvertTicks {
124 pub fn from_meta_data(start_ticks: i64, process_start_ns: i64, frequency: i64) -> Result<Self> {
125 if frequency <= 0 {
126 anyhow::bail!("invalid frequency")
127 }
128 Ok(Self {
129 tick_offset: start_ticks,
130 process_start_ns,
131 frequency,
132 inv_tsc_frequency_ns: get_tsc_frequency_inverse_ns(frequency),
133 inv_tsc_frequency_ms: get_tsc_frequency_inverse_ms(frequency),
134 })
135 }
136
137 pub fn get_frequency(&self) -> i64 {
139 self.frequency
140 }
141
142 pub fn to_ticks(&self, delta: TimeDelta) -> i64 {
144 let mut seconds = delta.num_seconds() as f64;
145 seconds += delta.subsec_nanos() as f64 / NANOS_PER_SEC;
146 let freq = self.frequency as f64;
147 (seconds * freq).round() as i64
148 }
149
150 pub fn ticks_to_nanoseconds(&self, ticks: i64) -> i64 {
152 let delta = (ticks - self.tick_offset) as f64;
153 let ns_since_process_start = (delta * self.inv_tsc_frequency_ns).round() as i64;
154 self.process_start_ns + ns_since_process_start
155 }
156
157 pub fn delta_ticks_to_time(&self, delta: i64) -> DateTime<Utc> {
159 let ns_since_process_start = (delta as f64 * self.inv_tsc_frequency_ns).round() as i64;
160 DateTime::from_timestamp_nanos(self.process_start_ns + ns_since_process_start)
161 }
162
163 pub fn delta_ticks_to_ns(&self, delta: i64) -> i64 {
165 let ns_since_process_start = (delta as f64 * self.inv_tsc_frequency_ns).round() as i64;
166 self.process_start_ns + ns_since_process_start
167 }
168
169 pub fn delta_ticks_to_ms(&self, delta_ticks: i64) -> f64 {
171 let delta = delta_ticks as f64;
172 delta * self.inv_tsc_frequency_ms
173 }
174
175 pub fn time_to_delta_ticks(&self, time: DateTime<Utc>) -> i64 {
177 self.to_ticks(time - DateTime::from_timestamp_nanos(self.process_start_ns))
178 }
179}
180
181#[allow(clippy::cast_precision_loss)]
183pub fn get_tsc_frequency_inverse_ms(tsc_frequency: i64) -> f64 {
184 1000.0 / tsc_frequency as f64
185}
186
187#[allow(clippy::cast_precision_loss)]
189pub fn get_tsc_frequency_inverse_ns(tsc_frequency: i64) -> f64 {
190 NANOS_PER_SEC / tsc_frequency as f64
191}
192
193pub fn datetime_to_scalar(v: DateTime<Utc>) -> ScalarValue {
195 lazy_static::lazy_static! {
196 static ref UTC_OFFSET: Arc<str> = Arc::from("+00:00");
197 }
198 ScalarValue::TimestampNanosecond(v.timestamp_nanos_opt(), Some(UTC_OFFSET.clone()))
199}