diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index 0b3565e..03906f0 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -151,7 +151,7 @@ pub struct ClickHouseServerSummary { } pub struct QueryMetricRow { - pub query_id: String, + pub host_name: String, pub timestamp_ns: u64, pub memory_usage: i64, pub peak_memory_usage: i64, @@ -1232,7 +1232,8 @@ impl ClickHouse { operation_name, start_time_us, finish_time_us, - attribute['clickhouse.query_id'] AS query_id + attribute['clickhouse.query_id'] AS query_id, + hostName() AS host_name FROM {dbtable} WHERE start_time_us BETWEEN {start_us} AND {end_us} {query_id_filter} @@ -1268,7 +1269,8 @@ impl ClickHouse { query_id, event, increment, - event_time_microseconds + event_time_microseconds, + hostName() AS host_name FROM {dbtable} WHERE trace_type = 'ProfileEvent' AND increment != 0 {query_id_filter} @@ -1309,6 +1311,7 @@ impl ClickHouse { event_time_microseconds, memory_usage, peak_memory_usage, + hostName() AS host_name, COLUMNS('ProfileEvent_') FROM {dbtable} WHERE 1 @@ -1355,7 +1358,7 @@ impl ClickHouse { } }; rows.push(QueryMetricRow { - query_id: block.get(i, "query_id").unwrap_or_default(), + host_name: block.get(i, "host_name").unwrap_or_default(), timestamp_ns: ts_ns, memory_usage: block.get(i, "memory_usage").unwrap_or(0), peak_memory_usage: block.get(i, "peak_memory_usage").unwrap_or(0), @@ -1392,7 +1395,8 @@ impl ClickHouse { part_name, query_id, rows, - size_in_bytes + size_in_bytes, + hostName() AS host_name FROM {dbtable} WHERE event_type NOT IN ('MergePartsStart', 'MutatePartStart') {query_id_filter} @@ -1444,7 +1448,8 @@ impl ClickHouse { trace_type::String AS trace_type, {symbol_expr} AS stack, size, - query_id + query_id, + hostName() AS host_name FROM {dbtable} WHERE trace_type IN ('CPU', 'Real', 'Memory') {query_id_filter} @@ -1487,7 +1492,8 @@ impl ClickHouse { level::String AS level, logger_name::String AS logger_name, message, - query_id + query_id, + hostName() AS host_name FROM {dbtable} WHERE 1 {query_id_filter} @@ -1530,7 +1536,8 @@ impl ClickHouse { query_duration_ms, ProfileEvents.Names, ProfileEvents.Values, - peak_memory_usage + peak_memory_usage, + hostName() AS host_name FROM {dbtable} WHERE 1 {query_id_filter} diff --git a/src/interpreter/perfetto.rs b/src/interpreter/perfetto.rs index 48e5b9e..1a03e4e 100644 --- a/src/interpreter/perfetto.rs +++ b/src/interpreter/perfetto.rs @@ -59,7 +59,6 @@ pub struct PerfettoTraceBuilder { next_intern_id: u64, host_uuids: HashMap, - query_id_to_host: HashMap, // (host_name, category) → category track uuid host_category_uuids: HashMap<(String, &'static str), u64>, per_server: bool, @@ -80,7 +79,6 @@ impl PerfettoTraceBuilder { next_intern_id: 1, host_uuids: HashMap::new(), - query_id_to_host: HashMap::new(), host_category_uuids: HashMap::new(), per_server, text_log_android, @@ -250,17 +248,7 @@ impl PerfettoTraceBuilder { let mut user_uuids: HashMap<(String, String), u64> = HashMap::new(); for q in queries { - let host_uuid = if let Some(&uuid) = self.host_uuids.get(&q.host_name) { - uuid - } else { - let uuid = self.alloc_uuid(); - self.add_process_track(uuid, &q.host_name); - self.host_uuids.insert(q.host_name.clone(), uuid); - uuid - }; - - self.query_id_to_host - .insert(q.query_id.clone(), q.host_name.clone()); + let host_uuid = self.get_or_create_host_uuid(&q.host_name); let user_key = (q.host_name.clone(), q.user.clone()); let user_uuid = *user_uuids.entry(user_key).or_insert_with(|| { @@ -308,13 +296,22 @@ impl PerfettoTraceBuilder { } } - fn get_host_category_track(&mut self, query_id: &str, category: &'static str) -> Option { - if !self.per_server { + fn get_or_create_host_uuid(&mut self, host_name: &str) -> u64 { + if let Some(&uuid) = self.host_uuids.get(host_name) { + return uuid; + } + let uuid = self.alloc_uuid(); + self.add_process_track(uuid, host_name); + self.host_uuids.insert(host_name.to_string(), uuid); + uuid + } + + fn get_host_category_track(&mut self, host_name: &str, category: &'static str) -> Option { + if !self.per_server || host_name.is_empty() { return None; } - let host = self.query_id_to_host.get(query_id)?.clone(); - let host_uuid = *self.host_uuids.get(&host)?; - let key = (host, category); + let host_uuid = self.get_or_create_host_uuid(host_name); + let key = (host_name.to_string(), category); if let Some(&uuid) = self.host_category_uuids.get(&key) { Some(uuid) } else { @@ -356,6 +353,7 @@ impl PerfettoTraceBuilder { } }; let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let start_ns = start_us.saturating_mul(1000); let end_ns = finish_us.saturating_mul(1000); @@ -375,7 +373,8 @@ impl PerfettoTraceBuilder { self.add_slice_begin(track_uuid, &operation_name, start_ns, annotations.clone()); self.add_slice_end(track_uuid, end_ns); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "OpenTelemetry Spans") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "OpenTelemetry Spans") + { let server_track = *server_op_uuids .entry((cat_uuid, operation_name.clone())) .or_insert_with(|| { @@ -405,7 +404,7 @@ impl PerfettoTraceBuilder { for i in 0..columns.row_count() { let event: String = columns.get(i, "event").unwrap_or_default(); let increment: i64 = columns.get(i, "increment").unwrap_or(0); - let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let timestamp_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, @@ -431,7 +430,8 @@ impl PerfettoTraceBuilder { *running_total += scaled_increment; self.add_counter_value(*track_uuid, timestamp_ns, *running_total); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "ProfileEvent Counters") + if let Some(cat_uuid) = + self.get_host_category_track(&host_name, "ProfileEvent Counters") { let (track_uuid, running_total) = server_tracks .entry((cat_uuid, event.clone())) @@ -476,7 +476,8 @@ impl PerfettoTraceBuilder { }); self.add_counter_value(track_uuid, row.timestamp_ns, value); - if let Some(cat_uuid) = self.get_host_category_track(&row.query_id, "Query Metrics") + if let Some(cat_uuid) = + self.get_host_category_track(&row.host_name, "Query Metrics") { let server_track = *server_tracks .entry((cat_uuid, name.to_string())) @@ -500,7 +501,8 @@ impl PerfettoTraceBuilder { }); self.add_counter_value(track_uuid, row.timestamp_ns, scaled_value); - if let Some(cat_uuid) = self.get_host_category_track(&row.query_id, "Query Metrics") + if let Some(cat_uuid) = + self.get_host_category_track(&row.host_name, "Query Metrics") { let server_track = *server_tracks .entry((cat_uuid, name.clone())) @@ -548,6 +550,7 @@ impl PerfettoTraceBuilder { let query_id: String = columns.get(i, "query_id").unwrap_or_default(); let rows: u64 = columns.get(i, "rows").unwrap_or(0); let size_in_bytes: u64 = columns.get(i, "size_in_bytes").unwrap_or(0); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let table_key = format!("{}.{}", database, table); let track_uuid = *table_uuids.entry(table_key.clone()).or_insert_with(|| { @@ -576,7 +579,7 @@ impl PerfettoTraceBuilder { self.add_slice_begin(track_uuid, &label, start_ns, annotations.clone()); self.add_slice_end(track_uuid, end_ns); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Part Log") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Part Log") { let server_track = *server_table_uuids .entry((cat_uuid, table_key.clone())) .or_insert_with(|| { @@ -606,6 +609,7 @@ impl PerfettoTraceBuilder { for i in 0..columns.row_count() { let query_id: String = columns.get(i, "query_id").unwrap_or_default(); let thread_name: String = columns.get(i, "thread_name").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let timestamp_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, @@ -651,7 +655,7 @@ impl PerfettoTraceBuilder { self.add_slice_begin(track_uuid, &query_id, start_ns, annotations.clone()); self.add_slice_end(track_uuid, end_ns); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Query Threads") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Query Threads") { let server_track = *server_thread_uuids .entry((cat_uuid, thread_name.clone())) .or_insert_with(|| { @@ -689,6 +693,7 @@ impl PerfettoTraceBuilder { let logger_name: String = columns.get(i, "logger_name").unwrap_or_default(); let message: String = columns.get(i, "message").unwrap_or_default(); let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let timestamp_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, @@ -716,7 +721,7 @@ impl PerfettoTraceBuilder { self.add_instant(track_uuid, &message, timestamp_ns, annotations.clone()); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Query Logs") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Query Logs") { let server_track = *server_level_uuids .entry((cat_uuid, level.clone())) .or_insert_with(|| { @@ -1358,10 +1363,10 @@ impl PerfettoTraceBuilder { }); if self.per_server { - let query_id: String = columns.get(i, "query_id").unwrap_or_default(); - if let Some(host) = self.query_id_to_host.get(&query_id) { + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); + if !host_name.is_empty() { samples_by_host_type - .entry((host.clone(), trace_type)) + .entry((host_name, trace_type)) .or_default() .push(Sample { callstack_iid, diff --git a/src/interpreter/query.rs b/src/interpreter/query.rs index 08cd9cb..ce483b9 100644 --- a/src/interpreter/query.rs +++ b/src/interpreter/query.rs @@ -23,6 +23,7 @@ where pub struct Query { pub selection: bool, pub host_name: String, + pub display_host_name: Option, pub user: String, pub threads: usize, pub memory: i64, @@ -75,6 +76,7 @@ impl Query { Ok(Query { selection: false, host_name: columns.get::<_, _>(row_index, "host_name")?, + display_host_name: None, user: columns.get::<_, _>(row_index, "user")?, threads: columns.get::(row_index, "peak_threads_usage")? as usize, memory: columns.get::<_, _>(row_index, "peak_memory_usage")?, diff --git a/src/view/queries_view.rs b/src/view/queries_view.rs index 65339c8..f8fe13c 100644 --- a/src/view/queries_view.rs +++ b/src/view/queries_view.rs @@ -34,18 +34,22 @@ use crate::{ const QUERY_TIME_DRIFT_BUFFER_SECONDS: i64 = 1; // count() OVER (PARTITION BY initial_query_id) -fn queries_count_subqueries(queries: &mut HashMap) { - // - let mut subqueries = HashMap::::new(); - for v in queries.values_mut() { - if let Some(c) = subqueries.get_mut(v.initial_query_id.as_str()) { - *c += 1; - } else { - subqueries.insert(v.initial_query_id.clone(), 1); - } +type QueryKey = (String, String); // (query_id, host_name) + +fn query_key(q: &Query) -> QueryKey { + (q.query_id.clone(), q.host_name.clone()) +} + +fn queries_count_subqueries(queries: &mut HashMap) { + // <(initial_query_id, host_name), count()> + let mut subqueries = HashMap::<(String, String), u64>::new(); + for v in queries.values() { + *subqueries + .entry((v.initial_query_id.clone(), v.host_name.clone())) + .or_default() += 1; } for v in queries.values_mut() { - v.subqueries = subqueries[&v.initial_query_id]; + v.subqueries = subqueries[&(v.initial_query_id.clone(), v.host_name.clone())]; } } fn sum_map(m1: &HashMap, m2: &HashMap) -> HashMap @@ -63,20 +67,23 @@ where } return dst; } -// if(is_initial_query, (sumMap(ProfileEvents) OVER (PARTITION BY initial_query_id)), ProfileEvents) -fn queries_sum_profile_events(queries: &mut HashMap) { - // - let mut profile_events = HashMap::>::new(); - for v in queries.values_mut() { - if let Some(pe) = profile_events.get_mut(v.initial_query_id.as_str()) { +// if(is_initial_query, (sumMap(ProfileEvents) OVER (PARTITION BY initial_query_id, host_name)), ProfileEvents) +fn queries_sum_profile_events(queries: &mut HashMap) { + // <(initial_query_id, host_name), sumMap(ProfileEvents)> + let mut profile_events = HashMap::<(String, String), HashMap>::new(); + for v in queries.values() { + let key = (v.initial_query_id.clone(), v.host_name.clone()); + if let Some(pe) = profile_events.get_mut(&key) { *pe = sum_map(pe, &v.profile_events); } else { - profile_events.insert(v.initial_query_id.clone(), v.profile_events.clone()); + profile_events.insert(key, v.profile_events.clone()); } } for v in queries.values_mut() { - if v.is_initial_query { - v.profile_events = profile_events.remove(&v.initial_query_id).unwrap(); + if v.is_initial_query + && let Some(pe) = profile_events.get(&(v.initial_query_id.clone(), v.host_name.clone())) + { + v.profile_events = pe.clone(); } } } @@ -102,7 +109,7 @@ pub enum QueriesColumn { } impl PartialEq for Query { fn eq(&self, other: &Self) -> bool { - return self.query_id == other.query_id; + return self.query_id == other.query_id && self.host_name == other.host_name; } } @@ -120,7 +127,11 @@ impl TableViewItem for Query { " ".to_string() } } - QueriesColumn::HostName => self.host_name.to_string(), + QueriesColumn::HostName => self + .display_host_name + .as_deref() + .unwrap_or(&self.host_name) + .to_string(), QueriesColumn::SubQueries => { if self.is_initial_query { return self.subqueries.to_string(); @@ -180,11 +191,11 @@ impl TableViewItem for Query { pub struct QueriesView { context: ContextArc, table: TableView, - items: HashMap, + items: HashMap, // For show only specific query query_id: Option, // For multi selection - selected_query_ids: HashSet, + selected_query_ids: HashSet, has_selection_column: bool, options: ViewOptions, // Is this running processes, or queries from system.query_log? @@ -220,16 +231,17 @@ impl QueriesView { for i in 0..processes.row_count() { let mut query = Query::from_clickhouse_block(&processes, i, self.is_system_processes)?; - if self.selected_query_ids.contains(&query.query_id) { - new_selected_query_ids.insert(query.query_id.clone()); + let key = query_key(&query); + if self.selected_query_ids.contains(&key) { + new_selected_query_ids.insert(key.clone()); } - if let Some(prev_item) = prev_items.get(&query.query_id) { + if let Some(prev_item) = prev_items.get(&key) { query.prev_elapsed = Some(prev_item.elapsed); query.prev_profile_events = Some(prev_item.profile_events.clone()); } - self.items.insert(query.query_id.clone(), query); + self.items.insert(key, query); } queries_count_subqueries(&mut self.items); @@ -268,7 +280,7 @@ impl QueriesView { } } - // Strip common hostname prefix and suffix + // Compute stripped hostname for display (to_column uses display_host_name) if !self.options.no_strip_hostname_suffix && items.len() > 1 { let (common_prefix, common_suffix) = find_common_hostname_prefix_and_suffix(items.iter().map(|q| q.host_name.as_str())); @@ -289,7 +301,7 @@ impl QueriesView { hostname = stripped; } - item.host_name = hostname.to_string(); + item.display_host_name = Some(hostname.to_string()); } } } @@ -301,7 +313,7 @@ impl QueriesView { self.has_selection_column = true; } for item in &mut items { - item.selection = self.selected_query_ids.contains(&item.query_id); + item.selection = self.selected_query_ids.contains(&query_key(item)); } } else if self.has_selection_column { self.table.remove_column(0); @@ -361,9 +373,12 @@ impl QueriesView { if !self.selected_query_ids.is_empty() { for q in self.items.values() { // NOTE: we have to look at both here, since selected_query_ids contains - // query_id not initial_query_id, while we are curious about both - if self.selected_query_ids.contains(&q.initial_query_id) - || self.selected_query_ids.contains(&q.query_id) + // (query_id, host_name) not (initial_query_id, host_name), while we are + // curious about both + let key = query_key(q); + let initial_key = (q.initial_query_id.clone(), q.host_name.clone()); + if self.selected_query_ids.contains(&initial_key) + || self.selected_query_ids.contains(&key) { query_ids.push(q.query_id.clone()); } @@ -467,7 +482,7 @@ impl QueriesView { let queries: Vec = self .items .values() - .filter(|q| self.selected_query_ids.contains(&q.query_id)) + .filter(|q| self.selected_query_ids.contains(&query_key(q))) .cloned() .collect(); @@ -623,12 +638,12 @@ impl QueriesView { fn action_select(&mut self) -> Result> { let selected_query = self.get_selected_query()?; - let query_id = selected_query.query_id.clone(); + let key = query_key(&selected_query); - if self.selected_query_ids.contains(&query_id) { - self.selected_query_ids.remove(&query_id); + if self.selected_query_ids.contains(&key) { + self.selected_query_ids.remove(&key); } else { - self.selected_query_ids.insert(query_id); + self.selected_query_ids.insert(key); } self.update_view();