micromegas_tracing/event/
stream.rs1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use crate::event::TracingBlock;
7
8#[derive(Debug, Serialize, Deserialize)]
10pub struct StreamDesc {
11 pub stream_id: uuid::Uuid,
12 pub process_id: uuid::Uuid,
13 pub tags: Vec<String>,
14 pub properties: HashMap<String, String>,
15}
16
17impl StreamDesc {
18 pub fn new(
19 process_id: uuid::Uuid,
20 tags: &[String],
21 properties: HashMap<String, String>,
22 ) -> Self {
23 let stream_id = uuid::Uuid::new_v4();
24 Self {
25 stream_id,
26 process_id,
27 tags: tags.to_vec(),
28 properties,
29 }
30 }
31}
32
33#[derive(Debug)]
35pub struct EventStream<Block> {
36 stream_desc: Arc<StreamDesc>,
37 current_block: Arc<Block>,
38 full_threshold: AtomicUsize,
39}
40
41impl<Block> EventStream<Block>
42where
43 Block: TracingBlock,
44{
45 pub fn new(
46 buffer_size: usize,
47 process_id: uuid::Uuid,
48 tags: &[String],
49 properties: HashMap<String, String>,
50 ) -> Self {
51 let stream_desc = Arc::new(StreamDesc::new(process_id, tags, properties));
52 let block = Arc::new(Block::new(
53 buffer_size,
54 process_id,
55 stream_desc.stream_id,
56 0,
57 ));
58 let max_obj_size = block.hint_max_obj_size();
59 Self {
60 stream_desc,
61 current_block: block,
62 full_threshold: AtomicUsize::new(buffer_size - max_obj_size),
63 }
64 }
65
66 pub fn desc(&self) -> Arc<StreamDesc> {
67 self.stream_desc.clone()
68 }
69
70 pub fn stream_id(&self) -> uuid::Uuid {
71 self.stream_desc.stream_id
72 }
73
74 pub fn set_full(&mut self) {
75 self.full_threshold.store(0, Ordering::Relaxed);
76 }
77
78 pub fn replace_block(&mut self, new_block: Arc<Block>) -> Arc<Block> {
79 let old_block = self.current_block.clone();
80 let max_obj_size = new_block.hint_max_obj_size();
81 self.full_threshold
82 .store(new_block.capacity_bytes() - max_obj_size, Ordering::Relaxed);
83 self.current_block = new_block;
84 old_block
85 }
86
87 pub fn is_full(&self) -> bool {
88 let full_size = self.full_threshold.load(Ordering::Relaxed);
89 self.current_block.len_bytes() > full_size
90 }
91
92 pub fn is_empty(&self) -> bool {
93 self.current_block.len_bytes() == 0
94 }
95
96 pub fn get_block_ref(&self) -> &Block {
97 &self.current_block
98 }
99
100 pub fn get_events_mut(&mut self) -> &mut Block::Queue {
101 Arc::get_mut(&mut self.current_block).unwrap().events_mut()
103 }
104
105 pub fn process_id(&self) -> uuid::Uuid {
106 self.stream_desc.process_id
107 }
108
109 pub fn tags(&self) -> &[String] {
110 &self.stream_desc.tags
111 }
112
113 pub fn properties(&self) -> &HashMap<String, String> {
114 &self.stream_desc.properties
115 }
116}