micromegas_ingestion/
sql_telemetry_db.rs1use anyhow::{Context, Result};
2use sqlx::Executor;
3
4async fn create_migration_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
5 sqlx::query("CREATE table migration(version integer);")
6 .execute(&mut **tr)
7 .await
8 .with_context(|| String::from("Creating table migration"))?;
9 sqlx::query("INSERT INTO migration VALUES(1);")
10 .execute(&mut **tr)
11 .await
12 .with_context(|| String::from("Recording the initial schema version"))?;
13 Ok(())
14}
15
16async fn create_property_type(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
17 let sql = "CREATE TYPE micromegas_property as (key TEXT, value TEXT);";
18 tr.execute(sql)
19 .await
20 .with_context(|| String::from("Creating property type"))?;
21 Ok(())
22}
23
24async fn create_processes_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
25 let sql = "
26 CREATE TABLE processes(
27 process_id UUID,
28 exe VARCHAR(255),
29 username VARCHAR(255),
30 realname VARCHAR(255),
31 computer VARCHAR(255),
32 distro VARCHAR(255),
33 cpu_brand VARCHAR(255),
34 tsc_frequency BIGINT,
35 start_time TIMESTAMPTZ,
36 start_ticks BIGINT,
37 insert_time TIMESTAMPTZ,
38 parent_process_id UUID,
39 properties micromegas_property[]
40 );
41 CREATE INDEX process_id on processes(process_id);
42 CREATE INDEX parent_process_id on processes(parent_process_id);
43 CREATE INDEX process_start_time on processes(start_time);";
44 tr.execute(sql)
45 .await
46 .with_context(|| String::from("Creating table processes and its indices"))?;
47 Ok(())
48}
49
50async fn create_streams_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
51 let sql = "
52 CREATE TABLE streams(
53 stream_id UUID,
54 process_id UUID,
55 dependencies_metadata BYTEA,
56 objects_metadata BYTEA,
57 tags TEXT[],
58 properties micromegas_property[],
59 insert_time TIMESTAMPTZ
60 );
61 CREATE INDEX stream_id on streams(stream_id);
62 CREATE INDEX stream_process_id on streams(process_id);
63 CREATE INDEX stream_insert_time on streams(insert_time);";
64 tr.execute(sql)
65 .await
66 .with_context(|| String::from("Creating table streams and its indices"))?;
67 Ok(())
68}
69
70async fn create_blocks_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
71 let sql = "
73 CREATE TABLE blocks(
74 block_id UUID,
75 stream_id UUID,
76 process_id UUID,
77 begin_time TIMESTAMPTZ,
78 begin_ticks BIGINT,
79 end_time TIMESTAMPTZ,
80 end_ticks BIGINT,
81 nb_objects INT,
82 object_offset BIGINT,
83 payload_size BIGINT
84 );
85 CREATE INDEX block_id on blocks(block_id);
86 CREATE INDEX block_stream_id on blocks(stream_id);";
87 tr.execute(sql)
88 .await
89 .with_context(|| String::from("Creating table blocks and its indices"))?;
90 Ok(())
91}
92
93pub async fn create_tables(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
95 create_property_type(tr).await?;
96 create_processes_table(tr).await?;
97 create_streams_table(tr).await?;
98 create_blocks_table(tr).await?;
99 create_migration_table(tr).await?;
100 Ok(())
101}