Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub struct ClickHouseServerSummary {
}

pub struct QueryMetricRow {
pub query_id: String,
pub host_name: String,
pub timestamp_ns: u64,
pub memory_usage: i64,
pub peak_memory_usage: i64,
Expand Down Expand Up @@ -1232,7 +1232,8 @@ impl ClickHouse {
operation_name,
start_time_us,
finish_time_us,
attribute['clickhouse.query_id'] AS query_id
attribute['clickhouse.query_id'] AS query_id,
hostName() AS host_name
FROM {dbtable}
WHERE start_time_us BETWEEN {start_us} AND {end_us}
{query_id_filter}
Expand Down Expand Up @@ -1268,7 +1269,8 @@ impl ClickHouse {
query_id,
event,
increment,
event_time_microseconds
event_time_microseconds,
hostName() AS host_name
FROM {dbtable}
WHERE trace_type = 'ProfileEvent' AND increment != 0
{query_id_filter}
Expand Down Expand Up @@ -1309,6 +1311,7 @@ impl ClickHouse {
event_time_microseconds,
memory_usage,
peak_memory_usage,
hostName() AS host_name,
COLUMNS('ProfileEvent_')
FROM {dbtable}
WHERE 1
Expand Down Expand Up @@ -1355,7 +1358,7 @@ impl ClickHouse {
}
};
rows.push(QueryMetricRow {
query_id: block.get(i, "query_id").unwrap_or_default(),
host_name: block.get(i, "host_name").unwrap_or_default(),
timestamp_ns: ts_ns,
memory_usage: block.get(i, "memory_usage").unwrap_or(0),
peak_memory_usage: block.get(i, "peak_memory_usage").unwrap_or(0),
Expand Down Expand Up @@ -1392,7 +1395,8 @@ impl ClickHouse {
part_name,
query_id,
rows,
size_in_bytes
size_in_bytes,
hostName() AS host_name
FROM {dbtable}
WHERE event_type NOT IN ('MergePartsStart', 'MutatePartStart')
{query_id_filter}
Expand Down Expand Up @@ -1444,7 +1448,8 @@ impl ClickHouse {
trace_type::String AS trace_type,
{symbol_expr} AS stack,
size,
query_id
query_id,
hostName() AS host_name
FROM {dbtable}
WHERE trace_type IN ('CPU', 'Real', 'Memory')
{query_id_filter}
Expand Down Expand Up @@ -1487,7 +1492,8 @@ impl ClickHouse {
level::String AS level,
logger_name::String AS logger_name,
message,
query_id
query_id,
hostName() AS host_name
FROM {dbtable}
WHERE 1
{query_id_filter}
Expand Down Expand Up @@ -1530,7 +1536,8 @@ impl ClickHouse {
query_duration_ms,
ProfileEvents.Names,
ProfileEvents.Values,
peak_memory_usage
peak_memory_usage,
hostName() AS host_name
FROM {dbtable}
WHERE 1
{query_id_filter}
Expand Down
63 changes: 34 additions & 29 deletions src/interpreter/perfetto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub struct PerfettoTraceBuilder {
next_intern_id: u64,

host_uuids: HashMap<String, u64>,
query_id_to_host: HashMap<String, String>,
// (host_name, category) → category track uuid
host_category_uuids: HashMap<(String, &'static str), u64>,
per_server: bool,
Expand All @@ -80,7 +79,6 @@ impl PerfettoTraceBuilder {
next_intern_id: 1,

host_uuids: HashMap::new(),
query_id_to_host: HashMap::new(),
host_category_uuids: HashMap::new(),
per_server,
text_log_android,
Expand Down Expand Up @@ -250,17 +248,7 @@ impl PerfettoTraceBuilder {
let mut user_uuids: HashMap<(String, String), u64> = HashMap::new();

for q in queries {
let host_uuid = if let Some(&uuid) = self.host_uuids.get(&q.host_name) {
uuid
} else {
let uuid = self.alloc_uuid();
self.add_process_track(uuid, &q.host_name);
self.host_uuids.insert(q.host_name.clone(), uuid);
uuid
};

self.query_id_to_host
.insert(q.query_id.clone(), q.host_name.clone());
let host_uuid = self.get_or_create_host_uuid(&q.host_name);

let user_key = (q.host_name.clone(), q.user.clone());
let user_uuid = *user_uuids.entry(user_key).or_insert_with(|| {
Expand Down Expand Up @@ -308,13 +296,22 @@ impl PerfettoTraceBuilder {
}
}

fn get_host_category_track(&mut self, query_id: &str, category: &'static str) -> Option<u64> {
if !self.per_server {
fn get_or_create_host_uuid(&mut self, host_name: &str) -> u64 {
if let Some(&uuid) = self.host_uuids.get(host_name) {
return uuid;
}
let uuid = self.alloc_uuid();
self.add_process_track(uuid, host_name);
self.host_uuids.insert(host_name.to_string(), uuid);
uuid
}

fn get_host_category_track(&mut self, host_name: &str, category: &'static str) -> Option<u64> {
if !self.per_server || host_name.is_empty() {
return None;
}
let host = self.query_id_to_host.get(query_id)?.clone();
let host_uuid = *self.host_uuids.get(&host)?;
let key = (host, category);
let host_uuid = self.get_or_create_host_uuid(host_name);
let key = (host_name.to_string(), category);
if let Some(&uuid) = self.host_category_uuids.get(&key) {
Some(uuid)
} else {
Expand Down Expand Up @@ -356,6 +353,7 @@ impl PerfettoTraceBuilder {
}
};
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
let host_name: String = columns.get(i, "host_name").unwrap_or_default();

let start_ns = start_us.saturating_mul(1000);
let end_ns = finish_us.saturating_mul(1000);
Expand All @@ -375,7 +373,8 @@ impl PerfettoTraceBuilder {
self.add_slice_begin(track_uuid, &operation_name, start_ns, annotations.clone());
self.add_slice_end(track_uuid, end_ns);

if let Some(cat_uuid) = self.get_host_category_track(&query_id, "OpenTelemetry Spans") {
if let Some(cat_uuid) = self.get_host_category_track(&host_name, "OpenTelemetry Spans")
{
let server_track = *server_op_uuids
.entry((cat_uuid, operation_name.clone()))
.or_insert_with(|| {
Expand Down Expand Up @@ -405,7 +404,7 @@ impl PerfettoTraceBuilder {
for i in 0..columns.row_count() {
let event: String = columns.get(i, "event").unwrap_or_default();
let increment: i64 = columns.get(i, "increment").unwrap_or(0);
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
let host_name: String = columns.get(i, "host_name").unwrap_or_default();
let timestamp_ns: u64 =
match columns.get::<DateTime<Tz>, _>(i, "event_time_microseconds") {
Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64,
Expand All @@ -431,7 +430,8 @@ impl PerfettoTraceBuilder {
*running_total += scaled_increment;
self.add_counter_value(*track_uuid, timestamp_ns, *running_total);

if let Some(cat_uuid) = self.get_host_category_track(&query_id, "ProfileEvent Counters")
if let Some(cat_uuid) =
self.get_host_category_track(&host_name, "ProfileEvent Counters")
{
let (track_uuid, running_total) = server_tracks
.entry((cat_uuid, event.clone()))
Expand Down Expand Up @@ -476,7 +476,8 @@ impl PerfettoTraceBuilder {
});
self.add_counter_value(track_uuid, row.timestamp_ns, value);

if let Some(cat_uuid) = self.get_host_category_track(&row.query_id, "Query Metrics")
if let Some(cat_uuid) =
self.get_host_category_track(&row.host_name, "Query Metrics")
{
let server_track = *server_tracks
.entry((cat_uuid, name.to_string()))
Expand All @@ -500,7 +501,8 @@ impl PerfettoTraceBuilder {
});
self.add_counter_value(track_uuid, row.timestamp_ns, scaled_value);

if let Some(cat_uuid) = self.get_host_category_track(&row.query_id, "Query Metrics")
if let Some(cat_uuid) =
self.get_host_category_track(&row.host_name, "Query Metrics")
{
let server_track = *server_tracks
.entry((cat_uuid, name.clone()))
Expand Down Expand Up @@ -548,6 +550,7 @@ impl PerfettoTraceBuilder {
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
let rows: u64 = columns.get(i, "rows").unwrap_or(0);
let size_in_bytes: u64 = columns.get(i, "size_in_bytes").unwrap_or(0);
let host_name: String = columns.get(i, "host_name").unwrap_or_default();

let table_key = format!("{}.{}", database, table);
let track_uuid = *table_uuids.entry(table_key.clone()).or_insert_with(|| {
Expand Down Expand Up @@ -576,7 +579,7 @@ impl PerfettoTraceBuilder {
self.add_slice_begin(track_uuid, &label, start_ns, annotations.clone());
self.add_slice_end(track_uuid, end_ns);

if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Part Log") {
if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Part Log") {
let server_track = *server_table_uuids
.entry((cat_uuid, table_key.clone()))
.or_insert_with(|| {
Expand Down Expand Up @@ -606,6 +609,7 @@ impl PerfettoTraceBuilder {
for i in 0..columns.row_count() {
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
let thread_name: String = columns.get(i, "thread_name").unwrap_or_default();
let host_name: String = columns.get(i, "host_name").unwrap_or_default();
let timestamp_ns: u64 =
match columns.get::<DateTime<Tz>, _>(i, "event_time_microseconds") {
Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64,
Expand Down Expand Up @@ -651,7 +655,7 @@ impl PerfettoTraceBuilder {
self.add_slice_begin(track_uuid, &query_id, start_ns, annotations.clone());
self.add_slice_end(track_uuid, end_ns);

if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Query Threads") {
if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Query Threads") {
let server_track = *server_thread_uuids
.entry((cat_uuid, thread_name.clone()))
.or_insert_with(|| {
Expand Down Expand Up @@ -689,6 +693,7 @@ impl PerfettoTraceBuilder {
let logger_name: String = columns.get(i, "logger_name").unwrap_or_default();
let message: String = columns.get(i, "message").unwrap_or_default();
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
let host_name: String = columns.get(i, "host_name").unwrap_or_default();
let timestamp_ns: u64 =
match columns.get::<DateTime<Tz>, _>(i, "event_time_microseconds") {
Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64,
Expand Down Expand Up @@ -716,7 +721,7 @@ impl PerfettoTraceBuilder {

self.add_instant(track_uuid, &message, timestamp_ns, annotations.clone());

if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Query Logs") {
if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Query Logs") {
let server_track = *server_level_uuids
.entry((cat_uuid, level.clone()))
.or_insert_with(|| {
Expand Down Expand Up @@ -1358,10 +1363,10 @@ impl PerfettoTraceBuilder {
});

if self.per_server {
let query_id: String = columns.get(i, "query_id").unwrap_or_default();
if let Some(host) = self.query_id_to_host.get(&query_id) {
let host_name: String = columns.get(i, "host_name").unwrap_or_default();
if !host_name.is_empty() {
samples_by_host_type
.entry((host.clone(), trace_type))
.entry((host_name, trace_type))
.or_default()
.push(Sample {
callstack_iid,
Expand Down
2 changes: 2 additions & 0 deletions src/interpreter/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ where
pub struct Query {
pub selection: bool,
pub host_name: String,
pub display_host_name: Option<String>,
pub user: String,
pub threads: usize,
pub memory: i64,
Expand Down Expand Up @@ -75,6 +76,7 @@ impl Query {
Ok(Query {
selection: false,
host_name: columns.get::<_, _>(row_index, "host_name")?,
display_host_name: None,
user: columns.get::<_, _>(row_index, "user")?,
threads: columns.get::<u64, _>(row_index, "peak_threads_usage")? as usize,
memory: columns.get::<_, _>(row_index, "peak_memory_usage")?,
Expand Down
Loading
Loading