From 3f1ee979730ca899e1005cd4698cf8319e696e24 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 16 Apr 2026 10:27:26 +0200 Subject: [PATCH 1/4] Fix queries with same query_id on different hosts being deduplicated HashMap was keyed by query_id alone, so cluster queries with the same query_id on multiple hosts (e.g. ON CLUSTER DDL) would overwrite each other. Use (query_id, host_name) composite key instead. Though note that e.g. logs/traces/perfetto will show data for all queries with the same query_id. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/view/queries_view.rs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/view/queries_view.rs b/src/view/queries_view.rs index 65339c8..ba5e849 100644 --- a/src/view/queries_view.rs +++ b/src/view/queries_view.rs @@ -34,7 +34,13 @@ use crate::{ const QUERY_TIME_DRIFT_BUFFER_SECONDS: i64 = 1; // count() OVER (PARTITION BY initial_query_id) -fn queries_count_subqueries(queries: &mut HashMap) { +type QueryKey = (String, String); // (query_id, host_name) + +fn query_key(q: &Query) -> QueryKey { + (q.query_id.clone(), q.host_name.clone()) +} + +fn queries_count_subqueries(queries: &mut HashMap) { // let mut subqueries = HashMap::::new(); for v in queries.values_mut() { @@ -64,7 +70,7 @@ where return dst; } // if(is_initial_query, (sumMap(ProfileEvents) OVER (PARTITION BY initial_query_id)), ProfileEvents) -fn queries_sum_profile_events(queries: &mut HashMap) { +fn queries_sum_profile_events(queries: &mut HashMap) { // let mut profile_events = HashMap::>::new(); for v in queries.values_mut() { @@ -75,8 +81,10 @@ fn queries_sum_profile_events(queries: &mut HashMap) { } } for v in queries.values_mut() { - if v.is_initial_query { - v.profile_events = profile_events.remove(&v.initial_query_id).unwrap(); + if v.is_initial_query + && let Some(pe) = profile_events.get(&v.initial_query_id) + { + v.profile_events = pe.clone(); } } } @@ -102,7 +110,7 @@ pub enum QueriesColumn { } 
impl PartialEq for Query { fn eq(&self, other: &Self) -> bool { - return self.query_id == other.query_id; + return self.query_id == other.query_id && self.host_name == other.host_name; } } @@ -180,7 +188,7 @@ impl TableViewItem for Query { pub struct QueriesView { context: ContextArc, table: TableView, - items: HashMap, + items: HashMap, // For show only specific query query_id: Option, // For multi selection @@ -224,12 +232,13 @@ impl QueriesView { new_selected_query_ids.insert(query.query_id.clone()); } - if let Some(prev_item) = prev_items.get(&query.query_id) { + let key = query_key(&query); + if let Some(prev_item) = prev_items.get(&key) { query.prev_elapsed = Some(prev_item.elapsed); query.prev_profile_events = Some(prev_item.profile_events.clone()); } - self.items.insert(query.query_id.clone(), query); + self.items.insert(key, query); } queries_count_subqueries(&mut self.items); From 2abf6f263cdb892164d8361108033b25dc1324e5 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 16 Apr 2026 11:00:01 +0200 Subject: [PATCH 2/4] Fix subquery count and profile event aggregation for duplicate query_ids queries_count_subqueries and queries_sum_profile_events were keyed by initial_query_id alone. When the same initial_query_id existed on multiple cluster hosts, subquery counts were summed across hosts and profile events were merged, making cpu/io_wait/cpu_wait identical for all hosts. Key by (initial_query_id, host_name) instead. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/view/queries_view.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/view/queries_view.rs b/src/view/queries_view.rs index ba5e849..e4fbfbf 100644 --- a/src/view/queries_view.rs +++ b/src/view/queries_view.rs @@ -41,17 +41,15 @@ fn query_key(q: &Query) -> QueryKey { } fn queries_count_subqueries(queries: &mut HashMap) { - // - let mut subqueries = HashMap::::new(); - for v in queries.values_mut() { - if let Some(c) = subqueries.get_mut(v.initial_query_id.as_str()) { - *c += 1; - } else { - subqueries.insert(v.initial_query_id.clone(), 1); - } + // <(initial_query_id, host_name), count()> + let mut subqueries = HashMap::<(String, String), u64>::new(); + for v in queries.values() { + *subqueries + .entry((v.initial_query_id.clone(), v.host_name.clone())) + .or_default() += 1; } for v in queries.values_mut() { - v.subqueries = subqueries[&v.initial_query_id]; + v.subqueries = subqueries[&(v.initial_query_id.clone(), v.host_name.clone())]; } } fn sum_map(m1: &HashMap, m2: &HashMap) -> HashMap @@ -69,20 +67,21 @@ where } return dst; } -// if(is_initial_query, (sumMap(ProfileEvents) OVER (PARTITION BY initial_query_id)), ProfileEvents) +// if(is_initial_query, (sumMap(ProfileEvents) OVER (PARTITION BY initial_query_id, host_name)), ProfileEvents) fn queries_sum_profile_events(queries: &mut HashMap) { - // - let mut profile_events = HashMap::>::new(); - for v in queries.values_mut() { - if let Some(pe) = profile_events.get_mut(v.initial_query_id.as_str()) { + // <(initial_query_id, host_name), sumMap(ProfileEvents)> + let mut profile_events = HashMap::<(String, String), HashMap>::new(); + for v in queries.values() { + let key = (v.initial_query_id.clone(), v.host_name.clone()); + if let Some(pe) = profile_events.get_mut(&key) { *pe = sum_map(pe, &v.profile_events); } else { - profile_events.insert(v.initial_query_id.clone(), v.profile_events.clone()); + 
profile_events.insert(key, v.profile_events.clone()); } } for v in queries.values_mut() { if v.is_initial_query - && let Some(pe) = profile_events.get(&v.initial_query_id) + && let Some(pe) = profile_events.get(&(v.initial_query_id.clone(), v.host_name.clone())) { v.profile_events = pe.clone(); } From bc962b62a0e191e45a29b627a3732b3d77625b8d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 16 Apr 2026 10:51:13 +0200 Subject: [PATCH 3/4] Fix per-server Perfetto trace aggregation for duplicate query_ids The query_id_to_host map was 1:1, so when the same query_id appeared on multiple cluster hosts only the last-seen host survived. All per-server Perfetto tracks (counters, spans, logs, stack traces, etc.) were then attributed to the wrong host. Add hostName() to all Perfetto data source SQL queries so each row carries its actual origin host, and pass it directly to get_host_category_track instead of the lossy query_id lookup. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/interpreter/clickhouse.rs | 23 ++++++++----- src/interpreter/perfetto.rs | 63 +++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 37 deletions(-) diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index 0b3565e..03906f0 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -151,7 +151,7 @@ pub struct ClickHouseServerSummary { } pub struct QueryMetricRow { - pub query_id: String, + pub host_name: String, pub timestamp_ns: u64, pub memory_usage: i64, pub peak_memory_usage: i64, @@ -1232,7 +1232,8 @@ impl ClickHouse { operation_name, start_time_us, finish_time_us, - attribute['clickhouse.query_id'] AS query_id + attribute['clickhouse.query_id'] AS query_id, + hostName() AS host_name FROM {dbtable} WHERE start_time_us BETWEEN {start_us} AND {end_us} {query_id_filter} @@ -1268,7 +1269,8 @@ impl ClickHouse { query_id, event, increment, - event_time_microseconds + event_time_microseconds, + hostName() AS host_name FROM 
{dbtable} WHERE trace_type = 'ProfileEvent' AND increment != 0 {query_id_filter} @@ -1309,6 +1311,7 @@ impl ClickHouse { event_time_microseconds, memory_usage, peak_memory_usage, + hostName() AS host_name, COLUMNS('ProfileEvent_') FROM {dbtable} WHERE 1 @@ -1355,7 +1358,7 @@ impl ClickHouse { } }; rows.push(QueryMetricRow { - query_id: block.get(i, "query_id").unwrap_or_default(), + host_name: block.get(i, "host_name").unwrap_or_default(), timestamp_ns: ts_ns, memory_usage: block.get(i, "memory_usage").unwrap_or(0), peak_memory_usage: block.get(i, "peak_memory_usage").unwrap_or(0), @@ -1392,7 +1395,8 @@ impl ClickHouse { part_name, query_id, rows, - size_in_bytes + size_in_bytes, + hostName() AS host_name FROM {dbtable} WHERE event_type NOT IN ('MergePartsStart', 'MutatePartStart') {query_id_filter} @@ -1444,7 +1448,8 @@ impl ClickHouse { trace_type::String AS trace_type, {symbol_expr} AS stack, size, - query_id + query_id, + hostName() AS host_name FROM {dbtable} WHERE trace_type IN ('CPU', 'Real', 'Memory') {query_id_filter} @@ -1487,7 +1492,8 @@ impl ClickHouse { level::String AS level, logger_name::String AS logger_name, message, - query_id + query_id, + hostName() AS host_name FROM {dbtable} WHERE 1 {query_id_filter} @@ -1530,7 +1536,8 @@ impl ClickHouse { query_duration_ms, ProfileEvents.Names, ProfileEvents.Values, - peak_memory_usage + peak_memory_usage, + hostName() AS host_name FROM {dbtable} WHERE 1 {query_id_filter} diff --git a/src/interpreter/perfetto.rs b/src/interpreter/perfetto.rs index 48e5b9e..1a03e4e 100644 --- a/src/interpreter/perfetto.rs +++ b/src/interpreter/perfetto.rs @@ -59,7 +59,6 @@ pub struct PerfettoTraceBuilder { next_intern_id: u64, host_uuids: HashMap, - query_id_to_host: HashMap, // (host_name, category) → category track uuid host_category_uuids: HashMap<(String, &'static str), u64>, per_server: bool, @@ -80,7 +79,6 @@ impl PerfettoTraceBuilder { next_intern_id: 1, host_uuids: HashMap::new(), - query_id_to_host: HashMap::new(), 
host_category_uuids: HashMap::new(), per_server, text_log_android, @@ -250,17 +248,7 @@ impl PerfettoTraceBuilder { let mut user_uuids: HashMap<(String, String), u64> = HashMap::new(); for q in queries { - let host_uuid = if let Some(&uuid) = self.host_uuids.get(&q.host_name) { - uuid - } else { - let uuid = self.alloc_uuid(); - self.add_process_track(uuid, &q.host_name); - self.host_uuids.insert(q.host_name.clone(), uuid); - uuid - }; - - self.query_id_to_host - .insert(q.query_id.clone(), q.host_name.clone()); + let host_uuid = self.get_or_create_host_uuid(&q.host_name); let user_key = (q.host_name.clone(), q.user.clone()); let user_uuid = *user_uuids.entry(user_key).or_insert_with(|| { @@ -308,13 +296,22 @@ impl PerfettoTraceBuilder { } } - fn get_host_category_track(&mut self, query_id: &str, category: &'static str) -> Option { - if !self.per_server { + fn get_or_create_host_uuid(&mut self, host_name: &str) -> u64 { + if let Some(&uuid) = self.host_uuids.get(host_name) { + return uuid; + } + let uuid = self.alloc_uuid(); + self.add_process_track(uuid, host_name); + self.host_uuids.insert(host_name.to_string(), uuid); + uuid + } + + fn get_host_category_track(&mut self, host_name: &str, category: &'static str) -> Option { + if !self.per_server || host_name.is_empty() { return None; } - let host = self.query_id_to_host.get(query_id)?.clone(); - let host_uuid = *self.host_uuids.get(&host)?; - let key = (host, category); + let host_uuid = self.get_or_create_host_uuid(host_name); + let key = (host_name.to_string(), category); if let Some(&uuid) = self.host_category_uuids.get(&key) { Some(uuid) } else { @@ -356,6 +353,7 @@ impl PerfettoTraceBuilder { } }; let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let start_ns = start_us.saturating_mul(1000); let end_ns = finish_us.saturating_mul(1000); @@ -375,7 +373,8 @@ impl PerfettoTraceBuilder { self.add_slice_begin(track_uuid, 
&operation_name, start_ns, annotations.clone()); self.add_slice_end(track_uuid, end_ns); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "OpenTelemetry Spans") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "OpenTelemetry Spans") + { let server_track = *server_op_uuids .entry((cat_uuid, operation_name.clone())) .or_insert_with(|| { @@ -405,7 +404,7 @@ impl PerfettoTraceBuilder { for i in 0..columns.row_count() { let event: String = columns.get(i, "event").unwrap_or_default(); let increment: i64 = columns.get(i, "increment").unwrap_or(0); - let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let timestamp_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, @@ -431,7 +430,8 @@ impl PerfettoTraceBuilder { *running_total += scaled_increment; self.add_counter_value(*track_uuid, timestamp_ns, *running_total); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "ProfileEvent Counters") + if let Some(cat_uuid) = + self.get_host_category_track(&host_name, "ProfileEvent Counters") { let (track_uuid, running_total) = server_tracks .entry((cat_uuid, event.clone())) @@ -476,7 +476,8 @@ impl PerfettoTraceBuilder { }); self.add_counter_value(track_uuid, row.timestamp_ns, value); - if let Some(cat_uuid) = self.get_host_category_track(&row.query_id, "Query Metrics") + if let Some(cat_uuid) = + self.get_host_category_track(&row.host_name, "Query Metrics") { let server_track = *server_tracks .entry((cat_uuid, name.to_string())) @@ -500,7 +501,8 @@ impl PerfettoTraceBuilder { }); self.add_counter_value(track_uuid, row.timestamp_ns, scaled_value); - if let Some(cat_uuid) = self.get_host_category_track(&row.query_id, "Query Metrics") + if let Some(cat_uuid) = + self.get_host_category_track(&row.host_name, "Query Metrics") { let server_track = 
*server_tracks .entry((cat_uuid, name.clone())) @@ -548,6 +550,7 @@ impl PerfettoTraceBuilder { let query_id: String = columns.get(i, "query_id").unwrap_or_default(); let rows: u64 = columns.get(i, "rows").unwrap_or(0); let size_in_bytes: u64 = columns.get(i, "size_in_bytes").unwrap_or(0); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let table_key = format!("{}.{}", database, table); let track_uuid = *table_uuids.entry(table_key.clone()).or_insert_with(|| { @@ -576,7 +579,7 @@ impl PerfettoTraceBuilder { self.add_slice_begin(track_uuid, &label, start_ns, annotations.clone()); self.add_slice_end(track_uuid, end_ns); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Part Log") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Part Log") { let server_track = *server_table_uuids .entry((cat_uuid, table_key.clone())) .or_insert_with(|| { @@ -606,6 +609,7 @@ impl PerfettoTraceBuilder { for i in 0..columns.row_count() { let query_id: String = columns.get(i, "query_id").unwrap_or_default(); let thread_name: String = columns.get(i, "thread_name").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let timestamp_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, @@ -651,7 +655,7 @@ impl PerfettoTraceBuilder { self.add_slice_begin(track_uuid, &query_id, start_ns, annotations.clone()); self.add_slice_end(track_uuid, end_ns); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Query Threads") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Query Threads") { let server_track = *server_thread_uuids .entry((cat_uuid, thread_name.clone())) .or_insert_with(|| { @@ -689,6 +693,7 @@ impl PerfettoTraceBuilder { let logger_name: String = columns.get(i, "logger_name").unwrap_or_default(); let message: String = columns.get(i, 
"message").unwrap_or_default(); let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); let timestamp_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, @@ -716,7 +721,7 @@ impl PerfettoTraceBuilder { self.add_instant(track_uuid, &message, timestamp_ns, annotations.clone()); - if let Some(cat_uuid) = self.get_host_category_track(&query_id, "Query Logs") { + if let Some(cat_uuid) = self.get_host_category_track(&host_name, "Query Logs") { let server_track = *server_level_uuids .entry((cat_uuid, level.clone())) .or_insert_with(|| { @@ -1358,10 +1363,10 @@ impl PerfettoTraceBuilder { }); if self.per_server { - let query_id: String = columns.get(i, "query_id").unwrap_or_default(); - if let Some(host) = self.query_id_to_host.get(&query_id) { + let host_name: String = columns.get(i, "host_name").unwrap_or_default(); + if !host_name.is_empty() { samples_by_host_type - .entry((host.clone(), trace_type)) + .entry((host_name, trace_type)) .or_default() .push(Sample { callstack_iid, From 1ed763d430bbd33756789eb3439a8a5ce05f485c Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 16 Apr 2026 11:03:30 +0200 Subject: [PATCH 4/4] Fix query selection to respect host_name for duplicate query_ids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit selected_query_ids was HashSet (query_id only), so selecting a query with a duplicate query_id on another host would select both. This also caused the profile events diff view to include wrong queries. Change to HashSet ((query_id, host_name)) so selection, diff view, and query_id collection for flamegraphs/logs/perfetto all respect the host boundary. 
Stop mutating host_name for display stripping — use a separate display_host_name field instead, so query_key() always returns the original hostname and selection/lookup works correctly. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/interpreter/query.rs | 2 ++ src/view/queries_view.rs | 39 +++++++++++++++++++++++---------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/interpreter/query.rs b/src/interpreter/query.rs index 08cd9cb..ce483b9 100644 --- a/src/interpreter/query.rs +++ b/src/interpreter/query.rs @@ -23,6 +23,7 @@ where pub struct Query { pub selection: bool, pub host_name: String, + pub display_host_name: Option, pub user: String, pub threads: usize, pub memory: i64, @@ -75,6 +76,7 @@ impl Query { Ok(Query { selection: false, host_name: columns.get::<_, _>(row_index, "host_name")?, + display_host_name: None, user: columns.get::<_, _>(row_index, "user")?, threads: columns.get::(row_index, "peak_threads_usage")? as usize, memory: columns.get::<_, _>(row_index, "peak_memory_usage")?, diff --git a/src/view/queries_view.rs b/src/view/queries_view.rs index e4fbfbf..f8fe13c 100644 --- a/src/view/queries_view.rs +++ b/src/view/queries_view.rs @@ -127,7 +127,11 @@ impl TableViewItem for Query { " ".to_string() } } - QueriesColumn::HostName => self.host_name.to_string(), + QueriesColumn::HostName => self + .display_host_name + .as_deref() + .unwrap_or(&self.host_name) + .to_string(), QueriesColumn::SubQueries => { if self.is_initial_query { return self.subqueries.to_string(); @@ -191,7 +195,7 @@ pub struct QueriesView { // For show only specific query query_id: Option, // For multi selection - selected_query_ids: HashSet, + selected_query_ids: HashSet, has_selection_column: bool, options: ViewOptions, // Is this running processes, or queries from system.query_log? 
@@ -227,11 +231,11 @@ impl QueriesView { for i in 0..processes.row_count() { let mut query = Query::from_clickhouse_block(&processes, i, self.is_system_processes)?; - if self.selected_query_ids.contains(&query.query_id) { - new_selected_query_ids.insert(query.query_id.clone()); + let key = query_key(&query); + if self.selected_query_ids.contains(&key) { + new_selected_query_ids.insert(key.clone()); } - let key = query_key(&query); if let Some(prev_item) = prev_items.get(&key) { query.prev_elapsed = Some(prev_item.elapsed); query.prev_profile_events = Some(prev_item.profile_events.clone()); @@ -276,7 +280,7 @@ impl QueriesView { } } - // Strip common hostname prefix and suffix + // Compute stripped hostname for display (to_column uses display_host_name) if !self.options.no_strip_hostname_suffix && items.len() > 1 { let (common_prefix, common_suffix) = find_common_hostname_prefix_and_suffix(items.iter().map(|q| q.host_name.as_str())); @@ -297,7 +301,7 @@ impl QueriesView { hostname = stripped; } - item.host_name = hostname.to_string(); + item.display_host_name = Some(hostname.to_string()); } } } @@ -309,7 +313,7 @@ impl QueriesView { self.has_selection_column = true; } for item in &mut items { - item.selection = self.selected_query_ids.contains(&item.query_id); + item.selection = self.selected_query_ids.contains(&query_key(item)); } } else if self.has_selection_column { self.table.remove_column(0); @@ -369,9 +373,12 @@ impl QueriesView { if !self.selected_query_ids.is_empty() { for q in self.items.values() { // NOTE: we have to look at both here, since selected_query_ids contains - // query_id not initial_query_id, while we are curious about both - if self.selected_query_ids.contains(&q.initial_query_id) - || self.selected_query_ids.contains(&q.query_id) + // (query_id, host_name) not (initial_query_id, host_name), while we are + // curious about both + let key = query_key(q); + let initial_key = (q.initial_query_id.clone(), q.host_name.clone()); + if 
self.selected_query_ids.contains(&initial_key) + || self.selected_query_ids.contains(&key) { query_ids.push(q.query_id.clone()); } @@ -475,7 +482,7 @@ impl QueriesView { let queries: Vec = self .items .values() - .filter(|q| self.selected_query_ids.contains(&q.query_id)) + .filter(|q| self.selected_query_ids.contains(&query_key(q))) .cloned() .collect(); @@ -631,12 +638,12 @@ impl QueriesView { fn action_select(&mut self) -> Result> { let selected_query = self.get_selected_query()?; - let query_id = selected_query.query_id.clone(); + let key = query_key(&selected_query); - if self.selected_query_ids.contains(&query_id) { - self.selected_query_ids.remove(&query_id); + if self.selected_query_ids.contains(&key) { + self.selected_query_ids.remove(&key); } else { - self.selected_query_ids.insert(query_id); + self.selected_query_ids.insert(key); } self.update_view();