From 98c283efae47a482423922ffe09cf3aadb3f5a54 Mon Sep 17 00:00:00 2001 From: vladimir-ea Date: Thu, 11 Jun 2026 15:44:10 +0100 Subject: [PATCH] add perf instrumentation feature --- crates/flux/Cargo.toml | 6 + crates/flux/src/tile/instr.rs | 251 ++++++++++++++++++++++++++++++++ crates/flux/src/tile/metrics.rs | 174 ++++++++++++++++++++++ crates/flux/src/tile/mod.rs | 10 ++ 4 files changed, 441 insertions(+) create mode 100644 crates/flux/src/tile/instr.rs diff --git a/crates/flux/Cargo.toml b/crates/flux/Cargo.toml index 80f10ea..45280be 100644 --- a/crates/flux/Cargo.toml +++ b/crates/flux/Cargo.toml @@ -31,6 +31,12 @@ tempfile.workspace = true [features] default = [] +# Per-loop instructions-retired + CPU-cycle counting via perf_event + rdpmc, +# emitted on a separate tileperf-{tile} queue. Self-monitoring requires +# kernel.perf_event_paranoid <= 2: +# sudo sysctl kernel.perf_event_paranoid=2 +# (persist via /etc/sysctl.d/). Disabled with a warn at runtime otherwise. +instr-count = [] wincode = [ "dep:wincode", "dep:wincode-derive", diff --git a/crates/flux/src/tile/instr.rs b/crates/flux/src/tile/instr.rs new file mode 100644 index 0000000..98febe3 --- /dev/null +++ b/crates/flux/src/tile/instr.rs @@ -0,0 +1,251 @@ +//! Hardware counters (instructions retired, CPU cycles) via +//! `perf_event_open` + userspace rdpmc. +//! +//! Reads cost ~30 cycles. Counts userspace only (`exclude_kernel|exclude_hv`), +//! so it works at `perf_event_paranoid` <= 2 without privileges. +//! +//! perf binds pid=0 events to the *opening* thread — construct on the tile +//! thread, never the spawner. +//! +//! ABI structs are defined locally: libc has no `perf_event_mmap_page` and we +//! need only a prefix of `perf_event_attr`. + +#[cfg(all(target_os = "linux", target_arch = "x86_64"))] +mod imp { + use std::sync::atomic::{Ordering, compiler_fence}; + + use tracing::warn; + + const PERF_TYPE_HARDWARE: u32 = 0; + const PERF_COUNT_HW_CPU_CYCLES: u64 = 0; + const PERF_COUNT_HW_INSTRUCTIONS: u64 = 1; + const PERF_FLAG_FD_CLOEXEC: libc::c_ulong = 8; + // perf_event_attr flag bits. + const ATTR_EXCLUDE_KERNEL: u64 = 1 << 5; + const ATTR_EXCLUDE_HV: u64 = 1 << 6; + // perf_event_mmap_page.capabilities bits. + const CAP_USER_RDPMC: u64 = 1 << 2; + + /// Prefix of linux `perf_event_attr`, zero-padded to 128 bytes. + /// Kernel accepts any size with zeroed bytes beyond its known struct. + #[repr(C)] + struct PerfEventAttr { + type_: u32, + size: u32, + config: u64, + sample_period: u64, + sample_type: u64, + read_format: u64, + flags: u64, + _pad: [u64; 10], + } + + /// Prefix of linux `perf_event_mmap_page`. + #[repr(C)] + struct PerfEventMmapPage { + version: u32, + compat_version: u32, + lock: u32, + index: u32, + offset: i64, + time_enabled: u64, + time_running: u64, + capabilities: u64, + pmc_width: u16, + } + + #[derive(Clone, Copy, Debug)] + pub struct HwCounter { + page: *const PerfEventMmapPage, + fd: i32, + } + + // Used from exactly one tile thread; raw ptr blocks the auto impl. + unsafe impl Send for HwCounter {} + + impl HwCounter { + pub fn instructions() -> Option { + Self::new(PERF_COUNT_HW_INSTRUCTIONS) + } + + pub fn cycles() -> Option { + Self::new(PERF_COUNT_HW_CPU_CYCLES) + } + + /// Returns None (with a warn) if `perf_event_open` or rdpmc is + /// unavailable — typically `kernel.perf_event_paranoid` > 2. + /// fd and mapping live for the process; no Drop (keeps Copy). + fn new(config: u64) -> Option { + let mut attr: PerfEventAttr = unsafe { std::mem::zeroed() }; + attr.type_ = PERF_TYPE_HARDWARE; + attr.size = size_of::() as u32; + attr.config = config; + attr.flags = ATTR_EXCLUDE_KERNEL | ATTR_EXCLUDE_HV; + + let fd = unsafe { + libc::syscall(libc::SYS_perf_event_open, &attr, 0, -1, -1, PERF_FLAG_FD_CLOEXEC) + } as i32; + if fd < 0 { + let err = std::io::Error::last_os_error(); + warn!( + ?err, + "perf_event_open failed; instruction counts disabled \ + (check kernel.perf_event_paranoid <= 2)" + ); + return None; + } + + let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as usize; + let page = unsafe { + libc::mmap( + std::ptr::null_mut(), + page_size, + libc::PROT_READ, + libc::MAP_SHARED, + fd, + 0, + ) + }; + if page == libc::MAP_FAILED { + let err = std::io::Error::last_os_error(); + warn!(?err, "perf mmap failed; instruction counts disabled"); + unsafe { libc::close(fd) }; + return None; + } + let page = page as *const PerfEventMmapPage; + + let caps = unsafe { std::ptr::read_volatile(&raw const (*page).capabilities) }; + if caps & CAP_USER_RDPMC == 0 { + warn!( + "perf cap_user_rdpmc unset; instruction counts disabled \ + (check /sys/bus/event_source/devices/cpu/rdpmc)" + ); + unsafe { + libc::munmap(page as *mut libc::c_void, page_size); + libc::close(fd); + } + return None; + } + + Some(Self { page, fd }) + } + + /// Current instructions-retired count. Userspace mmap-page seqlock + /// read (see `perf_event_mmap_page` kernel docs); falls back to the + /// read(2) path if the event is momentarily descheduled (index == 0). + #[inline] + pub fn read(&self) -> u64 { + unsafe { + loop { + let lock = std::ptr::read_volatile(&raw const (*self.page).lock); + compiler_fence(Ordering::Acquire); + + let index = std::ptr::read_volatile(&raw const (*self.page).index); + let offset = std::ptr::read_volatile(&raw const (*self.page).offset); + let width = + u32::from(std::ptr::read_volatile(&raw const (*self.page).pmc_width)); + + if index == 0 { + return self.read_slow(); + } + // rdpmc yields pmc_width valid bits; sign-extend before + // adding the kernel-maintained offset. + let pmc = rdpmc(index - 1); + let pmc = ((pmc << (64 - width)) as i64) >> (64 - width); + let count = offset.wrapping_add(pmc) as u64; + + compiler_fence(Ordering::Acquire); + if std::ptr::read_volatile(&raw const (*self.page).lock) == lock { + return count; + } + } + } + } + + #[cold] + fn read_slow(&self) -> u64 { + let mut count = 0u64; + let n = unsafe { libc::read(self.fd, (&raw mut count).cast::(), 8) }; + if n == 8 { count } else { 0 } + } + } + + #[inline] + fn rdpmc(counter: u32) -> u64 { + let lo: u32; + let hi: u32; + unsafe { + core::arch::asm!( + "rdpmc", + in("ecx") counter, + out("eax") lo, + out("edx") hi, + options(nostack, preserves_flags), + ); + } + ((hi as u64) << 32) | lo as u64 + } +} + +#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))] +mod imp { + #[derive(Clone, Copy, Debug)] + pub struct HwCounter {} + + impl HwCounter { + pub fn instructions() -> Option { + None + } + + pub fn cycles() -> Option { + None + } + + #[inline] + pub fn read(&self) -> u64 { + 0 + } + } +} + +pub use imp::HwCounter; + +#[cfg(all(test, target_os = "linux", target_arch = "x86_64"))] +mod tests { + use super::HwCounter; + + /// Skips (None) where `perf_event_open` is restricted, e.g. CI. + #[test] + fn counts_loop_instructions() { + let Some(c) = HwCounter::instructions() else { + eprintln!("perf unavailable; skipping"); + return; + }; + let start = c.read(); + let mut acc = 0u64; + for i in 0..100_000u64 { + acc = acc.wrapping_add(std::hint::black_box(i)); + } + std::hint::black_box(acc); + let delta = c.read().saturating_sub(start); + // 100k iterations retire >= 100k instructions. + assert!(delta >= 100_000, "delta = {delta}"); + } + + #[test] + fn counts_cycles() { + let Some(c) = HwCounter::cycles() else { + eprintln!("perf unavailable; skipping"); + return; + }; + let start = c.read(); + let mut acc = 0u64; + for i in 0..100_000u64 { + acc = acc.wrapping_add(std::hint::black_box(i)); + } + std::hint::black_box(acc); + let delta = c.read().saturating_sub(start); + // 100k dependent adds take >= ~25k cycles on any core. + assert!(delta >= 10_000, "delta = {delta}"); + } +} diff --git a/crates/flux/src/tile/metrics.rs b/crates/flux/src/tile/metrics.rs index f225b8f..b0701b0 100644 --- a/crates/flux/src/tile/metrics.rs +++ b/crates/flux/src/tile/metrics.rs @@ -4,6 +4,8 @@ use flux_communication::shmem_dir_queues_string_with_base; use flux_timing::{IngestionTime, Instant, Nanos, global_clock_not_mocked}; use crate::communication::queue::{Producer, Queue, QueueType}; +#[cfg(feature = "instr-count")] +use crate::tile::instr::HwCounter; const QUEUE_SIZE: usize = 4096; const SAMPLE_WINDOW: u32 = 1024; @@ -62,6 +64,131 @@ impl TileSample { } } +/// Hardware-counter stats over a sampling window. +/// +/// Emitted on a separate `tileperf-{tile}` queue so perf builds do not +/// change the `TileSample` layout. Type is unconditional (readers need +/// it); collection and queue creation are gated behind `instr-count`. +#[derive(Clone, Copy, Default, Debug)] +#[repr(C)] +#[cfg_attr(feature = "wincode", derive(wincode_derive::SchemaRead, wincode_derive::SchemaWrite))] +pub struct TilePerfSample { + /// Instructions retired over the window (userspace only). + pub instr_sum: u64, + instr_min: u64, + pub instr_max: u64, + /// Instructions retired in busy-classified iterations only. + pub instr_busy_sum: u64, + /// CPU cycles over the window (userspace only). + pub cycles_sum: u64, + /// CPU cycles in busy-classified iterations only. + pub cycles_busy_sum: u64, + /// Diagnostic: the running idle floor at window emit, and how many + /// loops classified busy — exposes classifier behaviour (floor + /// collapse → everything busy; margin too wide → nothing busy). + pub idle_floor: u64, + pub busy_loops: u32, + /// Loops the `did_work` flag (consume/produce auto-marking + manual + /// `mark_work`) reported busy — compare against `busy_loops`. + pub flag_loops: u32, + pub loop_count: u32, +} + +impl TilePerfSample { + #[inline] + fn reset(&mut self) { + *self = Self { instr_min: u64::MAX, ..Default::default() }; + } + + /// Smallest per-loop instruction count in the window — the idle poll + /// path is near-deterministic, so this is the idle baseline. + #[inline] + pub fn instr_min(&self) -> u64 { + if self.instr_min == u64::MAX { 0 } else { self.instr_min } + } +} + +/// Counters + window state for the perf side-queue. Exists only when the +/// instruction counter opened successfully. +#[cfg(feature = "instr-count")] +#[derive(Clone, Copy, Debug)] +struct TilePerf { + instr: HwCounter, + cycles: Option, + instr_begin: u64, + cycles_begin: u64, + /// Running min of per-loop instruction deltas — the idle poll path is + /// near-deterministic and work loops always retire more (poll + work), + /// so this tracks the idle baseline. Decays upward per window (see + /// `emit_and_reset`) so one atypically short loop cannot poison + /// classification permanently. + idle_floor: u64, + sample: TilePerfSample, + producer: Producer, +} + +#[cfg(feature = "instr-count")] +impl TilePerf { + #[inline] + fn begin(&mut self) { + self.instr_begin = self.instr.read(); + if let Some(c) = &self.cycles { + self.cycles_begin = c.read(); + } + } + + /// Read counter deltas and classify the loop: busy when instructions + /// exceed the idle floor by 50%. Drives the instr/cycles busy sums + /// only; `did_work` (passed for the `flag_loops` diagnostic) keeps + /// driving the wall-time busy stats in `TileSample`. + #[inline] + fn record(&mut self, did_work: bool) { + let delta = self.instr.read().saturating_sub(self.instr_begin); + + if delta < self.idle_floor { + self.idle_floor = delta; + } + let busy = delta > self.idle_floor.saturating_add(self.idle_floor / 2); + + let s = &mut self.sample; + s.instr_sum += delta; + if busy { + s.instr_busy_sum += delta; + s.busy_loops += 1; + } + if delta < s.instr_min { + s.instr_min = delta; + } + if delta > s.instr_max { + s.instr_max = delta; + } + + if let Some(c) = &self.cycles { + let delta = c.read().saturating_sub(self.cycles_begin); + s.cycles_sum += delta; + if busy { + s.cycles_busy_sum += delta; + } + } + if did_work { + s.flag_loops += 1; + } + s.loop_count += 1; + } + + #[inline] + fn emit_and_reset(&mut self) { + self.sample.idle_floor = self.idle_floor; + // Decay the floor 12.5% per window, re-clamped by the window min: + // a collapsed floor recovers within a few windows instead of + // misclassifying every idle loop busy forever. + self.idle_floor = + self.sample.instr_min.min(self.idle_floor.saturating_add(self.idle_floor / 8)); + self.producer.produce(&self.sample); + self.sample.reset(); + } +} + /// Per-tile loop instrumentation. /// /// Emits a `TileSample` to `producer` every `SAMPLE_WINDOW` iterations. @@ -72,6 +199,8 @@ pub struct TileMetrics { latest_begin: Instant, sample: TileSample, producer: Producer, + #[cfg(feature = "instr-count")] + perf: Option, } impl TileMetrics { @@ -90,19 +219,60 @@ impl TileMetrics { latest_begin: Instant::default(), sample: TileSample { busy_min: u64::MAX, ..Default::default() }, producer: Producer::from(queue), + #[cfg(feature = "instr-count")] + perf: None, } } + /// Path of the perf side-queue, alongside `tilemetrics-{tile}`. + #[cfg(feature = "instr-count")] + pub fn perf_file, A: AsRef, S: Display>( + base_dir: D, + app_name: A, + tile_name: S, + ) -> String { + let dirstr = shmem_dir_queues_string_with_base(base_dir, &app_name); + format!("{dirstr}/tileperf-{tile_name}") + } + + /// Open the perf counters and the `tileperf` queue. perf binds pid=0 + /// events to the opening thread — call from the tile thread, not the + /// spawner. No-op (no queue created) if perf is unavailable. + #[cfg(feature = "instr-count")] + pub fn init_hw_counters(&mut self, perf_file: &str) { + let Some(instr) = HwCounter::instructions() else { return }; + let queue: Queue = + Queue::create_or_open_shared(perf_file, QUEUE_SIZE, QueueType::SPMC); + self.perf = Some(TilePerf { + instr, + cycles: HwCounter::cycles(), + instr_begin: 0, + cycles_begin: 0, + idle_floor: u64::MAX, + sample: TilePerfSample { instr_min: u64::MAX, ..Default::default() }, + producer: Producer::from(queue), + }); + } + #[inline] pub fn begin(&mut self, now: IngestionTime) { self.latest_begin = now.internal(); if self.sample.loop_count == 0 { self.sample.window_start = now.real(); } + #[cfg(feature = "instr-count")] + if let Some(p) = &mut self.perf { + p.begin(); + } } #[inline] pub fn end(&mut self, did_work: bool) { + #[cfg(feature = "instr-count")] + if let Some(p) = &mut self.perf { + p.record(did_work); + } + if did_work { let ticks = Instant::now().0.saturating_sub(self.latest_begin.0); // Convert TSC ticks → nanoseconds so busy_ticks is in the same @@ -133,5 +303,9 @@ impl TileMetrics { self.sample.window_end = Nanos::now(); self.producer.produce(&self.sample); self.sample.reset(); + #[cfg(feature = "instr-count")] + if let Some(p) = &mut self.perf { + p.emit_and_reset(); + } } } diff --git a/crates/flux/src/tile/mod.rs b/crates/flux/src/tile/mod.rs index ade68d0..304159f 100644 --- a/crates/flux/src/tile/mod.rs +++ b/crates/flux/src/tile/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "instr-count")] +pub mod instr; pub mod metrics; use core::sync::atomic::Ordering; @@ -81,11 +83,19 @@ where } else { None }; + #[cfg(feature = "instr-count")] + let perf_file = TileMetrics::perf_file(spine.spine.base_dir(), S::app_name(), tile.name()); spine.scope.spawn(move || { let _span = span!(Level::INFO, "", tile = %tile.name()).entered(); thread_boot(config.core, config.thread_prio); + // perf pid=0 events bind to the opening thread — must run here. + #[cfg(feature = "instr-count")] + if let Some(m) = &mut metrics { + m.init_hw_counters(&perf_file); + } + while !tile.try_init(&mut adapter) { if stop_flag.load(Ordering::Relaxed) != 0 { tile.teardown(&mut adapter);