diff --git a/Cargo.lock b/Cargo.lock index 8cdf320..1df2f7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,6 +168,12 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0ffd3d69bd89910509a5d31d1f1353f38ccffdd116dd0099bbd6627f7bd8ad8" +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "async-trait" version = "0.1.89" @@ -353,7 +359,9 @@ dependencies = [ "libc", "log", "percent-encoding", + "perfetto_protos", "pretty_assertions", + "protobuf", "quick-xml", "rand", "ratatui", @@ -366,6 +374,7 @@ dependencies = [ "strfmt", "syntect", "tempfile", + "tiny_http", "tokio", "unicode-width 0.1.14", "url", @@ -404,6 +413,12 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "chunked_transfer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" + [[package]] name = "cipher" version = "0.4.4" @@ -1321,6 +1336,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "hostname" version = "0.3.1" @@ -2064,6 +2088,17 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "perfetto_protos" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634f88ebef917643d9e470dbf99637fbfa8295f54525954d156641f06823f9ac" +dependencies = [ + "protobuf", + "protobuf-codegen", + "protoc-bin-vendored", +] + [[package]] name = "phf" version = "0.11.3" @@ -2240,6 +2275,121 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-codegen" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace" +dependencies = [ + "anyhow", + "once_cell", + "protobuf", + "protobuf-parse", + "regex", + "tempfile", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-parse" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973" +dependencies = [ + "anyhow", + "indexmap", + "log", + "protobuf", + "protobuf-support", + "tempfile", + "thiserror 1.0.69", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "quick-xml" version = "0.38.4" @@ -2745,6 +2895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", + "getrandom 0.3.4", "once_cell", "rustix 1.1.4", "windows-sys 0.61.2", @@ -2832,6 +2983,18 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -3231,6 +3394,18 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.44", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 0e89869..23a8a60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,10 @@ flamelens = { git = "https://github.com/ys-l/flamelens", branch = "main", defaul ratatui = { version = "0.29.0", features = ["unstable-rendered-line-info"] } # Should **only** with the flamelens, since cursive re-export it, while flamelens does not crossterm = { version = "0.28.1", features = ["use-dev-tty"] } +# Perfetto +perfetto_protos = { version = "*", default-features = false } +protobuf = { version = "3", default-features = false } +tiny_http = { version = "*", default-features = false } # Sharing aes-gcm = { version = "0.10", default-features = false, features = ["aes", "alloc"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } diff --git a/Documentation/FAQ.md b/Documentation/FAQ.md index 6f0c9c0..75600a9 100644 --- a/Documentation/FAQ.md +++ b/Documentation/FAQ.md @@ -87,6 +87,56 @@ highly not recommended), you can use `chdig --connection prod`. [1]: https://github.com/ClickHouse/ClickHouse/pull/45715 [2]: https://github.com/ClickHouse/ClickHouse/pull/46480 +### What is Perfetto export? + +Pressing `X` in the queries view exports a timeline visualization to +[Perfetto UI](https://ui.perfetto.dev). + +An embedded HTTP server starts on port 9001 (lazily, on first export) and serves +the binary protobuf trace. The browser opens automatically. + +The export includes data from multiple ClickHouse system tables (when available): + +| Source table | What it shows | +|---|---| +| In-memory queries | Query duration slices grouped by host/user | +| `system.opentelemetry_span_log` | Processor pipeline spans | +| `system.trace_log` (ProfileEvent) | Per-thread counter increments | +| `system.query_metric_log` | Per-query metric snapshots | +| `system.part_log` | Part lifecycle events (NewPart, MergeParts, etc.) | +| `system.query_thread_log` | Per-thread execution with ProfileEvents | + +Tables that don't exist are silently skipped — the export works with whatever +data is available. + +When queries are selected with `Space`, only those queries are exported. + +To get the richest traces, enable these ClickHouse settings for the queries you +want to analyze: + +```sql +SET + opentelemetry_start_trace_probability = 1, + opentelemetry_trace_processors = 1, + opentelemetry_trace_cpu_scheduling = 1, + log_query_threads = 1, + trace_profile_events = 1, + query_metric_log_interval = 0 +``` + +- `opentelemetry_start_trace_probability` / `opentelemetry_trace_processors` / + `opentelemetry_trace_cpu_scheduling` — enable OpenTelemetry spans for the + query execution pipeline (populates `system.opentelemetry_span_log`) +- `log_query_threads` — log per-thread execution info + (populates `system.query_thread_log`) +- `trace_profile_events` — record ProfileEvent counter increments with + timestamps into `system.trace_log`, giving precise per-event timelines +- `query_metric_log_interval` — controls periodic metric snapshots in + `system.query_metric_log` (sampled every N milliseconds). Set to `0` to + disable if you prefer the more accurate `trace_profile_events`. Set to e.g. + `1000` (1 second) if you want periodic snapshots — note that these are + sampled and less precise than `trace_profile_events`, but lighter on overhead + ### What is flamegraph? It is best to start with [Brendan Gregg's site](https://www.brendangregg.com/flamegraphs.html) for a solid introduction to flamegraphs. diff --git a/src/interpreter/clickhouse.rs b/src/interpreter/clickhouse.rs index 4174108..a9f1b7c 100644 --- a/src/interpreter/clickhouse.rs +++ b/src/interpreter/clickhouse.rs @@ -4,6 +4,7 @@ use crate::{ }; use anyhow::{Error, Result}; use chrono::{DateTime, Local}; +use chrono_tz::Tz; use clickhouse_rs::{ Block, Options, Pool, types::{Complex, FromSql}, @@ -149,6 +150,13 @@ pub struct ClickHouseServerSummary { pub update_interval: u64, } +pub struct QueryMetricRow { + pub timestamp_ns: u64, + pub memory_usage: i64, + pub peak_memory_usage: i64, + pub profile_events: HashMap, +} + fn collect_values<'b, T: FromSql<'b>>(block: &'b Columns, column: &str) -> Vec { return (0..block.row_count()) .map(|i| block.get(i, column).unwrap()) @@ -1193,6 +1201,303 @@ impl ClickHouse { Ok(query_ids) } + pub async fn get_otel_spans_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "opentelemetry_span_log"); + let start_us = start.timestamp_micros(); + let end_us = end.timestamp_micros(); + return self + .execute(&format!( + r#" + SELECT + operation_name, + start_time_us, + finish_time_us, + attribute['clickhouse.query_id'] AS query_id + FROM {dbtable} + WHERE start_time_us BETWEEN {start_us} AND {end_us} + AND attribute['clickhouse.query_id'] IN ('{query_ids}') + ORDER BY start_time_us + "#, + dbtable = dbtable, + start_us = start_us, + end_us = end_us, + query_ids = query_ids.join("','"), + )) + .await; + } + + pub async fn get_trace_log_counters_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "trace_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + query_id, + event, + increment, + event_time_microseconds + FROM {dbtable} + WHERE trace_type = 'ProfileEvent' AND increment != 0 + AND query_id IN ('{query_ids}') + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + query_ids = query_ids.join("','"), + )) + .await; + } + + pub async fn get_query_metrics_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result> { + let dbtable = self.get_table_name("system", "query_metric_log"); + let block = self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + query_id, + event_time_microseconds, + memory_usage, + peak_memory_usage, + COLUMNS('ProfileEvent_') + FROM {dbtable} + WHERE query_id IN ('{query_ids}') + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + query_ids = query_ids.join("','"), + )) + .await?; + + let pe_columns: Vec = block + .columns() + .iter() + .map(|c| c.name().to_string()) + .filter(|name| name.starts_with("ProfileEvent_")) + .collect(); + + let mut rows = Vec::with_capacity(block.row_count()); + for i in 0..block.row_count() { + let mut profile_events = HashMap::new(); + for col in &pe_columns { + let value: u64 = block.get(i, col.as_str()).unwrap_or(0); + if value != 0 { + let name = col.strip_prefix("ProfileEvent_").unwrap(); + profile_events.insert(name.to_string(), value); + } + } + let ts_ns = match block.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: query_metric_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + rows.push(QueryMetricRow { + timestamp_ns: ts_ns, + memory_usage: block.get(i, "memory_usage").unwrap_or(0), + peak_memory_usage: block.get(i, "peak_memory_usage").unwrap_or(0), + profile_events, + }); + } + Ok(rows) + } + + pub async fn get_part_log_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "part_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + event_type, + event_time_microseconds, + duration_ms, + database, + table, + part_name, + query_id, + rows, + size_in_bytes + FROM {dbtable} + WHERE event_type NOT IN ('MergePartsStart', 'MutatePartStart') + AND query_id IN ('{query_ids}') + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + query_ids = query_ids.join("','"), + )) + .await; + } + + pub async fn get_stack_traces_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "trace_log"); + let symbol_expr = if self + .quirks + .has(ClickHouseAvailableQuirks::TraceLogHasSymbols) + { + r#"arrayReverse(if(empty(symbols), + arrayMap(addr -> demangle(addressToSymbol(addr)), trace), + symbols))"# + } else { + "arrayReverse(arrayMap(addr -> demangle(addressToSymbol(addr)), trace))" + }; + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + event_time_microseconds, + thread_id, + trace_type::String AS trace_type, + {symbol_expr} AS stack, + size + FROM {dbtable} + WHERE trace_type IN ('CPU', 'Real', 'Memory') + AND query_id IN ('{query_ids}') + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + SETTINGS allow_introspection_functions=1 + "#, + dbtable = dbtable, + symbol_expr = symbol_expr, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + query_ids = query_ids.join("','"), + )) + .await; + } + + pub async fn get_text_log_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "text_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + event_time_microseconds, + level::String AS level, + logger_name::String AS logger_name, + message + FROM {dbtable} + WHERE query_id IN ('{query_ids}') + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY event_time_microseconds + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + query_ids = query_ids.join("','"), + )) + .await; + } + + pub async fn get_query_thread_log_for_perfetto( + &self, + query_ids: &[String], + start: DateTime, + end: DateTime, + ) -> Result { + let dbtable = self.get_table_name("system", "query_thread_log"); + return self + .execute(&format!( + r#" + WITH + fromUnixTimestamp64Nano({start}) AS start_, + fromUnixTimestamp64Nano({end}) AS end_ + SELECT + query_id, + thread_name, + thread_id, + event_time_microseconds, + query_duration_ms, + ProfileEvents.Names, + ProfileEvents.Values, + peak_memory_usage + FROM {dbtable} + WHERE query_id IN ('{query_ids}') + AND event_date >= toDate(start_) AND event_time >= toDateTime(start_) + AND event_date <= toDate(end_) AND event_time <= toDateTime(end_) + ORDER BY query_id, thread_id + "#, + dbtable = dbtable, + start = start + .timestamp_nanos_opt() + .ok_or(Error::msg("Invalid start"))?, + end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?, + query_ids = query_ids.join("','"), + )) + .await; + } + pub async fn get_warnings(&self) -> Result> { let table_exists: u64 = self .execute( diff --git a/src/interpreter/context.rs b/src/interpreter/context.rs index b2e996f..89db4f0 100644 --- a/src/interpreter/context.rs +++ b/src/interpreter/context.rs @@ -2,6 +2,7 @@ use crate::actions::ActionDescription; use crate::interpreter::{ ClickHouse, Worker, options::{ChDigOptions, ChDigViews}, + perfetto::PerfettoServer, }; use anyhow::Result; use chrono::Duration; @@ -46,6 +47,8 @@ pub struct Context { pub selected_host: Option, pub current_view: Option, + + pub perfetto_server: Option>, } impl Context { @@ -79,6 +82,7 @@ impl Context { search_history: crate::view::search_history::SearchHistory::new(), selected_host: None, current_view: None, + perfetto_server: None, })); context.lock().unwrap().worker.start(context.clone()); @@ -187,6 +191,15 @@ impl Context { return self.add_view_action(view, text, Event::Unknown(Vec::from([0u8])), cb); } + pub fn get_or_start_perfetto_server(&mut self) -> Arc { + if let Some(ref server) = self.perfetto_server { + return server.clone(); + } + let server = Arc::new(PerfettoServer::new()); + self.perfetto_server = Some(server.clone()); + server + } + pub fn trigger_view_refresh(&self) { self.background_runner_force .store(true, atomic::Ordering::SeqCst); diff --git a/src/interpreter/mod.rs b/src/interpreter/mod.rs index facae1b..760b440 100644 --- a/src/interpreter/mod.rs +++ b/src/interpreter/mod.rs @@ -8,6 +8,7 @@ mod worker; // only functions pub mod flamegraph; pub mod options; +pub mod perfetto; pub use clickhouse::ClickHouse; pub use clickhouse::TextLogArguments; diff --git a/src/interpreter/perfetto.rs b/src/interpreter/perfetto.rs new file mode 100644 index 0000000..77f8fc8 --- /dev/null +++ b/src/interpreter/perfetto.rs @@ -0,0 +1,875 @@ +use crate::interpreter::Query; +use crate::interpreter::clickhouse::{Columns, QueryMetricRow}; +use chrono::{DateTime, Local}; +use chrono_tz::Tz; +use perfetto_protos::clock_snapshot::ClockSnapshot; +use perfetto_protos::clock_snapshot::clock_snapshot::Clock; +use perfetto_protos::counter_descriptor::CounterDescriptor; +use perfetto_protos::counter_descriptor::counter_descriptor::Unit; +use perfetto_protos::debug_annotation::DebugAnnotation; +use perfetto_protos::debug_annotation::debug_annotation as da; +use perfetto_protos::interned_data::InternedData; +use perfetto_protos::profile_common::{Callstack, Frame, InternedString, Mapping}; +use perfetto_protos::profile_packet::StreamingProfilePacket; +use perfetto_protos::thread_descriptor::ThreadDescriptor as PerfettoThreadDescriptor; +use perfetto_protos::trace::Trace; +use perfetto_protos::trace_packet::TracePacket; +use perfetto_protos::trace_packet::trace_packet::Data; +use perfetto_protos::track_descriptor::TrackDescriptor; +use perfetto_protos::track_descriptor::track_descriptor::Static_or_dynamic_name; +use perfetto_protos::track_event::TrackEvent; +use perfetto_protos::track_event::track_event::{Counter_value_field, Name_field, Type}; +use protobuf::{EnumOrUnknown, Message, MessageField}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +const SEQUENCE_ID: u32 = 1; +// Sequence-scoped clock (>=64), mapped to BOOTTIME via ClockSnapshot. +// All TrackEvent packets (slices, counters) use this clock on SEQUENCE_ID. +// +// Clock timeline notes: +// - Clock 128 is sequence-scoped: the ClockSnapshot on SEQUENCE_ID defines it +// ONLY for that sequence. Other sequences cannot use it (see add_stack_traces). +// - The ClockSnapshot must be the first packet (timestamp=0, self-referencing). +// Using a non-zero timestamp in clock 6 (BOOTTIME) instead doesn't work reliably. +// - The first make_packet() call emits SEQ_INCREMENTAL_STATE_CLEARED (flags=1). +// This is safe because it's a TrackDescriptor without a timestamp (processed +// inline before the ClockSnapshot enters the sort queue). +// - Never emit SEQ_INCREMENTAL_STATE_CLEARED on timestamped packets sharing this +// sequence — it destroys the clock mapping for all subsequent packets. +const CLOCK_ID_UNIXTIME: u32 = 128; + +pub struct PerfettoTraceBuilder { + packets: Vec, + next_uuid: u64, + next_sequence_id: u32, + first_event_emitted: bool, + + function_name_iids: HashMap, + frame_iids: HashMap<(u64, u64), u64>, + callstack_iids: HashMap, u64>, + next_intern_id: u64, +} + +impl PerfettoTraceBuilder { + pub fn new() -> Self { + PerfettoTraceBuilder { + packets: Vec::new(), + next_uuid: 1, + next_sequence_id: SEQUENCE_ID + 1, + first_event_emitted: false, + + function_name_iids: HashMap::new(), + frame_iids: HashMap::new(), + callstack_iids: HashMap::new(), + next_intern_id: 1, + } + } + + fn alloc_uuid(&mut self) -> u64 { + let uuid = self.next_uuid; + self.next_uuid += 1; + uuid + } + + fn make_packet(&mut self) -> TracePacket { + let mut pkt = TracePacket::new(); + pkt.set_trusted_packet_sequence_id(SEQUENCE_ID); + if !self.first_event_emitted { + pkt.sequence_flags = Some(1); // SEQ_INCREMENTAL_STATE_CLEARED + self.first_event_emitted = true; + } else { + pkt.sequence_flags = Some(2); // SEQ_NEEDS_INCREMENTAL_STATE + } + pkt + } + + fn make_event_packet(&mut self, ts_ns: u64) -> TracePacket { + let mut pkt = self.make_packet(); + pkt.timestamp = Some(ts_ns); + pkt.timestamp_clock_id = Some(CLOCK_ID_UNIXTIME); + pkt + } + + fn add_process_track(&mut self, uuid: u64, name: &str) { + let mut pkt = self.make_packet(); + let mut td = TrackDescriptor::new(); + td.uuid = Some(uuid); + td.static_or_dynamic_name = Some(Static_or_dynamic_name::Name(name.to_string())); + pkt.data = Some(Data::TrackDescriptor(td)); + self.packets.push(pkt); + } + + fn add_child_track(&mut self, uuid: u64, parent_uuid: u64, name: &str) { + let mut pkt = self.make_packet(); + let mut td = TrackDescriptor::new(); + td.uuid = Some(uuid); + td.parent_uuid = Some(parent_uuid); + td.static_or_dynamic_name = Some(Static_or_dynamic_name::Name(name.to_string())); + pkt.data = Some(Data::TrackDescriptor(td)); + self.packets.push(pkt); + } + + fn add_counter_track(&mut self, uuid: u64, parent_uuid: u64, name: &str, unit: Unit) { + let mut pkt = self.make_packet(); + let mut td = TrackDescriptor::new(); + td.uuid = Some(uuid); + td.parent_uuid = Some(parent_uuid); + td.static_or_dynamic_name = Some(Static_or_dynamic_name::Name(name.to_string())); + let mut cd = CounterDescriptor::new(); + cd.unit = Some(EnumOrUnknown::new(unit)); + td.counter = MessageField::some(cd); + pkt.data = Some(Data::TrackDescriptor(td)); + self.packets.push(pkt); + } + + fn add_slice_begin( + &mut self, + track_uuid: u64, + name: &str, + ts_ns: u64, + annotations: Vec, + ) { + let mut pkt = self.make_event_packet(ts_ns); + let mut te = TrackEvent::new(); + te.type_ = Some(EnumOrUnknown::new(Type::TYPE_SLICE_BEGIN)); + te.track_uuid = Some(track_uuid); + te.name_field = Some(Name_field::Name(name.to_string())); + te.debug_annotations = annotations; + pkt.data = Some(Data::TrackEvent(te)); + self.packets.push(pkt); + } + + fn add_slice_end(&mut self, track_uuid: u64, ts_ns: u64) { + let mut pkt = self.make_event_packet(ts_ns); + let mut te = TrackEvent::new(); + te.type_ = Some(EnumOrUnknown::new(Type::TYPE_SLICE_END)); + te.track_uuid = Some(track_uuid); + pkt.data = Some(Data::TrackEvent(te)); + self.packets.push(pkt); + } + + fn add_instant( + &mut self, + track_uuid: u64, + name: &str, + ts_ns: u64, + annotations: Vec, + ) { + let mut pkt = self.make_event_packet(ts_ns); + let mut te = TrackEvent::new(); + te.type_ = Some(EnumOrUnknown::new(Type::TYPE_INSTANT)); + te.track_uuid = Some(track_uuid); + te.name_field = Some(Name_field::Name(name.to_string())); + te.debug_annotations = annotations; + pkt.data = Some(Data::TrackEvent(te)); + self.packets.push(pkt); + } + + fn add_counter_value(&mut self, track_uuid: u64, ts_ns: u64, value: i64) { + let mut pkt = self.make_event_packet(ts_ns); + let mut te = TrackEvent::new(); + te.type_ = Some(EnumOrUnknown::new(Type::TYPE_COUNTER)); + te.track_uuid = Some(track_uuid); + te.counter_value_field = Some(Counter_value_field::CounterValue(value)); + pkt.data = Some(Data::TrackEvent(te)); + self.packets.push(pkt); + } + + fn make_annotation_str(name: &str, value: &str) -> DebugAnnotation { + let mut ann = DebugAnnotation::new(); + ann.name_field = Some(da::Name_field::Name(name.to_string())); + ann.value = Some(da::Value::StringValue(value.to_string())); + ann + } + + fn make_annotation_int(name: &str, value: i64) -> DebugAnnotation { + let mut ann = DebugAnnotation::new(); + ann.name_field = Some(da::Name_field::Name(name.to_string())); + ann.value = Some(da::Value::IntValue(value)); + ann + } + + fn datetime_to_ns(dt: &DateTime) -> Option { + dt.timestamp_nanos_opt().map(|ns| ns as u64) + } + + // --- High-level methods --- + + pub fn add_queries(&mut self, queries: &[Query]) { + // Group by host_name → process, then user → thread + let mut host_uuids: HashMap = HashMap::new(); + // (host, user) → thread_uuid + let mut user_uuids: HashMap<(String, String), u64> = HashMap::new(); + + for q in queries { + let host_uuid = *host_uuids.entry(q.host_name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_process_track(uuid, &q.host_name); + uuid + }); + + let user_key = (q.host_name.clone(), q.user.clone()); + let user_uuid = *user_uuids.entry(user_key).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, host_uuid, &q.user); + uuid + }); + + let start_ns = match Self::datetime_to_ns(&q.query_start_time_microseconds) { + Some(ns) => ns, + None => { + log::warn!("Perfetto: query {} has invalid start time", q.query_id); + continue; + } + }; + let end_ns = match Self::datetime_to_ns(&q.query_end_time_microseconds) { + Some(ns) => ns, + None => { + log::warn!("Perfetto: query {} has invalid end time", q.query_id); + continue; + } + }; + + let label = if q.normalized_query.chars().count() > 80 { + let truncated: String = q.normalized_query.chars().take(80).collect(); + format!("{}...", truncated) + } else { + q.normalized_query.clone() + }; + + let mut annotations = vec![ + Self::make_annotation_str("query_id", &q.query_id), + Self::make_annotation_str("initial_query_id", &q.initial_query_id), + Self::make_annotation_str("user", &q.user), + Self::make_annotation_str("database", &q.current_database), + Self::make_annotation_int("memory", q.memory), + Self::make_annotation_int("threads", q.threads as i64), + ]; + if !q.original_query.is_empty() { + annotations.push(Self::make_annotation_str("query", &q.original_query)); + } + + self.add_slice_begin(user_uuid, &label, start_ns, annotations); + self.add_slice_end(user_uuid, end_ns); + } + } + + pub fn add_otel_spans(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + // Group spans by operation_name → thread track under query's host process + // Use a single process track for OTel spans + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "OpenTelemetry Spans"); + + let mut op_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let operation_name: String = columns.get(i, "operation_name").unwrap_or_default(); + let start_us: u64 = match columns.get(i, "start_time_us") { + Ok(v) => v, + Err(e) => { + log::warn!("Perfetto: otel_span row {} start_time_us: {}", i, e); + continue; + } + }; + let finish_us: u64 = match columns.get(i, "finish_time_us") { + Ok(v) => v, + Err(e) => { + log::warn!("Perfetto: otel_span row {} finish_time_us: {}", i, e); + continue; + } + }; + let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + + let start_ns = start_us.saturating_mul(1000); + let end_ns = finish_us.saturating_mul(1000); + + let track_uuid = *op_uuids.entry(operation_name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track( + uuid, + process_uuid, + &format!("Processor: {}", operation_name), + ); + uuid + }); + + let annotations = vec![Self::make_annotation_str("query_id", &query_id)]; + + self.add_slice_begin(track_uuid, &operation_name, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_trace_log_counters(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "ProfileEvent Counters"); + + // event_name → (track_uuid, running_total) + let mut counter_tracks: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let event: String = columns.get(i, "event").unwrap_or_default(); + let increment: i64 = columns.get(i, "increment").unwrap_or(0); + let timestamp_ns: u64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: trace_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + + let (track_uuid, running_total) = + counter_tracks.entry(event.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_counter_track(uuid, process_uuid, &event, Unit::UNIT_UNSPECIFIED); + (uuid, 0) + }); + + *running_total += increment; + self.add_counter_value(*track_uuid, timestamp_ns, *running_total); + } + } + + pub fn add_query_metrics(&mut self, rows: &[QueryMetricRow]) { + if rows.is_empty() { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Query Metrics"); + + // metric_name → track_uuid + let mut counter_tracks: HashMap = HashMap::new(); + + for row in rows { + // memory_usage / peak_memory_usage + for (name, value, unit) in [ + ("memory_usage", row.memory_usage, Unit::UNIT_SIZE_BYTES), + ( + "peak_memory_usage", + row.peak_memory_usage, + Unit::UNIT_SIZE_BYTES, + ), + ] { + let track_uuid = *counter_tracks.entry(name.to_string()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_counter_track(uuid, process_uuid, name, unit); + uuid + }); + self.add_counter_value(track_uuid, row.timestamp_ns, value); + } + + // ProfileEvent_* metrics + for (name, value) in &row.profile_events { + let track_uuid = *counter_tracks.entry(name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_counter_track(uuid, process_uuid, name, Unit::UNIT_UNSPECIFIED); + uuid + }); + self.add_counter_value(track_uuid, row.timestamp_ns, *value as i64); + } + } + } + + pub fn add_part_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Part Log"); + + // "db.table" → thread_uuid + let mut table_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let event_type: String = columns.get(i, "event_type").unwrap_or_default(); + let event_time: DateTime = match columns.get(i, "event_time_microseconds") { + Ok(v) => v, + Err(e) => { + log::warn!( + "Perfetto: part_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + let duration_ms: u64 = columns.get(i, "duration_ms").unwrap_or(0); + let database: String = columns.get(i, "database").unwrap_or_default(); + let table: String = columns.get(i, "table").unwrap_or_default(); + let part_name: String = columns.get(i, "part_name").unwrap_or_default(); + let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let rows: u64 = columns.get(i, "rows").unwrap_or(0); + let size_in_bytes: u64 = columns.get(i, "size_in_bytes").unwrap_or(0); + + let table_key = format!("{}.{}", database, table); + let track_uuid = *table_uuids.entry(table_key.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &table_key); + uuid + }); + + let start_ns = match event_time.with_timezone(&Local).timestamp_nanos_opt() { + Some(ns) => ns as u64, + None => { + log::warn!("Perfetto: part_log row {} timestamp overflow", i); + continue; + } + }; + let end_ns = start_ns + duration_ms * 1_000_000; + + let label = format!("{} {}", event_type, part_name); + let annotations = vec![ + Self::make_annotation_str("query_id", &query_id), + Self::make_annotation_str("part_name", &part_name), + Self::make_annotation_int("rows", rows as i64), + Self::make_annotation_int("size_in_bytes", size_in_bytes as i64), + ]; + + self.add_slice_begin(track_uuid, &label, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_query_thread_log(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Query Threads"); + + // thread_name → track_uuid + let mut thread_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let query_id: String = columns.get(i, "query_id").unwrap_or_default(); + let thread_name: String = columns.get(i, "thread_name").unwrap_or_default(); + let timestamp_ns: u64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: query_thread_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + let duration_ms: u64 = columns.get(i, "query_duration_ms").unwrap_or(0); + let peak_memory: i64 = columns.get(i, "peak_memory_usage").unwrap_or(0); + + let names: Vec = columns.get(i, "ProfileEvents.Names").unwrap_or_default(); + let values: Vec = columns.get(i, "ProfileEvents.Values").unwrap_or_default(); + + let track_uuid = *thread_uuids.entry(thread_name.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &thread_name); + uuid + }); + + let end_ns = timestamp_ns; + let start_ns = end_ns.saturating_sub(duration_ms * 1_000_000); + + let mut annotations = vec![ + Self::make_annotation_str("query_id", &query_id), + Self::make_annotation_str("thread_name", &thread_name), + Self::make_annotation_int("peak_memory_usage", peak_memory), + ]; + + // Add top ProfileEvents as annotations + let mut pe: Vec<(String, u64)> = names.into_iter().zip(values).collect(); + pe.sort_by(|a, b| b.1.cmp(&a.1)); + for (name, value) in pe.iter().take(10) { + if *value > 0 { + annotations.push(Self::make_annotation_int(name, *value as i64)); + } + } + + self.add_slice_begin(track_uuid, &query_id, start_ns, annotations); + self.add_slice_end(track_uuid, end_ns); + } + } + + pub fn add_text_logs(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + let process_uuid = self.alloc_uuid(); + self.add_process_track(process_uuid, "Query Logs"); + + // level → track_uuid + let mut level_uuids: HashMap = HashMap::new(); + + for i in 0..columns.row_count() { + let level: String = columns.get(i, "level").unwrap_or_default(); + let logger_name: String = columns.get(i, "logger_name").unwrap_or_default(); + let message: String = columns.get(i, "message").unwrap_or_default(); + let timestamp_ns: u64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_nanos_opt().unwrap_or(0) as u64, + Err(e) => { + log::warn!( + "Perfetto: text_log row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + + let track_uuid = *level_uuids.entry(level.clone()).or_insert_with(|| { + let uuid = self.alloc_uuid(); + self.add_child_track(uuid, process_uuid, &level); + uuid + }); + + let label = if message.len() > 80 { + format!("{}...", &message[..80]) + } else { + message.clone() + }; + + let annotations = vec![ + Self::make_annotation_str("level", &level), + Self::make_annotation_str("logger", &logger_name), + Self::make_annotation_str("message", &message), + ]; + + self.add_instant(track_uuid, &label, timestamp_ns, annotations); + } + } + + fn alloc_intern_id(&mut self) -> u64 { + let id = self.next_intern_id; + self.next_intern_id += 1; + id + } + + // Add CPU/Real/Memory stack trace samples as StreamingProfilePacket. + // + // Perfetto profiling timeline pitfalls (hard-won lessons): + // - Clock 128 is sequence-scoped: a ClockSnapshot on seq 1 does NOT help seq 2+. + // - Built-in clocks (e.g. BOOTTIME=6) also fail on non-main sequences in practice. + // - SEQ_INCREMENTAL_STATE_CLEARED nukes clock mappings on the sequence — never + // use it on the main sequence after the ClockSnapshot. + // - StreamingProfilePacket timestamps come from ThreadDescriptor.reference_timestamp_us + // + timestamp_delta_us, NOT from TracePacket.timestamp. If reference_timestamp_us + // is unset, all samples land at time 0. + // - Samples go into cpu_profile_stack_sample table, not perf_sample. + // + // The working approach: each trace type gets its own sequence with a ThreadDescriptor + // that carries reference_timestamp_us (microseconds). No clock_id needed on the + // packets — timing is entirely from reference_timestamp_us + deltas. + pub fn add_stack_traces(&mut self, columns: &Columns) { + if columns.row_count() == 0 { + return; + } + + // Collect samples grouped by trace_type + struct Sample { + callstack_iid: u64, + timestamp_us: i64, + } + let mut samples_by_type: HashMap> = HashMap::new(); + + // Interning accumulators for this batch + let mut interned_strings: Vec = Vec::new(); + let mut interned_frames: Vec = Vec::new(); + let mut interned_callstacks: Vec = Vec::new(); + + let mapping_iid = self.alloc_intern_id(); + + for i in 0..columns.row_count() { + let trace_type: String = columns.get(i, "trace_type").unwrap_or_default(); + let stack: Vec = columns.get(i, "stack").unwrap_or_default(); + + if stack.is_empty() { + continue; + } + + let timestamp_us: i64 = + match columns.get::, _>(i, "event_time_microseconds") { + Ok(dt) => dt.with_timezone(&Local).timestamp_micros(), + Err(e) => { + log::warn!( + "Perfetto: stack trace row {} event_time_microseconds: {}", + i, + e + ); + continue; + } + }; + + // Intern each frame in the stack + let mut frame_ids = Vec::with_capacity(stack.len()); + for func_name in &stack { + let func_iid = *self + .function_name_iids + .entry(func_name.clone()) + .or_insert_with(|| { + let iid = self.next_intern_id; + self.next_intern_id += 1; + let mut is = InternedString::new(); + is.iid = Some(iid); + is.str = Some(func_name.as_bytes().to_vec()); + interned_strings.push(is); + iid + }); + + let frame_key = (func_iid, mapping_iid); + let frame_iid = *self.frame_iids.entry(frame_key).or_insert_with(|| { + let iid = self.next_intern_id; + self.next_intern_id += 1; + let mut f = Frame::new(); + f.iid = Some(iid); + f.function_name_id = Some(func_iid); + f.mapping_id = Some(mapping_iid); + interned_frames.push(f); + iid + }); + + frame_ids.push(frame_iid); + } + + let callstack_iid = + *self + .callstack_iids + .entry(frame_ids.clone()) + .or_insert_with(|| { + let iid = self.next_intern_id; + self.next_intern_id += 1; + let mut cs = Callstack::new(); + cs.iid = Some(iid); + cs.frame_ids = frame_ids; + interned_callstacks.push(cs); + iid + }); + + samples_by_type.entry(trace_type).or_default().push(Sample { + callstack_iid, + timestamp_us, + }); + } + + // Build one dummy mapping + let mut mapping = Mapping::new(); + mapping.iid = Some(mapping_iid); + + // Each trace_type gets its own sequence with a dedicated ThreadDescriptor. + // Sample timestamps come from ThreadDescriptor.reference_timestamp_us + deltas, + // so profiling packets don't need clock_id/timestamp (avoids sequence-scoped + // clock 128 resolution issues on non-main sequences). + for (trace_type, samples) in &samples_by_type { + if samples.is_empty() { + continue; + } + + let seq_id = self.next_sequence_id; + self.next_sequence_id += 1; + let fake_tid = seq_id as i32; + + let mut td = PerfettoThreadDescriptor::new(); + td.pid = Some(1); + td.tid = Some(fake_tid); + td.thread_name = Some(format!("{} Samples", trace_type)); + td.reference_timestamp_us = Some(samples[0].timestamp_us); + + let mut desc_pkt = TracePacket::new(); + desc_pkt.set_trusted_packet_sequence_id(seq_id); + desc_pkt.sequence_flags = Some(1 | 2); + desc_pkt.trusted_pid = Some(1); + desc_pkt.data = Some(Data::ThreadDescriptor(td)); + self.packets.push(desc_pkt); + + let mut callstack_iids = Vec::with_capacity(samples.len()); + let mut timestamp_deltas = Vec::with_capacity(samples.len()); + + let mut prev_us = samples[0].timestamp_us; + for (idx, s) in samples.iter().enumerate() { + callstack_iids.push(s.callstack_iid); + if idx == 0 { + timestamp_deltas.push(0); + } else { + timestamp_deltas.push(s.timestamp_us - prev_us); + prev_us = s.timestamp_us; + } + } + + let mut spp = StreamingProfilePacket::new(); + spp.callstack_iid = callstack_iids; + spp.timestamp_delta_us = timestamp_deltas; + + let mut interned_data = InternedData::new(); + interned_data.function_names = interned_strings.clone(); + interned_data.frames = interned_frames.clone(); + interned_data.callstacks = interned_callstacks.clone(); + interned_data.mappings = vec![mapping.clone()]; + + let mut pkt = TracePacket::new(); + pkt.set_trusted_packet_sequence_id(seq_id); + pkt.sequence_flags = Some(2); + pkt.trusted_pid = Some(1); + pkt.interned_data = MessageField::some(interned_data); + pkt.data = Some(Data::StreamingProfilePacket(spp)); + self.packets.push(pkt); + } + } + + /// Build a ClockSnapshot mapping sequence-scoped clock 128 → BOOTTIME. + /// Both at timestamp 0 with 1ns multiplier, so raw nanosecond values pass through as-is. + fn make_clock_snapshot() -> ClockSnapshot { + let mut cs = ClockSnapshot::new(); + let mut unixtime_clock = Clock::new(); + unixtime_clock.clock_id = Some(CLOCK_ID_UNIXTIME); + unixtime_clock.timestamp = Some(0); + unixtime_clock.unit_multiplier_ns = Some(1); + unixtime_clock.is_incremental = Some(false); + let mut boottime_clock = Clock::new(); + boottime_clock.clock_id = Some(6); // BUILTIN_CLOCK_BOOTTIME + boottime_clock.timestamp = Some(0); + boottime_clock.unit_multiplier_ns = Some(1); + boottime_clock.is_incremental = Some(false); + cs.clocks = vec![unixtime_clock, boottime_clock]; + cs + } + + pub fn build(self) -> Vec { + // ClockSnapshot with timestamp=0 in its own clock (self-referencing). + // The trace processor resolves this specially for ClockSnapshot packets, + // placing it at the very start of the trace (time 0). + let cs = Self::make_clock_snapshot(); + let mut cs_pkt = TracePacket::new(); + cs_pkt.set_trusted_packet_sequence_id(SEQUENCE_ID); + cs_pkt.sequence_flags = Some(1 | 2); + cs_pkt.timestamp = Some(0); + cs_pkt.timestamp_clock_id = Some(CLOCK_ID_UNIXTIME); + cs_pkt.data = Some(Data::ClockSnapshot(cs)); + + let mut trace = Trace::new(); + trace.packet = Vec::with_capacity(self.packets.len() + 1); + trace.packet.push(cs_pkt); + trace.packet.extend(self.packets); + trace.write_to_bytes().unwrap_or_default() + } +} + +pub struct PerfettoServer { + trace_data: Arc>>>>, + #[allow(dead_code)] + server_thread: Option>, +} + +impl PerfettoServer { + pub fn new() -> Self { + let trace_data: Arc>>>> = Arc::new(Mutex::new(None)); + let trace_data_clone = trace_data.clone(); + + let server_thread = std::thread::spawn(move || { + let server = match tiny_http::Server::http("127.0.0.1:9001") { + Ok(s) => s, + Err(e) => { + log::error!("Failed to start Perfetto HTTP server on port 9001: {}", e); + return; + } + }; + log::info!("Perfetto HTTP server listening on port 9001"); + + for request in server.incoming_requests() { + let url = request.url().to_string(); + log::trace!("Perfetto HTTP request: {} {}", request.method(), url); + + if request.method() == &tiny_http::Method::Options { + let response = tiny_http::Response::empty(200) + .with_header( + "Access-Control-Allow-Origin: *" + .parse::() + .unwrap(), + ) + .with_header( + "Access-Control-Allow-Methods: GET, POST, OPTIONS" + .parse::() + .unwrap(), + ) + .with_header( + "Access-Control-Allow-Headers: *" + .parse::() + .unwrap(), + ); + request.respond(response).ok(); + continue; + } + + if url == "/trace" { + let data: Option>> = trace_data_clone.lock().unwrap().clone(); + match data { + Some(bytes) => { + let response = tiny_http::Response::from_data((*bytes).clone()) + .with_header( + "Content-Type: application/octet-stream" + .parse::() + .unwrap(), + ) + .with_header( + "Access-Control-Allow-Origin: *" + .parse::() + .unwrap(), + ); + request.respond(response).ok(); + } + None => { + let response = + tiny_http::Response::from_string("No trace data available") + .with_status_code(404) + .with_header( + "Access-Control-Allow-Origin: *" + .parse::() + .unwrap(), + ); + request.respond(response).ok(); + } + } + } else { + let response = tiny_http::Response::from_string("Not Found") + .with_status_code(404) + .with_header( + "Access-Control-Allow-Origin: *" + .parse::() + .unwrap(), + ); + request.respond(response).ok(); + } + } + }); + + PerfettoServer { + trace_data, + server_thread: Some(server_thread), + } + } + + pub fn set_trace(&self, data: Vec) { + *self.trace_data.lock().unwrap() = Some(Arc::new(data)); + } + + pub fn get_perfetto_url(&self) -> String { + "https://ui.perfetto.dev/#!/?url=http://127.0.0.1:9001/trace".to_string() + } +} diff --git a/src/interpreter/worker.rs b/src/interpreter/worker.rs index 6859b7a..2eef93b 100644 --- a/src/interpreter/worker.rs +++ b/src/interpreter/worker.rs @@ -1,9 +1,10 @@ use crate::{ common::{RelativeDateTime, Stopwatch}, interpreter::{ - ContextArc, + ContextArc, Query, clickhouse::{Columns, TextLogArguments, TraceType}, flamegraph, + perfetto::PerfettoTraceBuilder, }, pastila, utils::{highlight_sql, share_graph}, @@ -78,6 +79,13 @@ pub enum Event { AsynchronousInserts(String, String), // (content to share via pastila) ShareLogs(String), + // (queries, query_ids, start, end) + PerfettoExport( + Vec, + Vec, + DateTime, + Option>, + ), } impl Event { @@ -105,6 +113,7 @@ impl Event { Event::TableParts(..) => "TableParts".to_string(), Event::AsynchronousInserts(..) => "AsynchronousInserts".to_string(), Event::ShareLogs(..) => "ShareLogs".to_string(), + Event::PerfettoExport(..) => "PerfettoExport".to_string(), } } } @@ -763,6 +772,98 @@ async fn process_event(context: ContextArc, event: Event, need_clear: &mut bool) crate::utils::open_url_command(&url_clone).status()?; } + Event::PerfettoExport(queries, query_ids, start, end) => { + let mut builder = PerfettoTraceBuilder::new(); + + for q in &queries { + log::info!( + "Perfetto query: id={} start_ns={} end_ns={} elapsed={}", + q.query_id, + q.query_start_time_microseconds + .timestamp_nanos_opt() + .unwrap_or(0), + q.query_end_time_microseconds + .timestamp_nanos_opt() + .unwrap_or(0), + q.elapsed, + ); + } + builder.add_queries(&queries); + + let end_time = end.unwrap_or_else(Local::now) + chrono::TimeDelta::seconds(1); + + let (otel, trace_log, metrics, parts, threads, stack_traces, text_logs) = tokio::join!( + clickhouse.get_otel_spans_for_perfetto(&query_ids, start, end_time), + clickhouse.get_trace_log_counters_for_perfetto(&query_ids, start, end_time), + clickhouse.get_query_metrics_for_perfetto(&query_ids, start, end_time), + clickhouse.get_part_log_for_perfetto(&query_ids, start, end_time), + clickhouse.get_query_thread_log_for_perfetto(&query_ids, start, end_time), + clickhouse.get_stack_traces_for_perfetto(&query_ids, start, end_time), + clickhouse.get_text_log_for_perfetto(&query_ids, start, end_time), + ); + + match otel { + Ok(block) => builder.add_otel_spans(&block), + Err(e) => log::warn!("Failed to fetch opentelemetry_span_log: {}", e), + } + match trace_log { + Ok(block) => builder.add_trace_log_counters(&block), + Err(e) => log::warn!("Failed to fetch trace_log counters: {}", e), + } + match metrics { + Ok(rows) => builder.add_query_metrics(&rows), + Err(e) => log::warn!("Failed to fetch query_metric_log: {}", e), + } + match parts { + Ok(block) => builder.add_part_log(&block), + Err(e) => log::warn!("Failed to fetch part_log: {}", e), + } + match threads { + Ok(block) => builder.add_query_thread_log(&block), + Err(e) => log::warn!("Failed to fetch query_thread_log: {}", e), + } + match stack_traces { + Ok(block) => builder.add_stack_traces(&block), + Err(e) => log::warn!("Failed to fetch trace_log stack traces: {}", e), + } + match text_logs { + Ok(block) => builder.add_text_logs(&block), + Err(e) => log::warn!("Failed to fetch text_log: {}", e), + } + + let data = builder.build(); + let data_len = data.len(); + if let Err(e) = std::fs::write("/tmp/chdig_perfetto.pftrace", &data) { + log::warn!("Failed to save debug trace: {}", e); + } else { + log::info!( + "Saved debug trace to /tmp/chdig_perfetto.pftrace ({} bytes)", + data_len + ); + } + + let server = context.lock().unwrap().get_or_start_perfetto_server(); + server.set_trace(data); + let url = server.get_perfetto_url(); + + let url_clone = url.clone(); + cb_sink + .send(Box::new(move |siv: &mut cursive::Cursive| { + siv.add_layer( + views::Dialog::text(format!( + "Perfetto trace exported ({} bytes)\n\nOpening: {}", + data_len, url + )) + .title("Perfetto Export") + .button("Close", |siv| { + siv.pop_layer(); + }), + ); + })) + .map_err(|_| anyhow!("Cannot send message to UI"))?; + + crate::utils::open_url_command(&url_clone).status()?; + } } return Ok(()); diff --git a/src/view/queries_view.rs b/src/view/queries_view.rs index 66c5fda..65339c8 100644 --- a/src/view/queries_view.rs +++ b/src/view/queries_view.rs @@ -706,6 +706,31 @@ impl QueriesView { Ok(Some(EventResult::consumed())) } + fn action_export_perfetto(&mut self) -> Result> { + let (query_ids, min_query_start_microseconds, max_query_end_microseconds) = + self.get_query_ids()?; + + let query_ids_set: HashSet<&String> = HashSet::from_iter(query_ids.iter()); + let queries: Vec<_> = self + .items + .values() + .filter(|q| query_ids_set.contains(&q.query_id)) + .cloned() + .collect(); + + let mut context_locked = self.context.lock().unwrap(); + context_locked.worker.send( + true, + WorkerEvent::PerfettoExport( + queries, + query_ids, + min_query_start_microseconds, + max_query_end_microseconds, + ), + ); + Ok(Some(EventResult::consumed())) + } + fn action_increase_limit(&mut self) -> Result> { self.update_limit(true); self.bg_runner.schedule(); @@ -1060,6 +1085,7 @@ impl QueriesView { add_action!(context, &mut event_view, "Query jemalloc sample flamegraph", action_show_flamegraph(true, Some(TraceType::JemallocSample))); add_action!(context, &mut event_view, "Query MemoryAllocatedWithoutCheck flamegraph", action_show_flamegraph(true, Some(TraceType::MemoryAllocatedWithoutCheck))); add_action!(context, &mut event_view, "Query events flamegraph", action_show_flamegraph(true, Some(TraceType::ProfileEvent))); + add_action!(context, &mut event_view, "Export to Perfetto", 'X', action_export_perfetto); add_action!(context, &mut event_view, "Edit query and execute", Event::AltChar('E'), action_edit_query_and_execute); add_action!(context, &mut event_view, "Show query", 'S', action_show_query); add_action!(context, &mut event_view, "Copy query to clipboard", 'y', action_copy_query);