From 09fcfd19156cf8c2741b01cd7b579763f3d2ef8a Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 30 Mar 2026 11:04:27 +0200 Subject: [PATCH 1/3] Add server-wide Perfetto export (all activity in time range) Follows the same pattern as ServerFlameGraph: a global action accessible via Ctrl+P/F8 that exports all server activity within the configured time range without requiring query selection. The _for_perfetto SQL methods now accept optional query_id filtering (None = all queries). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/interpreter/clickhouse.rs | 138 ++++++++++-- src/interpreter/worker.rs | 382 +++++++++++++++++++--------------- src/view/navigation.rs | 11 + 3 files changed, 341 insertions(+), 190 deletions(-) diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index dc0f1de..c5baf87 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -1204,13 +1204,21 @@ impl ClickHouse { pub async fn get_otel_spans_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result { let dbtable = self.get_table_name("system", "opentelemetry_span_log"); let start_us = start.timestamp_micros(); let end_us = end.timestamp_micros(); + let query_id_filter = if let Some(ids) = query_ids { + format!( + "AND attribute['clickhouse.query_id'] IN ('{}')", + ids.join("','") + ) + } else { + String::new() + }; return self .execute(&format!( r#" @@ -1221,24 +1229,29 @@ impl ClickHouse { attribute['clickhouse.query_id'] AS query_id FROM {dbtable} WHERE start_time_us BETWEEN {start_us} AND {end_us} - AND attribute['clickhouse.query_id'] IN ('{query_ids}') + {query_id_filter} ORDER BY start_time_us "#, dbtable = dbtable, start_us = start_us, end_us = end_us, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await; } pub async fn get_trace_log_counters_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result { let dbtable = self.get_table_name("system", "trace_log"); + let query_id_filter = if let Some(ids) = query_ids { + format!("AND query_id IN ('{}')", ids.join("','")) + } else { + String::new() + }; return self .execute(&format!( r#" @@ -1252,7 +1265,7 @@ impl ClickHouse { event_time_microseconds FROM {dbtable} WHERE trace_type = 'ProfileEvent' AND increment != 0 - AND query_id IN ('{query_ids}') + {query_id_filter} AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) ORDER BY event_time_microseconds @@ -1262,18 +1275,23 @@ impl ClickHouse { .timestamp_nanos_opt() .ok_or(Error::msg("Invalid start"))?, end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await; } pub async fn get_query_metrics_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result> { let dbtable = self.get_table_name("system", "query_metric_log"); + let query_id_filter = if let Some(ids) = query_ids { + format!("AND query_id IN ('{}')", ids.join("','")) + } else { + String::new() + }; let block = self .execute(&format!( r#" @@ -1287,7 +1305,8 @@ impl ClickHouse { peak_memory_usage, COLUMNS('ProfileEvent_') FROM {dbtable} - WHERE query_id IN ('{query_ids}') + WHERE 1 + {query_id_filter} AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) ORDER BY event_time_microseconds @@ -1297,7 +1316,7 @@ impl ClickHouse { .timestamp_nanos_opt() .ok_or(Error::msg("Invalid start"))?, end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await?; @@ -1342,11 +1361,16 @@ impl ClickHouse { pub async fn get_part_log_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result { let dbtable = self.get_table_name("system", "part_log"); + let query_id_filter = if let Some(ids) = query_ids { + format!("AND query_id IN ('{}')", ids.join("','")) + } else { + String::new() + }; return self .execute(&format!( r#" @@ -1365,7 +1389,7 @@ impl ClickHouse { size_in_bytes FROM {dbtable} WHERE event_type NOT IN ('MergePartsStart', 'MutatePartStart') - AND query_id IN ('{query_ids}') + {query_id_filter} AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) ORDER BY event_time_microseconds @@ -1375,14 +1399,14 @@ impl ClickHouse { .timestamp_nanos_opt() .ok_or(Error::msg("Invalid start"))?, end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await; } pub async fn get_stack_traces_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result { @@ -1397,6 +1421,11 @@ impl ClickHouse { } else { "arrayReverse(arrayMap(addr -> demangle(addressToSymbol(addr)), trace))" }; + let query_id_filter = if let Some(ids) = query_ids { + format!("AND query_id IN ('{}')", ids.join("','")) + } else { + String::new() + }; return self .execute(&format!( r#" @@ -1412,7 +1441,7 @@ impl ClickHouse { query_id FROM {dbtable} WHERE trace_type IN ('CPU', 'Real', 'Memory') - AND query_id IN ('{query_ids}') + {query_id_filter} AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) ORDER BY event_time_microseconds @@ -1424,18 +1453,23 @@ impl ClickHouse { .timestamp_nanos_opt() .ok_or(Error::msg("Invalid start"))?, end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await; } pub async fn get_text_log_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result { let dbtable = self.get_table_name("system", "text_log"); + let query_id_filter = if let Some(ids) = query_ids { + format!("AND query_id IN ('{}')", ids.join("','")) + } else { + String::new() + }; return self .execute(&format!( r#" @@ -1449,7 +1483,8 @@ impl ClickHouse { message, query_id FROM {dbtable} - WHERE query_id IN ('{query_ids}') + WHERE 1 + {query_id_filter} AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) ORDER BY event_time_microseconds @@ -1459,18 +1494,23 @@ impl ClickHouse { .timestamp_nanos_opt() .ok_or(Error::msg("Invalid start"))?, end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await; } pub async fn get_query_thread_log_for_perfetto( &self, - query_ids: &[String], + query_ids: Option<&[String]>, start: DateTime, end: DateTime, ) -> Result { let dbtable = self.get_table_name("system", "query_thread_log"); + let query_id_filter = if let Some(ids) = query_ids { + format!("AND query_id IN ('{}')", ids.join("','")) + } else { + String::new() + }; return self .execute(&format!( r#" @@ -1486,7 +1526,8 @@ impl ClickHouse { ProfileEvents.Values, peak_memory_usage FROM {dbtable} - WHERE query_id IN ('{query_ids}') + WHERE 1 + {query_id_filter} AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) ORDER BY event_time_microseconds @@ -1496,11 +1537,66 @@ impl ClickHouse { .timestamp_nanos_opt() .ok_or(Error::msg("Invalid start"))?, end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, - query_ids = query_ids.join("','"), + query_id_filter = query_id_filter, )) .await; } + pub async fn get_queries_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "query_log"); + return self + .execute( + format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + ProfileEvents.Names, + ProfileEvents.Values, + Settings.Names, + Settings.Values, + {peak_threads_usage} AS peak_threads_usage, + memory_usage::Int64 AS peak_memory_usage, + query_duration_ms/1e3 AS elapsed, + user, + is_initial_query, + initial_query_id, + query_id, + hostname as host_name, + current_database, + query_start_time_microseconds, + event_time_microseconds AS query_end_time_microseconds, + toValidUTF8(query) AS original_query, + normalizeQuery(query) AS normalized_query + FROM {dbtable} + WHERE type != 'QueryStart' + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + "#, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + dbtable = dbtable, + peak_threads_usage = if self + .quirks + .has(ClickHouseAvailableQuirks::QueryLogPeakThreadsUsage) + { + "peak_threads_usage" + } else { + "length(thread_ids)" + }, + ) + .as_str(), + ) + .await; + } + pub async fn get_warnings(&self) -> Result> { let table_exists: u64 = self .execute( diff --git a/src/interpreter/worker.rs b/src/interpreter/worker.rs index 82bc63e..5ff6e3f 100644 --- a/src/interpreter/worker.rs +++ b/src/interpreter/worker.rs @@ -2,7 +2,7 @@ use crate::{ common::{RelativeDateTime, Stopwatch}, interpreter::{ ContextArc, Query, - clickhouse::{Columns, TextLogArguments, TraceType}, + clickhouse::{ClickHouse, Columns, TextLogArguments, TraceType}, flamegraph, perfetto::PerfettoTraceBuilder, }, @@ -86,6 +86,8 @@ pub enum Event { DateTime, Option>, ), + // (start, end) + ServerPerfettoExport(DateTime, DateTime), } impl Event { @@ -114,6 +116,7 @@ impl Event { Event::AsynchronousInserts(..) => "AsynchronousInserts".to_string(), Event::ShareLogs(..) => "ShareLogs".to_string(), Event::PerfettoExport(..) => "PerfettoExport".to_string(), + Event::ServerPerfettoExport(..) => "ServerPerfettoExport".to_string(), } } } @@ -342,6 +345,185 @@ async fn render_or_share_flamegraph( return Ok(()); } +async fn build_and_serve_perfetto_trace( + context: ContextArc, + clickhouse: &Arc, + cb_sink: cursive::CbSink, + queries: &[Query], + query_ids: Option<&[String]>, + start: DateTime, + end_time: DateTime, +) -> Result<()> { + let perfetto_cfg = context.lock().unwrap().options.perfetto.clone(); + let mut builder = + PerfettoTraceBuilder::new(perfetto_cfg.per_server, perfetto_cfg.text_log_android); + + for q in queries { + log::info!( + "Perfetto query: id={} start_ns={} end_ns={} elapsed={}", + q.query_id, + q.query_start_time_microseconds + .timestamp_nanos_opt() + .unwrap_or(0), + q.query_end_time_microseconds + .timestamp_nanos_opt() + .unwrap_or(0), + q.elapsed, + ); + } + builder.add_queries(queries); + + let (otel, trace_log, metrics, parts, threads, stack_traces, text_logs) = tokio::join!( + async { + if perfetto_cfg.opentelemetry_span_log { + Some( + clickhouse + .get_otel_spans_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + async { + if perfetto_cfg.trace_log { + Some( + clickhouse + .get_trace_log_counters_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + async { + if perfetto_cfg.query_metric_log { + Some( + clickhouse + .get_query_metrics_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + async { + if perfetto_cfg.part_log { + Some( + clickhouse + .get_part_log_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + async { + if perfetto_cfg.query_thread_log { + Some( + clickhouse + .get_query_thread_log_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + async { + if perfetto_cfg.trace_log { + Some( + clickhouse + .get_stack_traces_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + async { + if perfetto_cfg.text_log { + Some( + clickhouse + .get_text_log_for_perfetto(query_ids, start, end_time) + .await, + ) + } else { + None + } + }, + ); + + match otel { + Some(Ok(block)) => builder.add_otel_spans(&block), + Some(Err(e)) => log::warn!("Failed to fetch opentelemetry_span_log: {}", e), + None => {} + } + match trace_log { + Some(Ok(block)) => builder.add_trace_log_counters(&block), + Some(Err(e)) => log::warn!("Failed to fetch trace_log counters: {}", e), + None => {} + } + match metrics { + Some(Ok(rows)) => builder.add_query_metrics(&rows), + Some(Err(e)) => log::warn!("Failed to fetch query_metric_log: {}", e), + None => {} + } + match parts { + Some(Ok(block)) => builder.add_part_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch part_log: {}", e), + None => {} + } + match threads { + Some(Ok(block)) => builder.add_query_thread_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch query_thread_log: {}", e), + None => {} + } + match stack_traces { + Some(Ok(block)) => builder.add_stack_traces(&block), + Some(Err(e)) => log::warn!("Failed to fetch trace_log stack traces: {}", e), + None => {} + } + match text_logs { + Some(Ok(block)) => builder.add_text_logs(&block), + Some(Err(e)) => log::warn!("Failed to fetch text_log: {}", e), + None => {} + } + + let data = builder.build(); + let data_len = data.len(); + if let Err(e) = std::fs::write("/tmp/chdig_perfetto.pftrace", &data) { + log::warn!("Failed to save debug trace: {}", e); + } else { + log::info!( + "Saved debug trace to /tmp/chdig_perfetto.pftrace ({} bytes)", + data_len + ); + } + + let server = context.lock().unwrap().get_or_start_perfetto_server(); + server.set_trace(data); + let url = server.get_perfetto_url(); + + let url_clone = url.clone(); + cb_sink + .send(Box::new(move |siv: &mut cursive::Cursive| { + siv.add_layer( + views::Dialog::text(format!( + "Perfetto trace exported ({} bytes)\n\nOpening: {}", + data_len, url + )) + .title("Perfetto Export") + .button("Close", |siv| { + siv.pop_layer(); + }), + ); + })) + .map_err(|_| anyhow!("Cannot send message to UI"))?; + + crate::utils::open_url_command(&url_clone).status()?; + Ok(()) +} + async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) -> Result<()> { let cb_sink = context.lock().unwrap().cb_sink.clone(); let clickhouse = context.lock().unwrap().clickhouse.clone(); @@ -773,176 +955,38 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) crate::utils::open_url_command(&url_clone).status()?; } Event::PerfettoExport(queries, query_ids, start, end) => { - let perfetto_cfg = context.lock().unwrap().options.perfetto.clone(); - let mut builder = - PerfettoTraceBuilder::new(perfetto_cfg.per_server, perfetto_cfg.text_log_android); - - for q in &queries { - log::info!( - "Perfetto query: id={} start_ns={} end_ns={} elapsed={}", - q.query_id, - q.query_start_time_microseconds - .timestamp_nanos_opt() - .unwrap_or(0), - q.query_end_time_microseconds - .timestamp_nanos_opt() - .unwrap_or(0), - q.elapsed, - ); - } - builder.add_queries(&queries); - let end_time = end.unwrap_or_else(Local::now) + chrono::TimeDelta::seconds(1); - - let (otel, trace_log, metrics, parts, threads, stack_traces, text_logs) = tokio::join!( - async { - if perfetto_cfg.opentelemetry_span_log { - Some( - clickhouse - .get_otel_spans_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - async { - if perfetto_cfg.trace_log { - Some( - clickhouse - .get_trace_log_counters_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - async { - // Consider using ProfileEvents instead - if perfetto_cfg.query_metric_log { - Some( - clickhouse - .get_query_metrics_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - async { - if perfetto_cfg.part_log { - Some( - clickhouse - .get_part_log_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - async { - if perfetto_cfg.query_thread_log { - Some( - clickhouse - .get_query_thread_log_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - async { - if perfetto_cfg.trace_log { - Some( - clickhouse - .get_stack_traces_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - async { - if perfetto_cfg.text_log { - Some( - clickhouse - .get_text_log_for_perfetto(&query_ids, start, end_time) - .await, - ) - } else { - None - } - }, - ); - - match otel { - Some(Ok(block)) => builder.add_otel_spans(&block), - Some(Err(e)) => log::warn!("Failed to fetch opentelemetry_span_log: {}", e), - None => {} - } - match trace_log { - Some(Ok(block)) => builder.add_trace_log_counters(&block), - Some(Err(e)) => log::warn!("Failed to fetch trace_log counters: {}", e), - None => {} - } - match metrics { - Some(Ok(rows)) => builder.add_query_metrics(&rows), - Some(Err(e)) => log::warn!("Failed to fetch query_metric_log: {}", e), - None => {} - } - match parts { - Some(Ok(block)) => builder.add_part_log(&block), - Some(Err(e)) => log::warn!("Failed to fetch part_log: {}", e), - None => {} - } - match threads { - Some(Ok(block)) => builder.add_query_thread_log(&block), - Some(Err(e)) => log::warn!("Failed to fetch query_thread_log: {}", e), - None => {} - } - match stack_traces { - Some(Ok(block)) => builder.add_stack_traces(&block), - Some(Err(e)) => log::warn!("Failed to fetch trace_log stack traces: {}", e), - None => {} - } - match text_logs { - Some(Ok(block)) => builder.add_text_logs(&block), - Some(Err(e)) => log::warn!("Failed to fetch text_log: {}", e), - None => {} - } - - let data = builder.build(); - let data_len = data.len(); - if let Err(e) = std::fs::write("/tmp/chdig_perfetto.pftrace", &data) { - log::warn!("Failed to save debug trace: {}", e); - } else { - log::info!( - "Saved debug trace to /tmp/chdig_perfetto.pftrace ({} bytes)", - data_len - ); + build_and_serve_perfetto_trace( + context.clone(), + &clickhouse, + cb_sink, + &queries, + Some(&query_ids), + start, + end_time, + ) + .await?; + } + Event::ServerPerfettoExport(start, end) => { + let query_block = clickhouse.get_queries_for_perfetto(start, end).await?; + let mut queries = Vec::new(); + for i in 0..query_block.row_count() { + match Query::from_clickhouse_block(&query_block, i, false) { + Ok(q) => queries.push(q), + Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e), + } } - - let server = context.lock().unwrap().get_or_start_perfetto_server(); - server.set_trace(data); - let url = server.get_perfetto_url(); - - let url_clone = url.clone(); - cb_sink - .send(Box::new(move |siv: &mut cursive::Cursive| { - siv.add_layer( - views::Dialog::text(format!( - "Perfetto trace exported ({} bytes)\n\nOpening: {}", - data_len, url - )) - .title("Perfetto Export") - .button("Close", |siv| { - siv.pop_layer(); - }), - ); - })) - .map_err(|_| anyhow!("Cannot send message to UI"))?; - - crate::utils::open_url_command(&url_clone).status()?; + let end_time = end + chrono::TimeDelta::seconds(1); + build_and_serve_perfetto_trace( + context.clone(), + &clickhouse, + cb_sink, + &queries, + None, + start, + end_time, + ) + .await?; } } diff --git a/src/view/navigation.rs b/src/view/navigation.rs index 5f7ebbc..56bf116 100644 --- a/src/view/navigation.rs +++ b/src/view/navigation.rs @@ -59,6 +59,7 @@ pub trait Navigation { fn show_fuzzy_actions(&mut self); fn show_server_flamegraph(&mut self, tui: bool, trace_type: Option); fn show_jemalloc_flamegraph(&mut self, tui: bool); + fn show_server_perfetto(&mut self); fn show_connection_dialog(&mut self); fn drop_main_view(&mut self); @@ -288,6 +289,7 @@ impl Navigation for Cursive { context.add_global_action_without_shortcut(self, "Share Server Live Flamegraph", |siv| siv.show_server_flamegraph(false, None)); context.add_global_action_without_shortcut(self, "Jemalloc", |siv| siv.show_jemalloc_flamegraph(true)); context.add_global_action_without_shortcut(self, "Share Jemalloc", |siv| siv.show_jemalloc_flamegraph(false)); + context.add_global_action_without_shortcut(self, "Server Perfetto Export", |siv| siv.show_server_perfetto()); // If logging is done to file, console is always empty if context.options.service.log.is_none() { @@ -919,6 +921,15 @@ impl Navigation for Cursive { .send(true, WorkerEvent::JemallocFlameGraph(tui)); } + fn show_server_perfetto(&mut self) { + let mut context = self.user_data::().unwrap().lock().unwrap(); + let start: DateTime = context.options.view.start.clone().into(); + let end: DateTime = context.options.view.end.clone().into(); + context + .worker + .send(true, WorkerEvent::ServerPerfettoExport(start, end)); + } + fn show_connection_dialog(&mut self) { let context_arc = self.user_data::().unwrap().clone(); let context = context_arc.lock().unwrap(); From 8795b4b994f782430bbe4065efebad9eef44de63 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 30 Mar 2026 11:31:36 +0200 Subject: [PATCH 2/3] Add server-wide data sources to Perfetto export Add 10 new system table sources for server-wide Perfetto traces: metric_log, asynchronous_metric_log, asynchronous_insert_log, error_log, s3queue_log, azure_queue_log, blob_storage_log, background_schedule_pool_log, session_log, aggregated_zookeeper_log. Split the worker's perfetto helper into fetch_and_populate (query-scoped), fetch_server_perfetto_sources (server-only), and serve_perfetto_trace. Split settings UI into "Perfetto (query)" and "Perfetto (server)" groups. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/interpreter/clickhouse.rs | 392 ++++++++++++++++++++++++++ src/interpreter/options.rs | 20 ++ src/interpreter/perfetto.rs | 502 +++++++++++++++++++++++++++++++++- src/interpreter/worker.rs | 279 ++++++++++++++++--- src/view/navigation.rs | 99 ++++++- 5 files changed, 1248 insertions(+), 44 deletions(-) diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index c5baf87..0b3565e 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -158,6 +158,12 @@ pub struct QueryMetricRow { pub profile_events: HashMap, } +pub struct MetricLogRow { + pub timestamp_ns: u64, + pub profile_events: HashMap, + pub current_metrics: HashMap, +} + fn collect_values<'b, T: FromSql<'b>>(block: &'b Columns, column: &str) -> Vec { return (0..block.row_count()) .map(|i| block.get(i, column).unwrap()) @@ -1597,6 +1603,392 @@ impl ClickHouse { .await; } + pub async fn get_metric_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result> { + let dbtable = self.get_table_name("system", "metric_log"); + let block = self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + event_time_microseconds, + COLUMNS('ProfileEvent_'), + COLUMNS('CurrentMetric_') + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await?; + + let pe_columns: Vec = block + .columns() + .iter() + .map(|c| c.name().to_string()) + .filter(|name| name.starts_with("ProfileEvent_")) + .collect(); + let cm_columns: Vec = block + .columns() + .iter() + .map(|c| c.name().to_string()) + .filter(|name| name.starts_with("CurrentMetric_")) + .collect(); + + let mut rows = Vec::with_capacity(block.row_count()); + for i in 0..block.row_count() { + let ts_ns = match block.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: metric_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + let mut profile_events = HashMap::new(); + for col in &pe_columns { + let value: u64 = block.get(i, col.as_str()).unwrap_or(0); + if value != 0 { + let name = col.strip_prefix("ProfileEvent_").unwrap(); + profile_events.insert(name.to_string(), value); + } + } + let mut current_metrics = HashMap::new(); + for col in &cm_columns { + let value: i64 = block.get(i, col.as_str()).unwrap_or(0); + if value != 0 { + let name = col.strip_prefix("CurrentMetric_").unwrap(); + current_metrics.insert(name.to_string(), value); + } + } + rows.push(MetricLogRow { + timestamp_ns: ts_ns, + profile_events, + current_metrics, + }); + } + Ok(rows) + } + + pub async fn get_asynchronous_metric_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "asynchronous_metric_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + metric, + value, + event_time_microseconds + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_asynchronous_insert_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "asynchronous_insert_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + database, + table, + format, + status, + bytes, + exception, + event_time_microseconds, + flush_time_microseconds, + query_id + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_error_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "error_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + error, + code, + value, + remote, + last_error_message, + event_time + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_s3_queue_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "s3queue_log"); + return self + .execute(&format!( + r#" + SELECT + file_name, + rows_processed, + status, + processing_start_time, + processing_end_time, + exception + FROM {dbtable} + WHERE processing_start_time >= toDateTime(fromUnixTimestamp64Nano({start})) + AND processing_start_time <= toDateTime(fromUnixTimestamp64Nano({end})) + ORDER BY processing_start_time + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_azure_queue_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "azure_queue_log"); + return self + .execute(&format!( + r#" + SELECT + database, + table, + file_name, + rows_processed, + status, + processing_start_time, + processing_end_time, + exception + FROM {dbtable} + WHERE processing_start_time >= toDateTime(fromUnixTimestamp64Nano({start})) + AND processing_start_time <= toDateTime(fromUnixTimestamp64Nano({end})) + ORDER BY processing_start_time + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_blob_storage_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "blob_storage_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + event_type, + query_id, + disk_name, + bucket, + remote_path, + data_size, + error, + event_time_microseconds + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_background_schedule_pool_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "background_schedule_pool_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + log_name, + database, + table, + query_id, + duration_ms, + error, + exception, + event_time_microseconds + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_session_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "session_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + type::String AS type, + user, + auth_type::String AS auth_type, + interface::String AS interface, + toString(client_address) AS client_address, + client_name, + failure_reason, + event_time_microseconds + FROM {dbtable} + WHERE 1 + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + + pub async fn get_aggregated_zookeeper_log_for_perfetto( + &self, + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "aggregated_zookeeper_log"); + return self + .execute(&format!( + r#" + SELECT + event_time, + session_id, + parent_path, + operation::String AS operation, + count, + mapKeys(errors) AS error_names, + mapValues(errors) AS error_counts, + average_latency, + component + FROM {dbtable} + WHERE event_time >= toDateTime(fromUnixTimestamp64Nano({start})) + AND event_time <= toDateTime(fromUnixTimestamp64Nano({end})) + ORDER BY event_time + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + )) + .await; + } + pub async fn get_warnings(&self) -> Result> { let table_exists: u64 = self .execute( diff --git a/src/interpreter/options.rs b/src/interpreter/options.rs index 355787e..75c0f72 100644 --- a/src/interpreter/options.rs +++ b/src/interpreter/options.rs @@ -297,6 +297,16 @@ pub struct ChDigPerfettoConfig { pub text_log: bool, pub text_log_android: bool, pub per_server: bool, + pub metric_log: bool, + pub asynchronous_metric_log: bool, + pub asynchronous_insert_log: bool, + pub error_log: bool, + pub s3_queue_log: bool, + pub azure_queue_log: bool, + pub blob_storage_log: bool, + pub background_schedule_pool_log: bool, + pub session_log: bool, + pub aggregated_zookeeper_log: bool, } impl Default for ChDigPerfettoConfig { @@ -310,6 +320,16 @@ impl Default for ChDigPerfettoConfig { text_log: true, text_log_android: true, per_server: true, + metric_log: true, + asynchronous_metric_log: false, + asynchronous_insert_log: true, + error_log: true, + s3_queue_log: true, + azure_queue_log: true, + blob_storage_log: true, + background_schedule_pool_log: true, + session_log: true, + aggregated_zookeeper_log: false, } } } diff --git a/src/interpreter/perfetto.rs b/src/interpreter/perfetto.rs index e1026a1..48e5b9e 100644 --- a/src/interpreter/perfetto.rs +++ b/src/interpreter/perfetto.rs @@ -1,5 +1,5 @@ use crate::interpreter::Query; -use crate::interpreter::clickhouse::{Columns, QueryMetricRow}; +use crate::interpreter::clickhouse::{Columns, MetricLogRow, QueryMetricRow}; use chrono::{DateTime, Local}; use chrono_tz::Tz; use perfetto_protos::android_log::AndroidLogPacket; @@ -745,6 +745,506 @@ impl PerfettoTraceBuilder { } } + pub fn add_metric_log(&mut self, rows: &[MetricLogRow]) { + if rows.is_empty() { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Metric Log"); + + // event_name → (track_uuid, running_total) + let mut pe_tracks: HashMap = HashMap::new(); + // metric_name → track_uuid + let mut cm_tracks: HashMap = HashMap::new(); + + for row in rows { + for (name, value) in &row.profile_events { + let (unit, scale) = Self::unit_for_event(name); + let scaled = *value as i64 * scale; + let (track_uuid, running_total) = + pe_tracks.entry(name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_counter_track(uuid, process_uuid, name, unit); + (uuid, 0) + }); + *running_total += scaled; + self.add_counter_value(*track_uuid, row.timestamp_ns, *running_total); + } + + for (name, value) in &row.current_metrics { + let (unit, scale) = Self::unit_for_event(name); + let track_uuid = *cm_tracks.entry(name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_counter_track(uuid, process_uuid, name, unit); + uuid + }); + self.add_counter_value(track_uuid, row.timestamp_ns, *value * scale); + } + } + } + + pub fn add_asynchronous_metric_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Async Metrics"); + + let mut counter_tracks: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let metric: String = columns.get(i, "metric").unwrap_or_default(); + let value: f64 = columns.get(i, "value").unwrap_or(0.0); + let timestamp_ns: u64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: asynchronous_metric_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + + let track_uuid = *counter_tracks.entry(metric.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_counter_track(uuid, process_uuid, &metric, Unit::UNIT_UNSPECIFIED); + uuid + }); + + self.add_counter_value(track_uuid, timestamp_ns, value as i64); + } + } + + pub fn add_asynchronous_insert_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Async Inserts"); + + let mut table_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let database: String = columns.get(i, "database").unwrap_or_default(); + let table: String = columns.get(i, "table").unwrap_or_default(); + let format: String = columns.get(i, "format").unwrap_or_default(); + let status: String = columns.get(i, "status").unwrap_or_default(); + let bytes: u64 = columns.get(i, "bytes").unwrap_or(0); + let exception: String = columns.get(i, "exception").unwrap_or_default(); + let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + + let start_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: asynchronous_insert_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + let end_ns: u64 = match columns.get::, _>(i, "flush_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(_) => start_ns, + }; + + let table_key = format!("{}.{}", database, table); + let track_uuid = *table_uuids.entry(table_key.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &table_key); + uuid + }); + + let label = format!("{} ({})", table_key, status); + let mut annotations = vec![ + Self::make_annotation_str("query_id", &query_id), + Self::make_annotation_str("format", &format), + Self::make_annotation_str("status", &status), + Self::make_annotation_int("bytes", bytes as i64), + ]; + if !exception.is_empty() { + annotations.push(Self::make_annotation_str("exception", &exception)); + } + + self.add_slice_begin(track_uuid, &label, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_error_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Error Log"); + + let mut error_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let error: String = columns.get(i, "error").unwrap_or_default(); + let code: i64 = columns.get(i, "code").unwrap_or(0); + let value: u64 = columns.get(i, "value").unwrap_or(0); + let remote: u8 = columns.get(i, "remote").unwrap_or(0); + let last_error_message: String = + columns.get(i, "last_error_message").unwrap_or_default(); + let timestamp_ns: u64 = match columns.get::, _>(i, "event_time") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!("Perfetto: error_log row {} event_time: {}", i, e); + continue; + } + }; + + let track_uuid = *error_uuids.entry(error.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &error); + uuid + }); + + let mut annotations = vec![ + Self::make_annotation_int("code", code), + Self::make_annotation_int("value", value as i64), + Self::make_annotation_int("remote", remote as i64), + ]; + if !last_error_message.is_empty() { + annotations.push(Self::make_annotation_str( + "last_error_message", + &last_error_message, + )); + } + + self.add_instant(track_uuid, &error, timestamp_ns, annotations); + } + } + + pub fn add_s3_queue_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "S3 Queue"); + + let track_uuid = self.alloc_uuid(); + self.add_child_track(track_uuid, process_uuid, "files"); + + for i in 0..columns.row_count() { + let file_name: String = columns.get(i, "file_name").unwrap_or_default(); + let rows_processed: u64 = columns.get(i, "rows_processed").unwrap_or(0); + let status: String = columns.get(i, "status").unwrap_or_default(); + let exception: String = columns.get(i, "exception").unwrap_or_default(); + + let start_ns: u64 = match columns.get::, _>(i, "processing_start_time") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(_) => continue, + }; + let end_ns: u64 = match columns.get::, _>(i, "processing_end_time") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(_) => start_ns, + }; + + let mut annotations = vec![ + Self::make_annotation_str("file_name", &file_name), + Self::make_annotation_int("rows_processed", rows_processed as i64), + Self::make_annotation_str("status", &status), + ]; + if !exception.is_empty() { + annotations.push(Self::make_annotation_str("exception", &exception)); + } + + self.add_slice_begin(track_uuid, &file_name, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_azure_queue_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Azure Queue"); + + let mut table_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let database: String = columns.get(i, "database").unwrap_or_default(); + let table: String = columns.get(i, "table").unwrap_or_default(); + let file_name: String = columns.get(i, "file_name").unwrap_or_default(); + let rows_processed: u64 = columns.get(i, "rows_processed").unwrap_or(0); + let status: String = columns.get(i, "status").unwrap_or_default(); + let exception: String = columns.get(i, "exception").unwrap_or_default(); + + let start_ns: u64 = match columns.get::, _>(i, "processing_start_time") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(_) => continue, + }; + let end_ns: u64 = match columns.get::, _>(i, "processing_end_time") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(_) => start_ns, + }; + + let table_key = format!("{}.{}", database, table); + let track_uuid = *table_uuids.entry(table_key.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &table_key); + uuid + }); + + let mut annotations = vec![ + Self::make_annotation_str("file_name", &file_name), + Self::make_annotation_int("rows_processed", rows_processed as i64), + Self::make_annotation_str("status", &status), + ]; + if !exception.is_empty() { + annotations.push(Self::make_annotation_str("exception", &exception)); + } + + self.add_slice_begin(track_uuid, &file_name, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_blob_storage_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Blob Storage"); + + let mut type_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let event_type: String = columns.get(i, "event_type").unwrap_or_default(); + let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let disk_name: String = columns.get(i, "disk_name").unwrap_or_default(); + let bucket: String = columns.get(i, "bucket").unwrap_or_default(); + let remote_path: String = columns.get(i, "remote_path").unwrap_or_default(); + let data_size: u64 = columns.get(i, "data_size").unwrap_or(0); + let error: String = columns.get(i, "error").unwrap_or_default(); + let timestamp_ns: u64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: blob_storage_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + + let track_uuid = *type_uuids.entry(event_type.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &event_type); + uuid + }); + + let mut annotations = vec![ + Self::make_annotation_str("query_id", &query_id), + Self::make_annotation_str("disk_name", &disk_name), + Self::make_annotation_str("bucket", &bucket), + Self::make_annotation_str("remote_path", &remote_path), + Self::make_annotation_int("data_size", data_size as i64), + ]; + if !error.is_empty() { + annotations.push(Self::make_annotation_str("error", &error)); + } + + self.add_instant(track_uuid, &event_type, timestamp_ns, annotations); + } + } + + pub fn add_background_pool_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Background Pool"); + + let mut log_name_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let log_name: String = columns.get(i, "log_name").unwrap_or_default(); + let database: String = columns.get(i, "database").unwrap_or_default(); + let table: String = columns.get(i, "table").unwrap_or_default(); + let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let duration_ms: u64 = columns.get(i, "duration_ms").unwrap_or(0); + let error: String = columns.get(i, "error").unwrap_or_default(); + let exception: String = columns.get(i, "exception").unwrap_or_default(); + let end_ns: u64 = match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: background_schedule_pool_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + let start_ns = end_ns.saturating_sub(duration_ms * 1_000_000); + + let track_uuid = *log_name_uuids.entry(log_name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &log_name); + uuid + }); + + let mut annotations = vec![ + Self::make_annotation_str("database", &database), + Self::make_annotation_str("table", &table), + Self::make_annotation_str("query_id", &query_id), + ]; + if !error.is_empty() { + annotations.push(Self::make_annotation_str("error", &error)); + } + if !exception.is_empty() { + annotations.push(Self::make_annotation_str("exception", &exception)); + } + + let label = format!("{}.{}", database, table); + self.add_slice_begin(track_uuid, &label, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_session_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Sessions"); + + let mut type_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let session_type: String = columns.get(i, "type").unwrap_or_default(); + let user: String = columns.get(i, "user").unwrap_or_default(); + let auth_type: String = columns.get(i, "auth_type").unwrap_or_default(); + let interface: String = columns.get(i, "interface").unwrap_or_default(); + let client_address: String = columns.get(i, "client_address").unwrap_or_default(); + let client_name: String = columns.get(i, "client_name").unwrap_or_default(); + let failure_reason: String = columns.get(i, "failure_reason").unwrap_or_default(); + let timestamp_ns: u64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: session_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + + let track_uuid = *type_uuids.entry(session_type.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &session_type); + uuid + }); + + let mut annotations = vec![ + Self::make_annotation_str("user", &user), + Self::make_annotation_str("auth_type", &auth_type), + Self::make_annotation_str("interface", &interface), + Self::make_annotation_str("client_address", &client_address), + Self::make_annotation_str("client_name", &client_name), + ]; + if !failure_reason.is_empty() { + annotations.push(Self::make_annotation_str("failure_reason", &failure_reason)); + } + + let label = format!("{} ({})", session_type, user); + self.add_instant(track_uuid, &label, timestamp_ns, annotations); + } + } + + pub fn add_aggregated_zookeeper_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "ZooKeeper"); + + // operation → (count_track, latency_track) + let mut op_tracks: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let operation: String = columns.get(i, "operation").unwrap_or_default(); + let count: u64 = columns.get(i, "count").unwrap_or(0); + let average_latency: f64 = columns.get(i, "average_latency").unwrap_or(0.0); + let parent_path: String = columns.get(i, "parent_path").unwrap_or_default(); + let component: String = columns.get(i, "component").unwrap_or_default(); + + let timestamp_ns: u64 = match columns.get::, _>(i, "event_time") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: aggregated_zookeeper_log row {} event_time: {}", + i, + e + ); + continue; + } + }; + + let (count_track, latency_track) = + *op_tracks.entry(operation.clone()).or_insert_with(|| { + let ct = self.alloc_uuid(); + self.add_counter_track( + ct, + process_uuid, + &format!("{} count", operation), + Unit::UNIT_UNSPECIFIED, + ); + let lt = self.alloc_uuid(); + self.add_counter_track( + lt, + process_uuid, + &format!("{} avg_latency", operation), + Unit::UNIT_UNSPECIFIED, + ); + (ct, lt) + }); + + self.add_counter_value(count_track, timestamp_ns, count as i64); + self.add_counter_value(latency_track, timestamp_ns, average_latency as i64); + + // Also emit an instant with annotations for the detail + if !parent_path.is_empty() || !component.is_empty() { + let error_names: Vec = columns.get(i, "error_names").unwrap_or_default(); + let error_counts: Vec = columns.get(i, "error_counts").unwrap_or_default(); + + let mut annotations = vec![ + Self::make_annotation_str("parent_path", &parent_path), + Self::make_annotation_str("component", &component), + Self::make_annotation_int("count", count as i64), + ]; + for (en, ec) in error_names.iter().zip(error_counts.iter()) { + annotations.push(Self::make_annotation_int(en, *ec as i64)); + } + + // Use count_track for the instant + self.add_instant(count_track, &operation, timestamp_ns, annotations); + } + } + } + fn alloc_intern_id(&mut self) -> u64 { let id = self.next_intern_id; self.next_intern_id += 1; diff --git a/src/interpreter/worker.rs b/src/interpreter/worker.rs index 5ff6e3f..509dda1 100644 --- a/src/interpreter/worker.rs +++ b/src/interpreter/worker.rs @@ -345,37 +345,19 @@ async fn render_or_share_flamegraph( return Ok(()); } -async fn build_and_serve_perfetto_trace( - context: ContextArc, +use crate::interpreter::options::ChDigPerfettoConfig; + +async fn fetch_and_populate_perfetto_trace( clickhouse: &Arc, - cb_sink: cursive::CbSink, - queries: &[Query], + builder: &mut PerfettoTraceBuilder, + cfg: &ChDigPerfettoConfig, query_ids: Option<&[String]>, start: DateTime, end_time: DateTime, -) -> Result<()> { - let perfetto_cfg = context.lock().unwrap().options.perfetto.clone(); - let mut builder = - PerfettoTraceBuilder::new(perfetto_cfg.per_server, perfetto_cfg.text_log_android); - - for q in queries { - log::info!( - "Perfetto query: id={} start_ns={} end_ns={} elapsed={}", - q.query_id, - q.query_start_time_microseconds - .timestamp_nanos_opt() - .unwrap_or(0), - q.query_end_time_microseconds - .timestamp_nanos_opt() - .unwrap_or(0), - q.elapsed, - ); - } - builder.add_queries(queries); - +) { let (otel, trace_log, metrics, parts, threads, stack_traces, text_logs) = tokio::join!( async { - if perfetto_cfg.opentelemetry_span_log { + if cfg.opentelemetry_span_log { Some( clickhouse .get_otel_spans_for_perfetto(query_ids, start, end_time) @@ -386,7 +368,7 @@ async fn build_and_serve_perfetto_trace( } }, async { - if perfetto_cfg.trace_log { + if cfg.trace_log { Some( clickhouse .get_trace_log_counters_for_perfetto(query_ids, start, end_time) @@ -397,7 +379,7 @@ async fn build_and_serve_perfetto_trace( } }, async { - if perfetto_cfg.query_metric_log { + if cfg.query_metric_log { Some( clickhouse .get_query_metrics_for_perfetto(query_ids, start, end_time) @@ -408,7 +390,7 @@ async fn build_and_serve_perfetto_trace( } }, async { - if perfetto_cfg.part_log { + if cfg.part_log { Some( clickhouse .get_part_log_for_perfetto(query_ids, start, end_time) @@ -419,7 +401,7 @@ async fn build_and_serve_perfetto_trace( } }, async { - if perfetto_cfg.query_thread_log { + if cfg.query_thread_log { Some( clickhouse .get_query_thread_log_for_perfetto(query_ids, start, end_time) @@ -430,7 +412,7 @@ async fn build_and_serve_perfetto_trace( } }, async { - if perfetto_cfg.trace_log { + if cfg.trace_log { Some( clickhouse .get_stack_traces_for_perfetto(query_ids, start, end_time) @@ -441,7 +423,7 @@ async fn build_and_serve_perfetto_trace( } }, async { - if perfetto_cfg.text_log { + if cfg.text_log { Some( clickhouse .get_text_log_for_perfetto(query_ids, start, end_time) @@ -488,7 +470,192 @@ async fn build_and_serve_perfetto_trace( Some(Err(e)) => log::warn!("Failed to fetch text_log: {}", e), None => {} } +} +async fn fetch_server_perfetto_sources( + clickhouse: &Arc, + builder: &mut PerfettoTraceBuilder, + cfg: &ChDigPerfettoConfig, + start: DateTime, + end_time: DateTime, +) { + let ( + metric_log, + async_metric_log, + async_insert_log, + error_log, + s3_queue_log, + azure_queue_log, + blob_storage_log, + bg_pool_log, + session_log, + zk_log, + ) = tokio::join!( + async { + if cfg.metric_log { + Some( + clickhouse + .get_metric_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.asynchronous_metric_log { + Some( + clickhouse + .get_asynchronous_metric_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.asynchronous_insert_log { + Some( + clickhouse + .get_asynchronous_insert_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.error_log { + Some(clickhouse.get_error_log_for_perfetto(start, end_time).await) + } else { + None + } + }, + async { + if cfg.s3_queue_log { + Some( + clickhouse + .get_s3_queue_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.azure_queue_log { + Some( + clickhouse + .get_azure_queue_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.blob_storage_log { + Some( + clickhouse + .get_blob_storage_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.background_schedule_pool_log { + Some( + clickhouse + .get_background_schedule_pool_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.session_log { + Some( + clickhouse + .get_session_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + async { + if cfg.aggregated_zookeeper_log { + Some( + clickhouse + .get_aggregated_zookeeper_log_for_perfetto(start, end_time) + .await, + ) + } else { + None + } + }, + ); + + match metric_log { + Some(Ok(rows)) => builder.add_metric_log(&rows), + Some(Err(e)) => log::warn!("Failed to fetch metric_log: {}", e), + None => {} + } + match async_metric_log { + Some(Ok(block)) => builder.add_asynchronous_metric_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch asynchronous_metric_log: {}", e), + None => {} + } + match async_insert_log { + Some(Ok(block)) => builder.add_asynchronous_insert_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch asynchronous_insert_log: {}", e), + None => {} + } + match error_log { + Some(Ok(block)) => builder.add_error_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch error_log: {}", e), + None => {} + } + match s3_queue_log { + Some(Ok(block)) => builder.add_s3_queue_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch s3queue_log: {}", e), + None => {} + } + match azure_queue_log { + Some(Ok(block)) => builder.add_azure_queue_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch azure_queue_log: {}", e), + None => {} + } + match blob_storage_log { + Some(Ok(block)) => builder.add_blob_storage_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch blob_storage_log: {}", e), + None => {} + } + match bg_pool_log { + Some(Ok(block)) => builder.add_background_pool_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch background_schedule_pool_log: {}", e), + None => {} + } + match session_log { + Some(Ok(block)) => builder.add_session_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch session_log: {}", e), + None => {} + } + match zk_log { + Some(Ok(block)) => builder.add_aggregated_zookeeper_log(&block), + Some(Err(e)) => log::warn!("Failed to fetch aggregated_zookeeper_log: {}", e), + None => {} + } +} + +fn serve_perfetto_trace( + context: ContextArc, + cb_sink: cursive::CbSink, + builder: PerfettoTraceBuilder, +) -> Result<()> { let data = builder.build(); let data_len = data.len(); if let Err(e) = std::fs::write("/tmp/chdig_perfetto.pftrace", &data) { @@ -955,19 +1122,38 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) crate::utils::open_url_command(&url_clone).status()?; } Event::PerfettoExport(queries, query_ids, start, end) => { + let perfetto_cfg = context.lock().unwrap().options.perfetto.clone(); let end_time = end.unwrap_or_else(Local::now) + chrono::TimeDelta::seconds(1); - build_and_serve_perfetto_trace( - context.clone(), + let mut builder = + PerfettoTraceBuilder::new(perfetto_cfg.per_server, perfetto_cfg.text_log_android); + + for q in &queries { + log::info!( + "Perfetto query: id={} start_ns={} end_ns={} elapsed={}", + q.query_id, + q.query_start_time_microseconds + .timestamp_nanos_opt() + .unwrap_or(0), + q.query_end_time_microseconds + .timestamp_nanos_opt() + .unwrap_or(0), + q.elapsed, + ); + } + builder.add_queries(&queries); + fetch_and_populate_perfetto_trace( &clickhouse, - cb_sink, - &queries, + &mut builder, + &perfetto_cfg, Some(&query_ids), start, end_time, ) - .await?; + .await; + serve_perfetto_trace(context.clone(), cb_sink, builder)?; } Event::ServerPerfettoExport(start, end) => { + let perfetto_cfg = context.lock().unwrap().options.perfetto.clone(); let query_block = clickhouse.get_queries_for_perfetto(start, end).await?; let mut queries = Vec::new(); for i in 0..query_block.row_count() { @@ -977,16 +1163,27 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) } } let end_time = end + chrono::TimeDelta::seconds(1); - build_and_serve_perfetto_trace( - context.clone(), + let mut builder = + PerfettoTraceBuilder::new(perfetto_cfg.per_server, perfetto_cfg.text_log_android); + builder.add_queries(&queries); + fetch_and_populate_perfetto_trace( &clickhouse, - cb_sink, - &queries, + &mut builder, + &perfetto_cfg, None, start, end_time, ) - .await?; + .await; + fetch_server_perfetto_sources( + &clickhouse, + &mut builder, + &perfetto_cfg, + start, + end_time, + ) + .await; + serve_perfetto_trace(context.clone(), cb_sink, builder)?; } } diff --git a/src/view/navigation.rs b/src/view/navigation.rs index 56bf116..ac45adc 100644 --- a/src/view/navigation.rs +++ b/src/view/navigation.rs @@ -503,8 +503,8 @@ impl Navigation for Cursive { ))); layout.add_child(DummyView); - // Perfetto - layout.add_child(bold("Perfetto:")); + // Perfetto (query) + layout.add_child(bold("Perfetto (query):")); layout.add_child(checkbox_row( "opentelemetry_span_log", "set_otel", @@ -547,6 +547,60 @@ impl Navigation for Cursive { )); layout.add_child(DummyView); + // Perfetto (server) + layout.add_child(bold("Perfetto (server):")); + layout.add_child(checkbox_row( + "metric_log", + "set_metric_log", + opts.perfetto.metric_log, + )); + layout.add_child(checkbox_row( + "asynchronous_metric_log", + "set_async_metric_log", + opts.perfetto.asynchronous_metric_log, + )); + layout.add_child(checkbox_row( + "asynchronous_insert_log", + "set_async_insert_log", + opts.perfetto.asynchronous_insert_log, + )); + layout.add_child(checkbox_row( + "error_log", + "set_error_log", + opts.perfetto.error_log, + )); + layout.add_child(checkbox_row( + "s3_queue_log", + "set_s3_queue_log", + opts.perfetto.s3_queue_log, + )); + layout.add_child(checkbox_row( + "azure_queue_log", + "set_azure_queue_log", + opts.perfetto.azure_queue_log, + )); + layout.add_child(checkbox_row( + "blob_storage_log", + "set_blob_storage_log", + opts.perfetto.blob_storage_log, + )); + layout.add_child(checkbox_row( + "background_schedule_pool_log", + "set_bg_pool_log", + opts.perfetto.background_schedule_pool_log, + )); + layout.add_child(checkbox_row( + "session_log", + "set_session_log", + opts.perfetto.session_log, + )); + layout.add_child(checkbox_row( + "aggregated_zookeeper_log", + "set_zk_log", + opts.perfetto.aggregated_zookeeper_log, + )); + layout.add_child(DummyView); + // Runtime (read-only) layout.add_child(bold("Runtime:")); layout.add_child(TextView::new(format!( @@ -627,6 +681,37 @@ impl Navigation for Cursive { .call_on_name("set_per_server", |v: &mut Checkbox| v.is_checked()) .unwrap(); + let metric_log = siv + .call_on_name("set_metric_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let async_metric_log = siv + .call_on_name("set_async_metric_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let async_insert_log = siv + .call_on_name("set_async_insert_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let error_log = siv + .call_on_name("set_error_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let s3_queue_log = siv + .call_on_name("set_s3_queue_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let azure_queue_log = siv + .call_on_name("set_azure_queue_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let blob_storage_log = siv + .call_on_name("set_blob_storage_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let bg_pool_log = siv + .call_on_name("set_bg_pool_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let session_log = siv + .call_on_name("set_session_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let zk_log = siv + .call_on_name("set_zk_log", |v: &mut Checkbox| v.is_checked()) + .unwrap(); + let limit: u64 = match limit_str.parse() { Ok(v) => v, Err(_) => { @@ -679,6 +764,16 @@ impl Navigation for Cursive { ctx.options.perfetto.text_log = text_log; ctx.options.perfetto.text_log_android = text_log_android; ctx.options.perfetto.per_server = per_server; + ctx.options.perfetto.metric_log = metric_log; + ctx.options.perfetto.asynchronous_metric_log = async_metric_log; + ctx.options.perfetto.asynchronous_insert_log = async_insert_log; + ctx.options.perfetto.error_log = error_log; + ctx.options.perfetto.s3_queue_log = s3_queue_log; + ctx.options.perfetto.azure_queue_log = azure_queue_log; + ctx.options.perfetto.blob_storage_log = blob_storage_log; + ctx.options.perfetto.background_schedule_pool_log = bg_pool_log; + ctx.options.perfetto.session_log = session_log; + ctx.options.perfetto.aggregated_zookeeper_log = zk_log; ctx.trigger_view_refresh(); } From 5ef9d6e3c838393d6f8d6e6e414f0155184346c2 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 30 Mar 2026 11:42:15 +0200 Subject: [PATCH 3/3] Fix error_log columns, add time range dialog for server Perfetto export - Fix error_log query: use `error` (not `name`) and `event_time` DateTime (not `event_time_microseconds` DateTime64), matching actual table schema. - Add a time range dialog when invoking Server Perfetto Export, pre-filled with current view start/end. Includes a warning that server-wide export is heavy (~1.5 GiB/server for 2 min of data) so users should reduce the time range accordingly. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/view/navigation.rs | 76 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 6 deletions(-) diff --git a/src/view/navigation.rs b/src/view/navigation.rs index ac45adc..649c9fd 100644 --- a/src/view/navigation.rs +++ b/src/view/navigation.rs @@ -1017,12 +1017,76 @@ impl Navigation for Cursive { } fn show_server_perfetto(&mut self) { - let mut context = self.user_data::().unwrap().lock().unwrap(); - let start: DateTime = context.options.view.start.clone().into(); - let end: DateTime = context.options.view.end.clone().into(); - context - .worker - .send(true, WorkerEvent::ServerPerfettoExport(start, end)); + let context = self.user_data::().unwrap().clone(); + let (start_str, end_str) = { + let ctx = context.lock().unwrap(); + ( + ctx.options.view.start.to_editable_string(), + ctx.options.view.end.to_editable_string(), + ) + }; + + let on_submit = move |siv: &mut Cursive| { + let start_str = siv + .call_on_name("perfetto_start", |view: &mut EditView| view.get_content()) + .unwrap(); + let end_str = siv + .call_on_name("perfetto_end", |view: &mut EditView| view.get_content()) + .unwrap(); + + let start = match start_str.parse::() { + Ok(v) => v, + Err(err) => { + siv.add_layer(Dialog::info(format!("Invalid start: {}", err))); + return; + } + }; + let end = match end_str.parse::() { + Ok(v) => v, + Err(err) => { + siv.add_layer(Dialog::info(format!("Invalid end: {}", err))); + return; + } + }; + + siv.pop_layer(); + + let start_dt: DateTime = start.into(); + let end_dt: DateTime = end.into(); + let mut ctx = siv.user_data::().unwrap().lock().unwrap(); + ctx.worker + .send(true, WorkerEvent::ServerPerfettoExport(start_dt, end_dt)); + }; + + let dialog = Dialog::new() + .title("Server Perfetto Export") + .content( + LinearLayout::vertical() + .child(TextView::new( + "Warning: server-wide export is heavy (~1.5 GiB/server\nfor 2 min). Consider reducing the time range.", + )) + .child(DummyView) + .child(TextView::new("start:")) + .child( + EditView::new() + .content(start_str) + .with_name("perfetto_start") + .fixed_width(30), + ) + .child(DummyView) + .child(TextView::new("end:")) + .child( + EditView::new() + .content(end_str) + .with_name("perfetto_end") + .fixed_width(30), + ), + ) + .button("Export", on_submit) + .button("Cancel", |siv| { + siv.pop_layer(); + }); + self.add_layer(dialog); } fn show_connection_dialog(&mut self) {