diff --git a/Cargo.lock b/Cargo.lock index 4a327da..9d78fda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4559,6 +4559,7 @@ dependencies = [ "serde", "serde_json", "silver_common_macros", + "tracing", ] [[package]] diff --git a/crates/bin/Cargo.toml b/crates/bin/Cargo.toml index 65e9307..195053c 100644 --- a/crates/bin/Cargo.toml +++ b/crates/bin/Cargo.toml @@ -31,3 +31,7 @@ workspace = true [features] alloc-profile = ["silver_common/alloc-profile"] +# #[perf] collection: per-function instructions-retired + CPU cycles on +# perf-{fn} queues (dev; needs `sudo sysctl kernel.perf_event_paranoid=2`). +# Surfer picks the queues up when present; non-perf builds are unaffected. +perf = ["silver_common/perf"] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index b5bbc38..83e08d7 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -38,6 +38,8 @@ backtrace = { workspace = true, optional = true } [features] alloc-profile = ["dep:backtrace"] +# Forwards #[perf] collection (rdpmc instr+cycle counting) to silver_metrics. +perf = ["silver_metrics/perf"] test-util = [] [build-dependencies] diff --git a/crates/common/src/spine/tcache/consumer.rs b/crates/common/src/spine/tcache/consumer.rs index bcc483f..caf10f7 100644 --- a/crates/common/src/spine/tcache/consumer.rs +++ b/crates/common/src/spine/tcache/consumer.rs @@ -260,6 +260,11 @@ pub(super) struct Buckets { bucket_mask: u64, // max difference between head and tail, before 'forced' eviction lag_threshold: u64, + // Out-of-order acquire lookback: the tail never advances within this + // many seqs of the newest acquire's bucket, so late acquires up to + // this far behind still land at or above the tail. 10% of capacity, + // rounded up to a bucket. + guard: u64, } impl Buckets { @@ -277,24 +282,35 @@ impl Buckets { bucket_shift: bucket_size.trailing_zeros() as u64, bucket_mask: !(bucket_size - 1), lag_threshold: lag_threshold(cache_capacity as u32), + guard: (cache_capacity / 10).next_multiple_of(bucket_size).max(bucket_size), } } fn acquire(&mut self, seq: u64) { + // Out-of-order acquire beyond the guard window: the producer may + // already be reclaiming this slot, and bucket_index aliases behind + // the tail onto in-window buckets — counting it would corrupt a + // live bucket (the matching release is dropped below tail). Skip; + // the read surfaces as StaleSeq at buffer() time. + if self.tail_seq != u64::MAX && seq < self.tail_seq { + tracing::warn!(seq, tail = self.tail_seq, head = self.head_seq, "acquire below tail"); + return; + } + let bucket_idx = self.bucket_index(seq); self.buckets[bucket_idx] += 1; self.head_seq = self.head_seq.max(seq); if self.tail_seq == u64::MAX { - self.tail_seq = self.bucket_start_seq(seq); + self.tail_seq = self.bucket_start_seq(seq).saturating_sub(self.guard); } - // rollup tail for completed buckets - maintain at least 1 bucket between - // head and tail to avoid any boundary cases where we might roll the tail before - // an entry is acquired. + // Rollup tail for completed buckets — keep `guard` seqs of slack + // below the newest acquire's bucket so bounded out-of-order + // acquires never land behind the tail. let head_bucket_seq = self.bucket_start_seq(seq); - while head_bucket_seq > (self.tail_seq + self.bucket_size) { + while head_bucket_seq > (self.tail_seq + self.guard) { let tail_bucket = self.bucket_index(self.tail_seq); if self.head_seq - self.tail_seq > self.lag_threshold { tracing::warn!( @@ -356,11 +372,58 @@ mod tests { fn buckets_acquire_initialises_tail_to_bucket_boundary() { let mut b = Buckets::new(64, 1024); b.acquire(100); - // 100 lives in bucket 1 (64..128). Tail is bucket-aligned to 64. - assert_eq!(b.tail_seq, 64); + // 100 lives in bucket 1 (64..128); tail starts `guard` (10% of + // 1024 → 128) below its bucket start, saturating at 0. + assert_eq!(b.tail_seq, 0); assert_eq!(b.head_seq, 100); } + /// A late acquire within the guard window (10% of capacity behind the + /// newest acquire) must land at or above the tail and be tracked. + #[test] + fn buckets_out_of_order_acquire_within_guard() { + // guard = (1024/10).next_multiple_of(64) = 128. + let mut b = Buckets::new(64, 1024); + b.acquire(0); + b.release(0, ""); + b.acquire(300); + // Tail rolled over the released bucket but held 128 back from + // bucket_start(300) = 256. + assert_eq!(b.tail_seq, 128); + // Late low acquire inside the window: tracked, not dropped. + b.acquire(200); + assert!(b.tail_seq <= 200); + b.release(200, ""); + b.release(300, ""); + } + + /// An acquire below the tail (out-of-order beyond the guard window) + /// must not be counted: bucket_index aliases behind-tail seqs onto + /// live buckets and the matching release is dropped, so counting + /// would permanently stall the tail on a phantom holder. + #[test] + fn buckets_acquire_below_tail_dropped_without_corruption() { + let mut b = Buckets::new(64, 1024); // guard 128, lag threshold 921 + b.acquire(0); + b.release(0, ""); + b.acquire(1000); // forces tail well past bucket 0 (tail = 832) + let tail = b.tail_seq; + assert!(tail >= 128); + + // Below-tail acquire: dropped (warn), tail untouched. + b.acquire(1); + assert_eq!(b.tail_seq, tail); + // Its release is the existing below-tail no-op. + b.release(1, ""); + + // seq 1 aliases onto the same ring bucket as seqs 1024..1088; had + // the acquire been counted, the tail would stall there forever + // (head - tail stays under the lag threshold, so no force-evict). + b.release(1000, ""); + b.acquire(1600); + assert!(b.tail_seq > 1024, "tail stalled at {} on a phantom holder", b.tail_seq); + } + #[test] fn buckets_release_advances_tail_when_head_is_ahead() { let mut b = Buckets::new(64, 1024); diff --git a/crates/common_macros/src/lib.rs b/crates/common_macros/src/lib.rs index 2829bfd..f120edd 100644 --- a/crates/common_macros/src/lib.rs +++ b/crates/common_macros/src/lib.rs @@ -1,13 +1,17 @@ -//! Proc-macro support for the silver crates. Currently exposes -//! `#[timed]`, which wraps a function body in a thread-local flux -//! `Timer` so processing time is emitted to a per-function shmem -//! queue. Storage and Drop-side recording live in -//! `silver_metrics`; this crate is only the attribute-macro -//! glue. +//! Proc-macro support for the silver crates. Exposes `#[timed]` (wraps a +//! function body in a thread-local flux `Timer` so processing time is +//! emitted to a per-function shmem queue) and `#[perf]` (same shape, but +//! records instructions retired + CPU cycles via rdpmc onto `perf-{fn}` +//! queues). Storage and Drop-side recording live in `silver_metrics`; +//! this crate is only the attribute-macro glue. use proc_macro::TokenStream; use quote::quote; -use syn::{ItemFn, LitStr, parse_macro_input}; +use syn::{ + Ident, ItemFn, LitInt, LitStr, Token, + parse::{Parse, ParseStream}, + parse_macro_input, +}; /// Wrap a function body in a per-function thread-local flux `Timer`. /// @@ -47,3 +51,100 @@ pub fn timed(attr: TokenStream, item: TokenStream) -> TokenStream { expanded.into() } + +/// Arguments to `#[perf]`: an optional name literal, then an optional +/// `sample = N`. +struct PerfArgs { + name: Option, + sample: u64, +} + +impl Parse for PerfArgs { + fn parse(input: ParseStream) -> syn::Result { + let mut name = None; + let mut sample = 1u64; + if input.peek(LitStr) { + name = Some(input.parse()?); + if input.peek(Token![,]) { + input.parse::()?; + } + } + if !input.is_empty() { + let ident: Ident = input.parse()?; + if ident != "sample" { + return Err(syn::Error::new(ident.span(), "expected `sample = N`")); + } + input.parse::()?; + let lit: LitInt = input.parse()?; + sample = lit.base10_parse()?; + if sample == 0 { + return Err(syn::Error::new(lit.span(), "sample must be >= 1")); + } + } + Ok(Self { name, sample }) + } +} + +/// Record instructions retired + CPU cycles for a function via a +/// Drop guard, emitted per call onto the `perf-{name}` shmem queue. +/// +/// Naming matches `#[timed]`: default is +/// `concat!(module_path!(), "::", fn_name)`; pass a string literal to +/// override (`#[perf("custom_name")]`). +/// +/// `sample = N` measures one in N calls (per thread, via a +/// thread-local call counter) — use on hot paths where two rdpmc +/// reads per call is too much. Default 1 = every call. Combinable: +/// `#[perf("custom_name", sample = 1000)]`. +/// +/// Collection requires the `perf` feature on `silver_metrics` (rdpmc) +/// and `kernel.perf_event_paranoid <= 2` at runtime; otherwise the +/// guard is inert. +#[proc_macro_attribute] +pub fn perf(attr: TokenStream, item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as ItemFn); + let args = parse_macro_input!(attr as PerfArgs); + + let name_expr = match &args.name { + Some(lit) => { + let s = lit.value(); + quote! { #s } + } + None => { + let func_name_str = input.sig.ident.to_string(); + quote! { concat!(module_path!(), "::", #func_name_str) } + } + }; + + let ItemFn { attrs, vis, sig, block } = input; + + let guard = if args.sample <= 1 { + quote! { + let __perf_guard = ::silver_metrics::PerfGuard::new( + #name_expr, + ); + } + } else { + let sample = args.sample; + quote! { + ::std::thread_local! { + static __PERF_SKIP: ::core::cell::Cell = + const { ::core::cell::Cell::new(0) }; + } + let __perf_guard = __PERF_SKIP.with(|c| { + let n = c.get(); + c.set(n.wrapping_add(1)); + (n % #sample == 0).then(|| ::silver_metrics::PerfGuard::new(#name_expr)) + }); + } + }; + + let expanded = quote! { + #(#attrs)* #vis #sig { + #guard + #block + } + }; + + expanded.into() +} diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index ceb7865..abd284c 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -11,6 +11,17 @@ flux.workspace = true libc.workspace = true serde.workspace = true serde_json.workspace = true +tracing = { workspace = true, optional = true } [lints] workspace = true + +[features] +# #[perf] collection: per-function instructions-retired + CPU cycles via +# rdpmc (hw_counter), emitted on perf-{fn} queues. Needs +# kernel.perf_event_paranoid <= 2 at runtime (<= 1 to include kernel-mode +# work in the counts): +# sudo sysctl kernel.perf_event_paranoid=1 +# Disabled with a warn otherwise. `PerfSample` and the #[perf] macro +# compile regardless — collection alone is gated. +perf = ["dep:tracing"] diff --git a/crates/metrics/src/hw_counter.rs b/crates/metrics/src/hw_counter.rs new file mode 100644 index 0000000..27d362f --- /dev/null +++ b/crates/metrics/src/hw_counter.rs @@ -0,0 +1,302 @@ +//! Hardware counters (instructions retired, CPU cycles) via +//! `perf_event_open` + userspace rdpmc. +//! +//! Reads cost ~30 cycles. Kernel-mode work is counted when permitted +//! (`perf_event_paranoid` <= 1 or `CAP_PERFMON`) so syscall-heavy +//! functions measure fully; otherwise falls back to userspace-only +//! counting, which works unprivileged at `perf_event_paranoid` <= 2. +//! +//! perf binds pid=0 events to the *opening* thread — construct on the +//! thread being measured, never a spawner. +//! +//! ABI structs are defined locally: libc has no `perf_event_mmap_page` and we +//! need only a prefix of `perf_event_attr`. + +#[cfg(all(target_os = "linux", target_arch = "x86_64"))] +mod imp { + use std::{ + io::{Error, ErrorKind}, + sync::{ + Once, + atomic::{Ordering, compiler_fence}, + }, + }; + + use tracing::{info, warn}; + + const PERF_TYPE_HARDWARE: u32 = 0; + const PERF_COUNT_HW_CPU_CYCLES: u64 = 0; + const PERF_COUNT_HW_INSTRUCTIONS: u64 = 1; + const PERF_COUNT_HW_CACHE_MISSES: u64 = 3; + const PERF_COUNT_HW_BRANCH_MISSES: u64 = 5; + const PERF_FLAG_FD_CLOEXEC: libc::c_ulong = 8; + // perf_event_attr flag bits. + const ATTR_EXCLUDE_KERNEL: u64 = 1 << 5; + const ATTR_EXCLUDE_HV: u64 = 1 << 6; + // perf_event_mmap_page.capabilities bits. + const CAP_USER_RDPMC: u64 = 1 << 2; + + /// Prefix of linux `perf_event_attr`, zero-padded to 128 bytes. + /// Kernel accepts any size with zeroed bytes beyond its known struct. + #[repr(C)] + struct PerfEventAttr { + type_: u32, + size: u32, + config: u64, + sample_period: u64, + sample_type: u64, + read_format: u64, + flags: u64, + _pad: [u64; 10], + } + + /// Prefix of linux `perf_event_mmap_page`. + #[repr(C)] + struct PerfEventMmapPage { + version: u32, + compat_version: u32, + lock: u32, + index: u32, + offset: i64, + time_enabled: u64, + time_running: u64, + capabilities: u64, + pmc_width: u16, + } + + #[derive(Clone, Copy, Debug)] + pub struct HwCounter { + page: *const PerfEventMmapPage, + fd: i32, + } + + // Used from exactly one tile thread; raw ptr blocks the auto impl. + unsafe impl Send for HwCounter {} + + impl HwCounter { + pub fn instructions() -> Option { + Self::new(PERF_COUNT_HW_INSTRUCTIONS) + } + + pub fn cycles() -> Option { + Self::new(PERF_COUNT_HW_CPU_CYCLES) + } + + /// Branch mispredictions. Unlike instructions/cycles (fixed PMU + /// counters) this takes a general-purpose counter — typically 4 + /// per hyperthread, one of which the NMI watchdog may hold. + pub fn branch_misses() -> Option { + Self::new(PERF_COUNT_HW_BRANCH_MISSES) + } + + /// Last-level cache misses (~DRAM round trips). General-purpose + /// counter, same budget note as `branch_misses`. + pub fn cache_misses() -> Option { + Self::new(PERF_COUNT_HW_CACHE_MISSES) + } + + /// Counts kernel-mode work when permitted (`perf_event_paranoid` + /// <= 1 or `CAP_PERFMON`), else falls back to userspace-only. + /// Returns None (with a warn) if perf or rdpmc is unavailable — + /// typically `kernel.perf_event_paranoid` > 2. + /// fd and mapping live for the process; no Drop (keeps Copy). + fn new(config: u64) -> Option { + match Self::open(config, false) { + Ok(c) => Some(c), + Err(_) => match Self::open(config, true) { + Ok(c) => { + static NOTICE: Once = Once::new(); + NOTICE.call_once(|| { + info!( + "perf counters are userspace-only (kernel excluded; \ + set kernel.perf_event_paranoid=1 to include kernel work)" + ); + }); + Some(c) + } + Err(err) => { + warn!( + ?err, + "perf_event_open failed; hw counters disabled \ + (check kernel.perf_event_paranoid <= 2)" + ); + None + } + }, + } + } + + fn open(config: u64, exclude_kernel: bool) -> Result { + let mut attr: PerfEventAttr = unsafe { std::mem::zeroed() }; + attr.type_ = PERF_TYPE_HARDWARE; + attr.size = size_of::() as u32; + attr.config = config; + attr.flags = ATTR_EXCLUDE_HV | if exclude_kernel { ATTR_EXCLUDE_KERNEL } else { 0 }; + + let fd = unsafe { + libc::syscall(libc::SYS_perf_event_open, &attr, 0, -1, -1, PERF_FLAG_FD_CLOEXEC) + } as i32; + if fd < 0 { + return Err(Error::last_os_error()); + } + + let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as usize; + let page = unsafe { + libc::mmap( + std::ptr::null_mut(), + page_size, + libc::PROT_READ, + libc::MAP_SHARED, + fd, + 0, + ) + }; + if page == libc::MAP_FAILED { + let err = Error::last_os_error(); + unsafe { libc::close(fd) }; + return Err(err); + } + let page = page as *const PerfEventMmapPage; + + let caps = unsafe { std::ptr::read_volatile(&raw const (*page).capabilities) }; + if caps & CAP_USER_RDPMC == 0 { + unsafe { + libc::munmap(page as *mut libc::c_void, page_size); + libc::close(fd); + } + return Err(Error::new( + ErrorKind::Unsupported, + "cap_user_rdpmc unset (check /sys/bus/event_source/devices/cpu/rdpmc)", + )); + } + + Ok(Self { page, fd }) + } + + /// Current instructions-retired count. Userspace mmap-page seqlock + /// read (see `perf_event_mmap_page` kernel docs); falls back to the + /// read(2) path if the event is momentarily descheduled (index == 0). + #[inline] + pub fn read(&self) -> u64 { + unsafe { + loop { + let lock = std::ptr::read_volatile(&raw const (*self.page).lock); + compiler_fence(Ordering::Acquire); + + let index = std::ptr::read_volatile(&raw const (*self.page).index); + let offset = std::ptr::read_volatile(&raw const (*self.page).offset); + let width = + u32::from(std::ptr::read_volatile(&raw const (*self.page).pmc_width)); + + if index == 0 { + return self.read_slow(); + } + // rdpmc yields pmc_width valid bits; sign-extend before + // adding the kernel-maintained offset. + let pmc = rdpmc(index - 1); + let pmc = ((pmc << (64 - width)) as i64) >> (64 - width); + let count = offset.wrapping_add(pmc) as u64; + + compiler_fence(Ordering::Acquire); + if std::ptr::read_volatile(&raw const (*self.page).lock) == lock { + return count; + } + } + } + } + + #[cold] + fn read_slow(&self) -> u64 { + let mut count = 0u64; + let n = unsafe { libc::read(self.fd, (&raw mut count).cast::(), 8) }; + if n == 8 { count } else { 0 } + } + } + + #[inline] + fn rdpmc(counter: u32) -> u64 { + let lo: u32; + let hi: u32; + unsafe { + core::arch::asm!( + "rdpmc", + in("ecx") counter, + out("eax") lo, + out("edx") hi, + options(nostack, preserves_flags), + ); + } + ((hi as u64) << 32) | lo as u64 + } +} + +#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))] +mod imp { + #[derive(Clone, Copy, Debug)] + pub struct HwCounter {} + + impl HwCounter { + pub fn instructions() -> Option { + None + } + + pub fn cycles() -> Option { + None + } + + pub fn branch_misses() -> Option { + None + } + + pub fn cache_misses() -> Option { + None + } + + #[inline] + pub fn read(&self) -> u64 { + 0 + } + } +} + +pub use imp::HwCounter; + +#[cfg(all(test, target_os = "linux", target_arch = "x86_64"))] +mod tests { + use super::HwCounter; + + /// Skips (None) where `perf_event_open` is restricted, e.g. CI. + #[test] + fn counts_loop_instructions() { + let Some(c) = HwCounter::instructions() else { + eprintln!("perf unavailable; skipping"); + return; + }; + let start = c.read(); + let mut acc = 0u64; + for i in 0..100_000u64 { + acc = acc.wrapping_add(std::hint::black_box(i)); + } + std::hint::black_box(acc); + let delta = c.read().saturating_sub(start); + // 100k iterations retire >= 100k instructions. + assert!(delta >= 100_000, "delta = {delta}"); + } + + #[test] + fn counts_cycles() { + let Some(c) = HwCounter::cycles() else { + eprintln!("perf unavailable; skipping"); + return; + }; + let start = c.read(); + let mut acc = 0u64; + for i in 0..100_000u64 { + acc = acc.wrapping_add(std::hint::black_box(i)); + } + std::hint::black_box(acc); + let delta = c.read().saturating_sub(start); + // 100k dependent adds take >= ~25k cycles on any core. + assert!(delta >= 10_000, "delta = {delta}"); + } +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index f4d4ddf..a854ddb 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -25,9 +25,13 @@ use std::{ }; use flux::{Timer, timing::Instant}; -pub use silver_common_macros::timed; +pub use silver_common_macros::{perf, timed}; pub mod flamegraph_timer; +#[cfg(feature = "perf")] +pub mod hw_counter; +mod perf_guard; +pub use perf_guard::{PerfGuard, PerfSample}; /// App name used as the parent directory for per-function `Timer` /// shmem queues. Falls back to `"silver"` if `init_app` is not called. @@ -345,4 +349,45 @@ mod tests { assert_eq!(timed_custom_name(0), Err("zero")); assert_eq!(timed_custom_name(41), Ok(42)); } + + use crate::perf; + + #[perf] + fn perf_default_name(x: u64) -> u64 { + x * 3 + } + + #[perf("perf_custom_label")] + fn perf_custom_name(x: u64) -> Result { + if x == 0 { Err("zero") } else { Ok(x + 1) } + } + + #[perf(sample = 4)] + fn perf_sampled(x: u64) -> u64 { + x + 1 + } + + #[perf("perf_sampled_label", sample = 1000)] + fn perf_sampled_named(x: u64) -> Result { + if x == 0 { Err("zero") } else { Ok(x * 2) } + } + + /// Inert without the `perf` feature / perf access; emits to + /// `perf-{name}` queues otherwise. Either way the wrap must be + /// transparent, including the sampled variants' skip path. + #[test] + fn perf_macro_expands_and_runs() { + super::init_app("silver_test"); + + assert_eq!(perf_default_name(7), 21); + assert_eq!(perf_custom_name(0), Err("zero")); + assert_eq!(perf_custom_name(41), Ok(42)); + + // Cross the sampling boundary a few times. + for i in 0..10 { + assert_eq!(perf_sampled(i), i + 1); + } + assert_eq!(perf_sampled_named(0), Err("zero")); + assert_eq!(perf_sampled_named(21), Ok(42)); + } } diff --git a/crates/metrics/src/perf_guard.rs b/crates/metrics/src/perf_guard.rs new file mode 100644 index 0000000..34e229e --- /dev/null +++ b/crates/metrics/src/perf_guard.rs @@ -0,0 +1,142 @@ +//! `#[perf]` runtime: per-function instructions-retired + CPU-cycle +//! measurement via rdpmc, emitted per call onto a `perf-{fn}` MPMC shmem +//! queue (mirroring `#[timed]`'s `timing-{fn}` queues). +//! +//! Counters are perf self-monitoring events bound to the opening thread, +//! so each thread lazily opens its own set (instructions, cycles, branch +//! misses, LLC misses). Collection is gated behind the `perf` feature; +//! without it (or when `perf_event_paranoid` blocks the events at +//! runtime) the guard is inert. + +/// One decorated-function call: counter deltas between entry and every +/// exit path. Kernel-mode work is included when perf permits +/// (`perf_event_paranoid` <= 1), else userspace only — see `hw_counter`. +/// +/// `branch_misses` / `cache_misses` sit in general-purpose PMU counters +/// and read zero if those couldn't be opened (counter budget exhausted). +#[repr(C)] +#[derive(Clone, Copy, Debug, Default)] +pub struct PerfSample { + pub instr: u64, + pub cycles: u64, + pub branch_misses: u64, + pub cache_misses: u64, +} + +impl PerfSample { + #[inline] + pub fn ipc(&self) -> f64 { + if self.cycles == 0 { 0.0 } else { self.instr as f64 / self.cycles as f64 } + } +} + +/// Drop-based scope used by the `#[perf]` macro expansion. Records on +/// every exit path — normal return, `?`, early `return`, panic-unwind. +#[doc(hidden)] +pub struct PerfGuard { + #[cfg(feature = "perf")] + name: &'static str, + #[cfg(feature = "perf")] + start: PerfSample, +} + +#[cfg(feature = "perf")] +mod imp { + use std::{cell::RefCell, collections::HashMap}; + + use flux::{ + communication::queue::{Producer, Queue, QueueType}, + utils::directories::{local_share_dir, shmem_dir_queues_with_base}, + }; + + use super::{PerfGuard, PerfSample}; + use crate::hw_counter::HwCounter; + + const QUEUE_SIZE: usize = 2usize.pow(13); + + /// Per-thread counter set. Instructions/cycles use fixed PMU counters + /// and are required; branch/cache misses use general-purpose counters + /// and degrade individually to zero readings when unavailable. + struct Counters { + instr: HwCounter, + cycles: HwCounter, + branch_misses: Option, + cache_misses: Option, + } + + impl Counters { + fn open() -> Option { + Some(Self { + instr: HwCounter::instructions()?, + cycles: HwCounter::cycles()?, + branch_misses: HwCounter::branch_misses(), + cache_misses: HwCounter::cache_misses(), + }) + } + + #[inline] + fn read(&self) -> PerfSample { + PerfSample { + instr: self.instr.read(), + cycles: self.cycles.read(), + branch_misses: self.branch_misses.as_ref().map_or(0, HwCounter::read), + cache_misses: self.cache_misses.as_ref().map_or(0, HwCounter::read), + } + } + } + + ::std::thread_local! { + /// Perf pid=0 events bind to the opening thread. None when + /// perf_event_open is unavailable. + static COUNTERS: Option = Counters::open(); + static PRODUCERS: RefCell>> = + RefCell::new(HashMap::new()); + } + + fn new_producer(name: &str) -> Producer { + let app = crate::APP_NAME.get().map(String::as_str).unwrap_or("silver"); + let dir = shmem_dir_queues_with_base(local_share_dir(), app); + let _ = std::fs::create_dir_all(&dir); + let queue: Queue = Queue::create_or_open_shared( + dir.join(format!("perf-{name}")), + QUEUE_SIZE, + QueueType::MPMC, + ); + Producer::from(queue) + } + + impl PerfGuard { + #[inline] + pub fn new(name: &'static str) -> Self { + let start = COUNTERS.with(|c| c.as_ref().map(Counters::read).unwrap_or_default()); + Self { name, start } + } + } + + impl Drop for PerfGuard { + fn drop(&mut self) { + let Some(end) = COUNTERS.with(|c| c.as_ref().map(Counters::read)) else { + return; + }; + let sample = PerfSample { + instr: end.instr.saturating_sub(self.start.instr), + cycles: end.cycles.saturating_sub(self.start.cycles), + branch_misses: end.branch_misses.saturating_sub(self.start.branch_misses), + cache_misses: end.cache_misses.saturating_sub(self.start.cache_misses), + }; + PRODUCERS.with(|cell| { + let mut map = cell.borrow_mut(); + let producer = map.entry(self.name).or_insert_with(|| new_producer(self.name)); + producer.produce(&sample); + }); + } + } +} + +#[cfg(not(feature = "perf"))] +impl PerfGuard { + #[inline] + pub fn new(_name: &'static str) -> Self { + Self {} + } +} diff --git a/crates/network/src/p2p/streams/snappy.rs b/crates/network/src/p2p/streams/snappy.rs index 7b8f738..e5dbd82 100644 --- a/crates/network/src/p2p/streams/snappy.rs +++ b/crates/network/src/p2p/streams/snappy.rs @@ -4,6 +4,7 @@ use std::{ }; use silver_common::metrics::timed; +use silver_metrics::perf; use snap::raw::{Decoder, Encoder}; use thiserror::Error; @@ -90,6 +91,7 @@ impl SnappyDecoder { /// Feed compressed bytes, decompress complete frames into `out`. /// Returns `(bytes_consumed, bytes_written)`. #[timed] + #[perf] pub fn decompress( &mut self, input: &[u8], diff --git a/crates/storage/src/tile.rs b/crates/storage/src/tile.rs index 222e842..29d3cc1 100644 --- a/crates/storage/src/tile.rs +++ b/crates/storage/src/tile.rs @@ -151,9 +151,7 @@ impl StorageTile { "data columns by root request: {to_request:b}" ); - if self.store.is_synced() { - emit(self.column_request(block_root, to_request)); - } + emit(self.column_request(block_root, to_request)); } #[timed] @@ -394,7 +392,7 @@ impl Tile for StorageTile { let t_read = self.rpc_consumer.acquire(ssz); self.store.backfill_block(t_read); } - silver_common::RpcResponse::BeaconBlock { fork_digest: _, ssz } => { + silver_common::RpcResponse::BeaconBlock { fork_digest: _, ssz } if self.store.is_synced() => { let t_read = self.rpc_consumer.acquire(ssz); self.beacon_block(rsp.stream_id, t_read, &mut |evt| { producers.peer_events.produce(&evt.into()); @@ -637,22 +635,9 @@ mod tests { let read = blocks_consumer.acquire(ssz); let block_root = util::block_root(&block_bytes); - - // 1a. RPC block while syncing: the immediate by-root request is - // suppressed (columns arrive via the PM by-range path), but the - // outstanding entry is still registered so the retry wheel can fall - // back to by-root for stragglers. let rpc_stream = P2pStreamId::new(2, 2, StreamProtocol::BeaconBlocksByRange, true); - let mut rpc_events = Vec::new(); - tile.beacon_block(rpc_stream, read.clone(), &mut |evt| rpc_events.push(evt)); - assert!(rpc_events.is_empty(), "RPC block while syncing must not emit immediately"); - assert!( - tile.outstanding_requests.contains(&block_root), - "outstanding entry registered for the wheel fallback" - ); - tile.outstanding_requests.remove(&block_root); - // 1b. Once synced, an RPC block requests its custody columns by root. + // 1. Once synced, an RPC block requests its custody columns by root. tile.store.sync_update(SyncUpdate::Following); let mut rpc_events = Vec::new(); tile.beacon_block(rpc_stream, read.clone(), &mut |evt| rpc_events.push(evt)); diff --git a/crates/storage/src/util.rs b/crates/storage/src/util.rs index f9347ae..1f93d63 100644 --- a/crates/storage/src/util.rs +++ b/crates/storage/src/util.rs @@ -12,6 +12,7 @@ use silver_common::{ MAX_BLOB_COMMITMENTS_PER_BLOCK, NUMBER_OF_COLUMNS, SignedBeaconBlockView, }, }; +use silver_metrics::{perf, timed}; /// Depth of the Merkle branch attaching `kzg_commitments` to /// `BeaconBlockBody.body_root`. Per Fulu spec: `floor(log2(gindex))` @@ -141,6 +142,8 @@ pub fn verify_data_column_sidecar_inclusion_proof(sidecar: &[u8]) -> bool { /// c-kzg's `ethereum_kzg_settings` feature; no runtime initialisation /// required. `precompute = 0` — verification doesn't benefit from the /// precomputation table (only proof generation does). +#[timed] +#[perf] pub fn verify_data_column_sidecar_kzg_proofs(sidecar: &[u8]) -> bool { let column = DataColumnSidecarView::column(sidecar); let commits = DataColumnSidecarView::kzg_commitments(sidecar); @@ -230,6 +233,7 @@ pub fn parent_validated( /// `fork_version` is the active version at `sidecar.slot`'s epoch; the /// caller resolves it from the fork schedule. `proposer_pubkey` is the /// already-decompressed registry entry (no extra subgroup check). +#[perf] pub fn verify_proposer_signature( sidecar: &[u8], proposer_pubkey: &PublicKey, diff --git a/crates/surfer/src/app.rs b/crates/surfer/src/app.rs index f786951..27cd602 100644 --- a/crates/surfer/src/app.rs +++ b/crates/surfer/src/app.rs @@ -4,7 +4,9 @@ use ratatui::widgets::TableState; use crate::{ discovery::DiscoveredSources, - sources::{counters::CounterSet, tilemetrics::TileMetricsSet, timings::TimingSet}, + sources::{ + counters::CounterSet, perf::PerfSet, tilemetrics::TileMetricsSet, timings::TimingSet, + }, }; #[derive(Clone, Copy, PartialEq, Eq)] @@ -13,6 +15,7 @@ pub enum Pane { TCaches, Timings, Tiles, + Perf, } impl Pane { @@ -22,6 +25,7 @@ impl Pane { Pane::TCaches => "TCaches", Pane::Timings => "Timings", Pane::Tiles => "Tiles", + Pane::Perf => "Perf", } } @@ -30,7 +34,8 @@ impl Pane { Pane::Counters => Pane::TCaches, Pane::TCaches => Pane::Timings, Pane::Timings => Pane::Tiles, - Pane::Tiles => Pane::Counters, + Pane::Tiles => Pane::Perf, + Pane::Perf => Pane::Counters, } } } @@ -47,6 +52,8 @@ pub struct App { pub timings_selection: usize, pub tilemetrics: Vec, pub tiles_selection: usize, + pub perf: Vec, + pub perf_selection: usize, /// When true, the active pane renders only the plot for the /// selected row, full-area. Toggled by Enter; Esc exits. pub drilled_in: bool, @@ -63,6 +70,7 @@ pub struct App { pub tcaches_table_state: TableState, pub timings_table_state: TableState, pub tiles_table_state: TableState, + pub perf_table_state: TableState, pub quit: bool, } @@ -77,6 +85,7 @@ impl App { tcaches: Vec, timings: Vec, tilemetrics: Vec, + perf: Vec, ) -> Self { Self { pane: Pane::Counters, @@ -88,12 +97,15 @@ impl App { timings_selection: 0, tilemetrics, tiles_selection: 0, + perf, + perf_selection: 0, drilled_in: false, split_pct: SPLIT_DEFAULT, counters_table_state: TableState::default(), tcaches_table_state: TableState::default(), timings_table_state: TableState::default(), tiles_table_state: TableState::default(), + perf_table_state: TableState::default(), quit: false, } } @@ -193,6 +205,23 @@ impl App { self.tiles_selection = idx; } } + + // Perf. + let sel_name = self.perf.get(self.perf_selection).map(|p| p.name.clone()); + let existing: HashSet = self.perf.iter().map(|p| p.name.clone()).collect(); + for f in &sources.perf { + if !existing.contains(&f.name) { + if let Ok(p) = PerfSet::open(f) { + self.perf.push(p); + } + } + } + self.perf.sort_by(|a, b| a.name.cmp(&b.name)); + if let Some(n) = sel_name { + if let Some(idx) = self.perf.iter().position(|p| p.name == n) { + self.perf_selection = idx; + } + } } pub fn sample(&mut self) { @@ -208,6 +237,9 @@ impl App { for t in &mut self.tilemetrics { t.drain(); } + for p in &mut self.perf { + p.drain(); + } } pub fn roll_bucket(&mut self) { @@ -226,6 +258,9 @@ impl App { for t in &mut self.tilemetrics { t.roll_bucket(); } + for p in &mut self.perf { + p.roll_bucket(); + } } /// Scroll the selection within the active pane. `dir = +1` @@ -237,7 +272,17 @@ impl App { Pane::TCaches => self.move_tcache_selection(dir), Pane::Timings => self.move_timing_selection(dir), Pane::Tiles => self.move_tile_selection(dir), + Pane::Perf => self.move_perf_selection(dir), + } + } + + fn move_perf_selection(&mut self, dir: i32) { + if self.perf.is_empty() { + return; } + let n = self.perf.len() as i32; + let new = (self.perf_selection as i32 + dir).rem_euclid(n); + self.perf_selection = new as usize; } fn move_tcache_selection(&mut self, dir: i32) { diff --git a/crates/surfer/src/discovery.rs b/crates/surfer/src/discovery.rs index 43221ac..cd7b3e8 100644 --- a/crates/surfer/src/discovery.rs +++ b/crates/surfer/src/discovery.rs @@ -7,6 +7,7 @@ //! - `counters-{name}` — shmem-mapped `[AtomicU64; N]` (read-only). //! - `timing-{name}` / `latency-{name}` — flux MPMC `TimingMessage` queues. //! - `tilemetrics-{name}` — flux SPMC `TileSample` queue. +//! - `perf-{name}` — flux MPMC `PerfSample` queue (`#[perf]` functions). use std::{collections::HashMap, fs, io, path::PathBuf}; @@ -15,6 +16,7 @@ pub struct DiscoveredSources { pub tcaches: Vec, pub timings: Vec, pub tilemetrics: Vec, + pub perf: Vec, } pub struct CounterFile { @@ -41,6 +43,13 @@ pub struct TileMetricsFile { pub path: PathBuf, } +pub struct PerfFile { + /// The `{name}` suffix from `perf-{name}` — the decorated function's + /// `module_path::fn` (or its `#[perf("...")]` override). + pub name: String, + pub path: PathBuf, +} + pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result { let mut counters = Vec::new(); let mut tcaches = Vec::new(); @@ -48,6 +57,7 @@ pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result = HashMap::new(); let mut tilemetrics = Vec::new(); + let mut perf = Vec::new(); let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name); if let Ok(entries) = fs::read_dir(&dir) { @@ -80,6 +90,8 @@ pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result io::Result io::Result<()> { for t in &mut tile_sets { t.drain(); } - let mut app = App::new(counter_sets, tcache_sets, timing_sets, tile_sets); + + let mut perf_sets: Vec = sources + .perf + .iter() + .filter_map(|f| match PerfSet::open(f) { + Ok(p) => Some(p), + Err(e) => { + eprintln!("surfer: skipping {}: {e}", f.path.display()); + None + } + }) + .collect(); + for p in &mut perf_sets { + p.drain(); + } + let mut app = App::new(counter_sets, tcache_sets, timing_sets, tile_sets, perf_sets); enable_raw_mode()?; let mut stdout = io::stdout(); diff --git a/crates/surfer/src/render/mod.rs b/crates/surfer/src/render/mod.rs index 4099aaa..805926a 100644 --- a/crates/surfer/src/render/mod.rs +++ b/crates/surfer/src/render/mod.rs @@ -1,5 +1,6 @@ pub mod counters_pane; pub mod fmt; +pub mod perf_pane; pub mod tcaches_pane; pub mod tiles_pane; pub mod timings_pane; @@ -26,12 +27,13 @@ pub fn draw(f: &mut Frame, app: &mut App) { Pane::TCaches => tcaches_pane::draw(f, chunks[1], app), Pane::Timings => timings_pane::draw(f, chunks[1], app), Pane::Tiles => tiles_pane::draw(f, chunks[1], app), + Pane::Perf => perf_pane::draw(f, chunks[1], app), } draw_footer(f, chunks[2], app); } fn draw_header(f: &mut Frame, area: Rect, app: &App) { - let spans: Vec = [Pane::Counters, Pane::TCaches, Pane::Timings, Pane::Tiles] + let spans: Vec = [Pane::Counters, Pane::TCaches, Pane::Timings, Pane::Tiles, Pane::Perf] .iter() .flat_map(|&p| { let style = if p == app.pane { diff --git a/crates/surfer/src/render/perf_pane.rs b/crates/surfer/src/render/perf_pane.rs new file mode 100644 index 0000000..dd8eef2 --- /dev/null +++ b/crates/surfer/src/render/perf_pane.rs @@ -0,0 +1,143 @@ +use ratatui::{ + Frame, + layout::{Constraint, Direction, Layout, Rect}, + style::{Color, Modifier, Style}, + symbols, + text::{Line, Span}, + widgets::{Axis, Block, Borders, Cell, Chart, Dataset, GraphType, Paragraph, Row, Table}, +}; + +use crate::{app::App, render::fmt::fmt_span_ago}; + +pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { + if app.perf.is_empty() { + let block = Block::default().borders(Borders::ALL).title(" perf "); + let inner = block.inner(area); + f.render_widget(block, area); + f.render_widget( + Paragraph::new("no perf queues discovered (build with --features perf)") + .style(Style::default().fg(Color::DarkGray)), + inner, + ); + return; + } + + if app.drilled_in { + draw_chart(f, area, app); + return; + } + let rows = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Percentage(app.split_pct), + Constraint::Percentage(100 - app.split_pct), + ]) + .split(area); + + draw_table(f, rows[0], app); + draw_chart(f, rows[1], app); +} + +fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { + let block = Block::default().borders(Borders::ALL).title(" perf — 30 s avg per call "); + let header = Row::new(vec![ + Cell::from("function"), + Cell::from("calls/s"), + Cell::from("instr"), + Cell::from("cycles"), + Cell::from("ipc"), + Cell::from("br/1k"), + Cell::from("llc/call"), + ]) + .style(Style::default().add_modifier(Modifier::BOLD).fg(Color::White)) + .height(1); + + let rows: Vec = app + .perf + .iter() + .enumerate() + .map(|(i, p)| { + let row_style = if i == app.perf_selection { + Style::default().bg(Color::DarkGray).fg(Color::White).add_modifier(Modifier::BOLD) + } else { + Style::default() + }; + Row::new(vec![ + Cell::from(Span::styled(p.name.clone(), row_style)), + Cell::from(format!("{:>9}", p.call_rate())), + Cell::from(format!("{:>12}", p.instr_avg())), + Cell::from(format!("{:>12}", p.cycles_avg())), + Cell::from(format!("{:>6.2}", p.ipc())), + Cell::from(format!("{:>7.2}", p.branch_per_kinstr())), + Cell::from(format!("{:>9}", p.cache_miss_avg())), + ]) + .height(1) + }) + .collect(); + + let widths = [ + Constraint::Percentage(42), + Constraint::Length(10), + Constraint::Length(14), + Constraint::Length(14), + Constraint::Length(8), + Constraint::Length(9), + Constraint::Length(10), + ]; + let table = Table::new(rows, widths).header(header).block(block); + app.perf_table_state.select(Some(app.perf_selection)); + f.render_stateful_widget(table, area, &mut app.perf_table_state); +} + +/// IPC over the retained bucket ring for the selected function. +fn draw_chart(f: &mut Frame, area: Rect, app: &mut App) { + let Some(p) = app.perf.get(app.perf_selection) else { + f.render_widget(Block::default().borders(Borders::ALL).title(" ipc "), area); + return; + }; + let n = p.history.len(); + let title = format!(" {} — ipc ", p.name); + let block = Block::default().borders(Borders::ALL).title(title); + if n == 0 || !p.has_data() { + let inner = block.inner(area); + f.render_widget(block, area); + f.render_widget( + Paragraph::new("no samples yet").style(Style::default().fg(Color::DarkGray)), + inner, + ); + return; + } + + // One bucket per second, oldest at x=0, newest at right. + let data: Vec<(f64, f64)> = + p.history.iter().enumerate().map(|(i, b)| (i as f64, b.ipc())).collect(); + let y_max = data.iter().map(|(_, y)| *y).fold(0.0_f64, f64::max).max(1.0); + let x_max = n.saturating_sub(1).max(1) as f64; + + let dataset = Dataset::default() + .marker(symbols::Marker::Braille) + .style(Style::default().fg(Color::Cyan)) + .graph_type(GraphType::Line) + .data(&data); + + let x_labels = vec![ + Line::from(format!("-{}", fmt_span_ago(n))), + Line::from(format!("-{}", fmt_span_ago(n / 2))), + Line::from("now"), + ]; + let chart = Chart::new(vec![dataset]) + .block(block) + .x_axis( + Axis::default() + .bounds([0.0, x_max]) + .labels(x_labels) + .style(Style::default().fg(Color::DarkGray)), + ) + .y_axis( + Axis::default() + .bounds([0.0, y_max * 1.1]) + .labels(vec![Line::from("0"), Line::from(format!("{y_max:.1}"))]) + .style(Style::default().fg(Color::DarkGray)), + ); + f.render_widget(chart, area); +} diff --git a/crates/surfer/src/sources/mod.rs b/crates/surfer/src/sources/mod.rs index 814ffe0..19be3fa 100644 --- a/crates/surfer/src/sources/mod.rs +++ b/crates/surfer/src/sources/mod.rs @@ -1,3 +1,4 @@ pub mod counters; +pub mod perf; pub mod tilemetrics; pub mod timings; diff --git a/crates/surfer/src/sources/perf.rs b/crates/surfer/src/sources/perf.rs new file mode 100644 index 0000000..683322f --- /dev/null +++ b/crates/surfer/src/sources/perf.rs @@ -0,0 +1,187 @@ +//! Consumer for `perf-{name}` MPMC queues emitted by `#[perf]`-decorated +//! functions. Each `PerfSample` is one call: instructions retired and CPU +//! cycles (userspace) between entry and exit. +//! +//! Samples are summed into 1 s wall-clock buckets (mirroring +//! counters/timings); table stats average over the last +//! `TABLE_WINDOW_BUCKETS` so they track current behaviour, while the full +//! ring feeds the drill-down IPC chart. + +use std::collections::VecDeque; + +use flux::communication::queue::{ConsumerBare, Queue}; +use silver_metrics::PerfSample; + +use crate::{discovery::PerfFile, sources::counters::BUCKET_HISTORY_LEN}; + +/// Table stats average over the last 30 s of 1 s buckets. +const TABLE_WINDOW_BUCKETS: usize = 30; + +/// Call count and counter sums over one wall-clock bucket. +#[derive(Clone, Copy, Debug, Default)] +pub struct PerfBucket { + pub count: u64, + pub instr: u64, + pub cycles: u64, + pub branch_misses: u64, + pub cache_misses: u64, +} + +impl PerfBucket { + #[inline] + pub fn ipc(&self) -> f64 { + if self.cycles == 0 { 0.0 } else { self.instr as f64 / self.cycles as f64 } + } +} + +pub struct PerfSet { + pub name: String, + consumer: ConsumerBare, + /// In-progress bucket accumulated across drains since the last roll. + cur: PerfBucket, + /// Per-bucket sums over the retained window (240 × 1 s = 4 min). + pub history: VecDeque, + pub total_count: u64, +} + +impl PerfSet { + pub fn open(file: &PerfFile) -> Result { + let queue: Queue = Queue::try_open_shared(&file.path) + .map_err(|e| format!("open_shared({:?}): {e:?}", file.path))?; + let label: &'static str = Box::leak(format!("surfer-perf-{}", file.name).into_boxed_str()); + let consumer = ConsumerBare::::new(queue, label); + Ok(Self { + name: file.name.clone(), + consumer, + cur: PerfBucket::default(), + history: VecDeque::with_capacity(BUCKET_HISTORY_LEN), + total_count: 0, + }) + } + + /// Drain everything currently available into the in-progress bucket. + pub fn drain(&mut self) { + let mut sample = PerfSample::default(); + while self.consumer.try_consume(&mut sample).is_ok() { + self.cur.count += 1; + self.cur.instr += sample.instr; + self.cur.cycles += sample.cycles; + self.cur.branch_misses += sample.branch_misses; + self.cur.cache_misses += sample.cache_misses; + self.total_count += 1; + } + } + + /// Snapshot the in-progress bucket into the ring and reset. Driven by + /// the wall-clock bucket tick, same as counters/timings. + pub fn roll_bucket(&mut self) { + if self.history.len() == BUCKET_HISTORY_LEN { + self.history.pop_front(); + } + self.history.push_back(self.cur); + self.cur = PerfBucket::default(); + } + + /// Last `TABLE_WINDOW_BUCKETS` buckets — the table stats window. + fn recent(&self) -> impl Iterator + '_ { + self.history.iter().rev().take(TABLE_WINDOW_BUCKETS) + } + + fn recent_sums(&self) -> PerfBucket { + self.recent().fold(PerfBucket::default(), |acc, b| PerfBucket { + count: acc.count + b.count, + instr: acc.instr + b.instr, + cycles: acc.cycles + b.cycles, + branch_misses: acc.branch_misses + b.branch_misses, + cache_misses: acc.cache_misses + b.cache_misses, + }) + } + + /// Mean calls/s over the table window (buckets are 1 s). + pub fn call_rate(&self) -> u64 { + let n = self.history.len().min(TABLE_WINDOW_BUCKETS) as u64; + if n == 0 { 0 } else { self.recent_sums().count / n } + } + + /// Mean instructions retired per call over the table window. + pub fn instr_avg(&self) -> u64 { + let s = self.recent_sums(); + if s.count == 0 { 0 } else { s.instr / s.count } + } + + /// Mean CPU cycles per call over the table window. + pub fn cycles_avg(&self) -> u64 { + let s = self.recent_sums(); + if s.count == 0 { 0 } else { s.cycles / s.count } + } + + /// Cycle-weighted instructions per cycle over the table window. + pub fn ipc(&self) -> f64 { + self.recent_sums().ipc() + } + + /// Branch misses per 1k instructions over the table window. + pub fn branch_per_kinstr(&self) -> f64 { + let s = self.recent_sums(); + if s.instr == 0 { 0.0 } else { s.branch_misses as f64 * 1000.0 / s.instr as f64 } + } + + /// Mean LLC misses per call over the table window. + pub fn cache_miss_avg(&self) -> u64 { + let s = self.recent_sums(); + if s.count == 0 { 0 } else { s.cache_misses / s.count } + } + + pub fn has_data(&self) -> bool { + self.total_count > 0 + } +} + +#[cfg(test)] +mod tests { + use flux::communication::queue::{Producer, Queue, QueueType}; + + use super::*; + use crate::discovery::PerfFile; + + #[test] + fn aggregates_buckets() { + let tmp = std::env::temp_dir().join(format!("surfer_perf_{}", std::process::id())); + std::fs::remove_dir_all(&tmp).ok(); + std::fs::create_dir_all(&tmp).unwrap(); + let path = tmp.join("perf-test_fn"); + let queue: Queue = Queue::create_or_open_shared(&path, 4096, QueueType::MPMC); + let mut producer = Producer::from(queue); + let file = PerfFile { name: "test_fn".into(), path: path.clone() }; + let mut set = PerfSet::open(&file).unwrap(); + // Prime the cursor at head before producing (mirrors main.rs). + set.drain(); + + producer.produce(&PerfSample { + instr: 3000, + cycles: 1000, + branch_misses: 5, + cache_misses: 12, + }); + producer.produce(&PerfSample { + instr: 1000, + cycles: 1000, + branch_misses: 3, + cache_misses: 4, + }); + set.drain(); + set.roll_bucket(); + + assert_eq!(set.total_count, 2); + assert_eq!(set.call_rate(), 2); + assert_eq!(set.instr_avg(), 2000); + assert_eq!(set.cycles_avg(), 1000); + assert!((set.ipc() - 2.0).abs() < 1e-9, "ipc = {}", set.ipc()); + // 8 misses / 4000 instr = 2 per 1k. + assert!((set.branch_per_kinstr() - 2.0).abs() < 1e-9); + assert_eq!(set.cache_miss_avg(), 8); + assert!(set.has_data()); + + std::fs::remove_dir_all(&tmp).ok(); + } +}