diff --git a/src/interpreter/context.rs b/src/interpreter/context.rs index 396ef5e..e26a6f6 100644 --- a/src/interpreter/context.rs +++ b/src/interpreter/context.rs @@ -1,6 +1,7 @@ use crate::actions::ActionDescription; use crate::interpreter::{ ClickHouse, Worker, + debug_metrics::DebugMetrics, options::{ChDigOptions, ChDigViews}, perfetto::PerfettoServer, }; @@ -52,6 +53,8 @@ pub struct Context { pub queries_filter: Arc>, pub queries_limit: Arc>, + + pub debug_metrics: Arc, } impl Context { @@ -61,6 +64,7 @@ impl Context { cb_sink: cursive::CbSink, ) -> Result { let server_version = clickhouse.version(); + let debug_metrics = DebugMetrics::new(); let worker = Worker::new(); let background_runner_cv = Arc::new((Mutex::new(()), Condvar::new())); let background_runner_force = Arc::new(atomic::AtomicBool::new(false)); @@ -71,6 +75,10 @@ impl Context { let queries_filter = Arc::new(Mutex::new(String::new())); let queries_limit = Arc::new(Mutex::new(options.view.queries_limit)); + // Metrics are always collected; display is toggled with `!`. The refresh thread + // sleeps when hidden, so this is free when unused. + debug_metrics.spawn_refresh(cb_sink.clone(), std::time::Duration::from_millis(500)); + let context = Arc::new(Mutex::new(Context { options, clickhouse, @@ -91,6 +99,7 @@ impl Context { perfetto_server: None, queries_filter, queries_limit, + debug_metrics, })); context.lock().unwrap().worker.start(context.clone()); diff --git a/src/interpreter/debug_metrics.rs b/src/interpreter/debug_metrics.rs new file mode 100644 index 0000000..4184bb1 --- /dev/null +++ b/src/interpreter/debug_metrics.rs @@ -0,0 +1,293 @@ +//! Internal chdig observability counters, rendered into the status bar when toggled with `!`. +//! +//! Metrics are recorded unconditionally — the cost is two atomic ops per worker event plus a +//! lock-and-push on a ~256-entry ring buffer. Display is gated on a toggle flag: when off +//! the refresh thread sleeps and does not ping the event loop, so there is no UI cost either. +//! +//! Picks: +//! - Nearest-rank percentile over a fixed-size [`Histogram`] (O(N log N) per snapshot, +//! N≤256). Simpler than an online estimator (t-digest, HDR histogram) and accurate enough +//! for a status bar at a few Hz. +//! - Event-loop latency is measured as a `cb_sink` round-trip, not frame render time. +//! Cursive does not expose per-frame hooks; round-trip drift is the quantity the user +//! actually perceives as "responsiveness". Tracked as a histogram (not a single latest +//! value) so transient spikes don't get hidden behind whatever the most recent ping saw. +//! - [`InFlightGuard`] is an RAII guard so early returns and panics in the worker cannot +//! leak the counter. + +use std::collections::VecDeque; +use std::fmt; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Duration, Instant}; + +use cursive::CbSink; + +const SAMPLES_CAPACITY: usize = 256; + +/// Fixed-capacity ring-buffer histogram over `Duration` samples. Thread-safe via an +/// internal `Mutex` — contention is negligible at the rates we record (≤ a few Hz). +pub struct Histogram { + samples: Mutex>, +} + +impl Histogram { + fn new() -> Self { + Histogram { + samples: Mutex::new(VecDeque::with_capacity(SAMPLES_CAPACITY)), + } + } + + pub fn record(&self, d: Duration) { + let mut s = self.samples.lock().unwrap(); + if s.len() == SAMPLES_CAPACITY { + s.pop_front(); + } + s.push_back(d); + } + + /// Nearest-rank (p50, p90, p99). Returns zeros on an empty histogram. + pub fn percentiles(&self) -> (Duration, Duration, Duration) { + let s = self.samples.lock().unwrap(); + if s.is_empty() { + return (Duration::ZERO, Duration::ZERO, Duration::ZERO); + } + let mut v: Vec = s.iter().copied().collect(); + v.sort_unstable(); + (percentile(&v, 50), percentile(&v, 90), percentile(&v, 99)) + } +} + +pub struct DebugMetrics { + shown: AtomicBool, + in_flight: AtomicU64, + /// `cb_sink` round-trip latency — proxy for "how responsive does chdig feel". + ui_lag: Histogram, + /// Per-worker-event processing duration (a worker event is one ClickHouse query / + /// action chdig issued). + event: Histogram, +} + +#[must_use = "Drop decrements the in-flight counter; hold this for the duration of work"] +pub struct InFlightGuard(Arc); + +impl Drop for InFlightGuard { + fn drop(&mut self) { + self.0.in_flight.fetch_sub(1, Ordering::Relaxed); + } +} + +impl DebugMetrics { + pub fn new() -> Arc { + Arc::new(DebugMetrics { + shown: AtomicBool::new(false), + in_flight: AtomicU64::new(0), + ui_lag: Histogram::new(), + event: Histogram::new(), + }) + } + + pub fn is_shown(&self) -> bool { + self.shown.load(Ordering::Relaxed) + } + + /// Flips visibility and returns the new state. + pub fn toggle_shown(&self) -> bool { + !self.shown.fetch_xor(true, Ordering::Relaxed) + } + + pub fn track_in_flight(self: &Arc) -> InFlightGuard { + self.in_flight.fetch_add(1, Ordering::Relaxed); + InFlightGuard(Arc::clone(self)) + } + + pub fn record_event(&self, d: Duration) { + self.event.record(d); + } + + pub fn record_ui_lag(&self, d: Duration) { + self.ui_lag.record(d); + } + + pub fn snapshot(&self) -> MetricsSnapshot { + let (lag_p50, lag_p90, lag_p99) = self.ui_lag.percentiles(); + let (evt_p50, evt_p90, evt_p99) = self.event.percentiles(); + MetricsSnapshot { + in_flight: self.in_flight.load(Ordering::Relaxed), + lag_p50, + lag_p90, + lag_p99, + evt_p50, + evt_p90, + evt_p99, + } + } + + /// Spawn a background thread that, *while visibility is on*, probes event-loop lag + /// via a `cb_sink` round-trip and pushes the latest snapshot into the status bar. + /// When visibility is off the thread sleeps, so the hidden cost is just a dormant + /// thread (no cb_sink traffic, no redraws). Exits when the sink is closed. + pub fn spawn_refresh(self: &Arc, cb_sink: CbSink, interval: Duration) { + let metrics = Arc::clone(self); + thread::Builder::new() + .name("chdig-debug-metrics".into()) + .spawn(move || refresh_loop(metrics, cb_sink, interval)) + .expect("spawn chdig-debug-metrics"); + } +} + +fn refresh_loop(metrics: Arc, cb_sink: CbSink, interval: Duration) { + loop { + thread::sleep(interval); + if !metrics.is_shown() { + continue; + } + let sent_at = Instant::now(); + let metrics = Arc::clone(&metrics); + let send_result = cb_sink.send(Box::new(move |siv: &mut cursive::Cursive| { + metrics.record_ui_lag(sent_at.elapsed()); + let text = metrics.snapshot().to_string(); + crate::view::Navigation::set_statusbar_debug(siv, text); + })); + if send_result.is_err() { + break; + } + } +} + +#[derive(Default, Clone, Copy)] +pub struct MetricsSnapshot { + pub in_flight: u64, + pub lag_p50: Duration, + pub lag_p90: Duration, + pub lag_p99: Duration, + pub evt_p50: Duration, + pub evt_p90: Duration, + pub evt_p99: Duration, +} + +impl fmt::Display for MetricsSnapshot { + /// Status-bar line; written to be readable without a legend: + /// * `UI lag` – cb_sink round-trip percentiles (event loop responsiveness) + /// * `Active` – worker events currently being processed + /// * `Event` – worker-event processing-time percentiles (one per ClickHouse query) + /// + /// All triples are `p50/p90/p99`, nearest-rank over the last [`SAMPLES_CAPACITY`] + /// samples of each kind. + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "UI lag p50/p90/p99: {}/{}/{} ms Active: {} Event p50/p90/p99: {}/{}/{} ms", + self.lag_p50.as_millis(), + self.lag_p90.as_millis(), + self.lag_p99.as_millis(), + self.in_flight, + self.evt_p50.as_millis(), + self.evt_p90.as_millis(), + self.evt_p99.as_millis(), + ) + } +} + +/// Nearest-rank percentile; q ∈ 0..=100. Undefined on an empty slice — callers must guard. +fn percentile(sorted: &[T], q: u32) -> T { + debug_assert!(q <= 100); + debug_assert!(!sorted.is_empty()); + let rank = (q as usize * sorted.len()).div_ceil(100).max(1); + sorted[rank - 1] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn percentile_integer_ranks() { + let v: Vec = (1..=10).collect(); + assert_eq!(percentile(&v, 50), 5); + assert_eq!(percentile(&v, 90), 9); + assert_eq!(percentile(&v, 99), 10); + assert_eq!(percentile(&v, 100), 10); + } + + #[test] + fn percentile_single_element() { + assert_eq!(percentile(&[42u64], 50), 42); + assert_eq!(percentile(&[42u64], 99), 42); + } + + #[test] + fn histogram_caps_at_capacity() { + let h = Histogram::new(); + // Feed monotonic samples well past capacity and assert that the p99 reflects + // only the most recent SAMPLES_CAPACITY values (earliest ones were evicted). + let total = SAMPLES_CAPACITY + 50; + for i in 0..total { + h.record(Duration::from_millis(i as u64)); + } + let (_p50, _p90, p99) = h.percentiles(); + // Oldest retained = total - SAMPLES_CAPACITY = 50; newest = total - 1 = 305. + // Nearest-rank p99: rank = ceil(99 * 256 / 100) = 254; value = 50 + (254-1) = 303. + assert_eq!(p99, Duration::from_millis(303)); + } + + #[test] + fn histogram_empty_returns_zero() { + let h = Histogram::new(); + assert_eq!( + h.percentiles(), + (Duration::ZERO, Duration::ZERO, Duration::ZERO) + ); + } + + #[test] + fn ui_lag_and_event_are_independent() { + let m = DebugMetrics::new(); + m.record_ui_lag(Duration::from_millis(5)); + m.record_event(Duration::from_millis(500)); + let s = m.snapshot(); + assert_eq!(s.lag_p50, Duration::from_millis(5)); + assert_eq!(s.evt_p50, Duration::from_millis(500)); + } + + #[test] + fn in_flight_guard_is_raii() { + let m = DebugMetrics::new(); + assert_eq!(m.snapshot().in_flight, 0); + let g1 = m.track_in_flight(); + let g2 = m.track_in_flight(); + assert_eq!(m.snapshot().in_flight, 2); + drop(g1); + assert_eq!(m.snapshot().in_flight, 1); + drop(g2); + assert_eq!(m.snapshot().in_flight, 0); + } + + #[test] + fn toggle_shown_returns_new_state() { + let m = DebugMetrics::new(); + assert!(!m.is_shown()); + assert!(m.toggle_shown()); + assert!(m.is_shown()); + assert!(!m.toggle_shown()); + assert!(!m.is_shown()); + } + + #[test] + fn display_format_is_readable() { + let s = MetricsSnapshot { + in_flight: 3, + lag_p50: Duration::from_millis(1), + lag_p90: Duration::from_millis(4), + lag_p99: Duration::from_millis(12), + evt_p50: Duration::from_millis(12), + evt_p90: Duration::from_millis(87), + evt_p99: Duration::from_millis(420), + }; + let rendered = s.to_string(); + assert!(rendered.contains("UI lag p50/p90/p99: 1/4/12 ms")); + assert!(rendered.contains("Active: 3")); + assert!(rendered.contains("Event p50/p90/p99: 12/87/420 ms")); + } +} diff --git a/src/interpreter/mod.rs b/src/interpreter/mod.rs index 760b440..175037a 100644 --- a/src/interpreter/mod.rs +++ b/src/interpreter/mod.rs @@ -3,6 +3,7 @@ mod background_runner; pub mod clickhouse; mod clickhouse_quirks; mod context; +pub mod debug_metrics; mod query; mod worker; // only functions diff --git a/src/interpreter/worker.rs b/src/interpreter/worker.rs index e44465a..365bbcd 100644 --- a/src/interpreter/worker.rs +++ b/src/interpreter/worker.rs @@ -202,7 +202,7 @@ impl Worker { channel_created ); - // Simply ignore errors (queue is full, likely update interval is too short) + // Simply ignore errors (queue is full, likely update interval is too short). sender.try_send(event.clone()).unwrap_or_else(|e| { log::error!( "Cannot send event {:?}: {} (too low --delay-interval?)", @@ -247,6 +247,9 @@ async fn start_tokio(context: ContextArc, receiver: ReceiverArc) { update_status(&format!("Processing {}...", event.enum_key())); + let debug_metrics = context.lock().unwrap().debug_metrics.clone(); + // RAII: decrements on scope exit, including panic or early return paths. + let _in_flight = debug_metrics.track_in_flight(); let stopwatch = Stopwatch::start_new(); if let Err(err) = process_event(context.clone(), event.clone(), &mut need_clear).await { cb_sink @@ -290,9 +293,13 @@ async fn start_tokio(context: ContextArc, receiver: ReceiverArc) { // Ignore errors on exit .unwrap_or_default(); } - let elapsed_ms = stopwatch.elapsed_ms(); - let mut completion_status = - format!("Processing {} took {} ms.", event.enum_key(), elapsed_ms); + let elapsed = stopwatch.elapsed(); + debug_metrics.record_event(elapsed); + let mut completion_status = format!( + "Processing {} took {} ms.", + event.enum_key(), + elapsed.as_millis() + ); // It should not be reset, since delay_interval should be set to the maximum service // query duration time. diff --git a/src/view/navigation.rs b/src/view/navigation.rs index eba5f78..587924a 100644 --- a/src/view/navigation.rs +++ b/src/view/navigation.rs @@ -19,6 +19,19 @@ use cursive::{ }; use cursive_flexi_logger_view::toggle_flexi_logger_debug_console; +fn toggle_debug_metrics(siv: &mut Cursive) { + let ctx = siv.user_data::().unwrap().clone(); + let metrics = ctx.lock().unwrap().debug_metrics.clone(); + let shown = metrics.toggle_shown(); + // Paint immediately on both transitions so the user sees the toggle take effect + // without waiting for the next refresh tick (and so stale numbers don't linger on hide). + if shown { + siv.set_statusbar_debug(metrics.snapshot().to_string()); + } else { + siv.set_statusbar_debug(""); + } +} + fn make_menu_text() -> StyledString { let mut text = StyledString::new(); @@ -68,6 +81,7 @@ pub trait Navigation { fn set_statusbar_version(&mut self, main_content: impl Into>); fn set_statusbar_content(&mut self, content: impl Into>); fn set_statusbar_connection(&mut self, content: impl Into>); + fn set_statusbar_debug(&mut self, content: impl Into>); // TODO: move into separate trait fn call_on_name_or_render_error(&mut self, name: &str, callback: F) @@ -224,6 +238,8 @@ impl Navigation for Cursive { .child(TextView::new("").with_name("is_paused")) // Align status to the right .child(DummyView.full_width()) + // Empty until `!` toggles it — no visual cost when hidden. + .child(TextView::new("").with_name("debug_status")) .child(TextView::new("").with_name("status")) .child(DummyView.fixed_width(1)) .child(TextView::new("").with_name("connection")) @@ -300,6 +316,7 @@ impl Navigation for Cursive { toggle_flexi_logger_debug_console, ); } + context.add_global_action(self, "Toggle debug metrics", '!', toggle_debug_metrics); context.add_global_action(self, "Back/Quit", Key::Esc, |siv| siv.pop_ui(false)); context.add_global_action(self, "Back/Quit", 'q', |siv| siv.pop_ui(true)); context.add_global_action(self, "Quit forcefully", 'Q', |siv| siv.quit()); @@ -1268,6 +1285,23 @@ impl Navigation for Cursive { .expect("connection"); } + fn set_statusbar_debug(&mut self, content: impl Into>) { + self.call_on_name("debug_status", |text_view: &mut TextView| { + let spanned: SpannedString