micromegas_ingestion/
sql_telemetry_db.rs

1use anyhow::{Context, Result};
2use sqlx::Executor;
3
4async fn create_migration_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
5    sqlx::query("CREATE table migration(version integer);")
6        .execute(&mut **tr)
7        .await
8        .with_context(|| String::from("Creating table migration"))?;
9    sqlx::query("INSERT INTO migration VALUES(1);")
10        .execute(&mut **tr)
11        .await
12        .with_context(|| String::from("Recording the initial schema version"))?;
13    Ok(())
14}
15
16async fn create_property_type(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
17    let sql = "CREATE TYPE micromegas_property as (key TEXT, value TEXT);";
18    tr.execute(sql)
19        .await
20        .with_context(|| String::from("Creating property type"))?;
21    Ok(())
22}
23
24async fn create_processes_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
25    let sql = "
26         CREATE TABLE processes(
27                  process_id UUID, 
28                  exe VARCHAR(255), 
29                  username VARCHAR(255), 
30                  realname VARCHAR(255), 
31                  computer VARCHAR(255), 
32                  distro VARCHAR(255), 
33                  cpu_brand VARCHAR(255), 
34                  tsc_frequency BIGINT,
35                  start_time TIMESTAMPTZ,
36                  start_ticks BIGINT,
37                  insert_time TIMESTAMPTZ,
38                  parent_process_id UUID,
39                  properties micromegas_property[]
40                  );
41         CREATE INDEX process_id on processes(process_id);
42         CREATE INDEX parent_process_id on processes(parent_process_id);
43         CREATE INDEX process_start_time on processes(start_time);";
44    tr.execute(sql)
45        .await
46        .with_context(|| String::from("Creating table processes and its indices"))?;
47    Ok(())
48}
49
50async fn create_streams_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
51    let sql = "
52         CREATE TABLE streams(
53                  stream_id UUID, 
54                  process_id UUID, 
55                  dependencies_metadata BYTEA,
56                  objects_metadata BYTEA,
57                  tags TEXT[],
58                  properties micromegas_property[],
59                  insert_time TIMESTAMPTZ
60                  );
61         CREATE INDEX stream_id on streams(stream_id);
62         CREATE INDEX stream_process_id on streams(process_id);
63         CREATE INDEX stream_insert_time on streams(insert_time);";
64    tr.execute(sql)
65        .await
66        .with_context(|| String::from("Creating table streams and its indices"))?;
67    Ok(())
68}
69
70async fn create_blocks_table(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
71    // begin_ticks and end_ticks are relative to the start of the process
72    let sql = "
73         CREATE TABLE blocks(
74                  block_id UUID, 
75                  stream_id UUID, 
76                  process_id UUID, 
77                  begin_time TIMESTAMPTZ,
78                  begin_ticks BIGINT,
79                  end_time TIMESTAMPTZ,
80                  end_ticks BIGINT,
81                  nb_objects INT,
82                  object_offset BIGINT,
83                  payload_size BIGINT
84                  );
85         CREATE INDEX block_id on blocks(block_id);
86         CREATE INDEX block_stream_id on blocks(stream_id);";
87    tr.execute(sql)
88        .await
89        .with_context(|| String::from("Creating table blocks and its indices"))?;
90    Ok(())
91}
92
93/// Creates the tables for the telemetry database.
94pub async fn create_tables(tr: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> Result<()> {
95    create_property_type(tr).await?;
96    create_processes_table(tr).await?;
97    create_streams_table(tr).await?;
98    create_blocks_table(tr).await?;
99    create_migration_table(tr).await?;
100    Ok(())
101}