micromegas_tracing/event/
stream.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::event::TracingBlock;
7
/// StreamDesc is the metadata associated with an event stream.
///
/// It derives `Serialize`/`Deserialize`, so the field names and their order are
/// part of the serialized representation.
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamDesc {
    /// Identifier of the stream; `StreamDesc::new` generates it with `Uuid::new_v4`.
    pub stream_id: uuid::Uuid,
    /// Identifier of the process, as supplied by the caller of `StreamDesc::new`.
    pub process_id: uuid::Uuid,
    /// Free-form tags attached to the stream.
    pub tags: Vec<String>,
    /// Arbitrary key/value properties attached to the stream.
    pub properties: HashMap<String, String>,
}
16
17impl StreamDesc {
18    pub fn new(
19        process_id: uuid::Uuid,
20        tags: &[String],
21        properties: HashMap<String, String>,
22    ) -> Self {
23        let stream_id = uuid::Uuid::new_v4();
24        Self {
25            stream_id,
26            process_id,
27            tags: tags.to_vec(),
28            properties,
29        }
30    }
31}
32
/// An EventStream is a sequence of blocks sent (or dropped) as they become full.
///
/// The stream holds one current block at a time; `replace_block` swaps it for a
/// new one and hands the previous block back to the caller.
#[derive(Debug)]
pub struct EventStream<Block> {
    /// Metadata describing this stream, shared through an `Arc`.
    stream_desc: Arc<StreamDesc>,
    /// The block currently held by the stream.
    current_block: Arc<Block>,
    /// Byte count above which `is_full` reports the current block as full.
    full_threshold: AtomicUsize,
}
40
41impl<Block> EventStream<Block>
42where
43    Block: TracingBlock,
44{
45    pub fn new(
46        buffer_size: usize,
47        process_id: uuid::Uuid,
48        tags: &[String],
49        properties: HashMap<String, String>,
50    ) -> Self {
51        let stream_desc = Arc::new(StreamDesc::new(process_id, tags, properties));
52        let block = Arc::new(Block::new(
53            buffer_size,
54            process_id,
55            stream_desc.stream_id,
56            0,
57        ));
58        let max_obj_size = block.hint_max_obj_size();
59        Self {
60            stream_desc,
61            current_block: block,
62            full_threshold: AtomicUsize::new(buffer_size - max_obj_size),
63        }
64    }
65
66    pub fn desc(&self) -> Arc<StreamDesc> {
67        self.stream_desc.clone()
68    }
69
70    pub fn stream_id(&self) -> uuid::Uuid {
71        self.stream_desc.stream_id
72    }
73
74    pub fn set_full(&mut self) {
75        self.full_threshold.store(0, Ordering::Relaxed);
76    }
77
78    pub fn replace_block(&mut self, new_block: Arc<Block>) -> Arc<Block> {
79        let old_block = self.current_block.clone();
80        let max_obj_size = new_block.hint_max_obj_size();
81        self.full_threshold
82            .store(new_block.capacity_bytes() - max_obj_size, Ordering::Relaxed);
83        self.current_block = new_block;
84        old_block
85    }
86
87    pub fn is_full(&self) -> bool {
88        let full_size = self.full_threshold.load(Ordering::Relaxed);
89        self.current_block.len_bytes() > full_size
90    }
91
92    pub fn is_empty(&self) -> bool {
93        self.current_block.len_bytes() == 0
94    }
95
96    pub fn get_block_ref(&self) -> &Block {
97        &self.current_block
98    }
99
100    pub fn get_events_mut(&mut self) -> &mut Block::Queue {
101        //get_mut_unchecked should be faster
102        Arc::get_mut(&mut self.current_block).unwrap().events_mut()
103    }
104
105    pub fn process_id(&self) -> uuid::Uuid {
106        self.stream_desc.process_id
107    }
108
109    pub fn tags(&self) -> &[String] {
110        &self.stream_desc.tags
111    }
112
113    pub fn properties(&self) -> &HashMap<String, String> {
114        &self.stream_desc.properties
115    }
116}