Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ workspace = true

[features]
alloc-profile = ["silver_common/alloc-profile"]
# #[perf] collection: per-function instructions-retired + CPU cycles on
# perf-{fn} queues (dev; needs `sudo sysctl kernel.perf_event_paranoid=2`).
# Surfer picks the queues up when present; non-perf builds are unaffected.
perf = ["silver_common/perf"]
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ backtrace = { workspace = true, optional = true }

[features]
alloc-profile = ["dep:backtrace"]
# Forwards #[perf] collection (rdpmc instr+cycle counting) to silver_metrics.
perf = ["silver_metrics/perf"]
test-util = []

[build-dependencies]
Expand Down
77 changes: 70 additions & 7 deletions crates/common/src/spine/tcache/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ pub(super) struct Buckets {
bucket_mask: u64,
// max difference between head and tail, before 'forced' eviction
lag_threshold: u64,
// Out-of-order acquire lookback: the tail never advances within this
// many seqs of the newest acquire's bucket, so late acquires up to
// this far behind still land at or above the tail. 10% of capacity,
// rounded up to a bucket.
guard: u64,
}

impl Buckets {
Expand All @@ -277,24 +282,35 @@ impl Buckets {
bucket_shift: bucket_size.trailing_zeros() as u64,
bucket_mask: !(bucket_size - 1),
lag_threshold: lag_threshold(cache_capacity as u32),
guard: (cache_capacity / 10).next_multiple_of(bucket_size).max(bucket_size),
}
}

fn acquire(&mut self, seq: u64) {
// Out-of-order acquire beyond the guard window: the producer may
// already be reclaiming this slot, and bucket_index aliases behind
// the tail onto in-window buckets — counting it would corrupt a
// live bucket (the matching release is dropped below tail). Skip;
// the read surfaces as StaleSeq at buffer() time.
if self.tail_seq != u64::MAX && seq < self.tail_seq {
tracing::warn!(seq, tail = self.tail_seq, head = self.head_seq, "acquire below tail");
return;
}

let bucket_idx = self.bucket_index(seq);
self.buckets[bucket_idx] += 1;

self.head_seq = self.head_seq.max(seq);

if self.tail_seq == u64::MAX {
self.tail_seq = self.bucket_start_seq(seq);
self.tail_seq = self.bucket_start_seq(seq).saturating_sub(self.guard);
}

// rollup tail for completed buckets - maintain at least 1 bucket between
// head and tail to avoid any boundary cases where we might roll the tail before
// an entry is acquired.
// Rollup tail for completed buckets — keep `guard` seqs of slack
// below the newest acquire's bucket so bounded out-of-order
// acquires never land behind the tail.
let head_bucket_seq = self.bucket_start_seq(seq);
while head_bucket_seq > (self.tail_seq + self.bucket_size) {
while head_bucket_seq > (self.tail_seq + self.guard) {
let tail_bucket = self.bucket_index(self.tail_seq);
if self.head_seq - self.tail_seq > self.lag_threshold {
tracing::warn!(
Expand Down Expand Up @@ -356,11 +372,58 @@ mod tests {
fn buckets_acquire_initialises_tail_to_bucket_boundary() {
let mut b = Buckets::new(64, 1024);
b.acquire(100);
// 100 lives in bucket 1 (64..128). Tail is bucket-aligned to 64.
assert_eq!(b.tail_seq, 64);
// 100 lives in bucket 1 (64..128); tail starts `guard` (10% of
// 1024 → 128) below its bucket start, saturating at 0.
assert_eq!(b.tail_seq, 0);
assert_eq!(b.head_seq, 100);
}

/// A late acquire within the guard window (10% of capacity behind the
/// newest acquire) must land at or above the tail and be tracked.
#[test]
fn buckets_out_of_order_acquire_within_guard() {
// guard = (1024/10).next_multiple_of(64) = 128.
let mut b = Buckets::new(64, 1024);
b.acquire(0);
b.release(0, "");
b.acquire(300);
// Tail rolled over the released bucket but held 128 back from
// bucket_start(300) = 256.
assert_eq!(b.tail_seq, 128);
// Late low acquire inside the window: tracked, not dropped.
b.acquire(200);
assert!(b.tail_seq <= 200);
b.release(200, "");
b.release(300, "");
}

/// An acquire below the tail (out-of-order beyond the guard window)
/// must not be counted: bucket_index aliases behind-tail seqs onto
/// live buckets and the matching release is dropped, so counting
/// would permanently stall the tail on a phantom holder.
#[test]
fn buckets_acquire_below_tail_dropped_without_corruption() {
let mut b = Buckets::new(64, 1024); // guard 128, lag threshold 921
b.acquire(0);
b.release(0, "");
b.acquire(1000); // forces tail well past bucket 0 (tail = 832)
let tail = b.tail_seq;
assert!(tail >= 128);

// Below-tail acquire: dropped (warn), tail untouched.
b.acquire(1);
assert_eq!(b.tail_seq, tail);
// Its release is the existing below-tail no-op.
b.release(1, "");

// seq 1 aliases onto the same ring bucket as seqs 1024..1088; had
// the acquire been counted, the tail would stall there forever
// (head - tail stays under the lag threshold, so no force-evict).
b.release(1000, "");
b.acquire(1600);
assert!(b.tail_seq > 1024, "tail stalled at {} on a phantom holder", b.tail_seq);
}

#[test]
fn buckets_release_advances_tail_when_head_is_ahead() {
let mut b = Buckets::new(64, 1024);
Expand Down
115 changes: 108 additions & 7 deletions crates/common_macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! Proc-macro support for the silver crates. Currently exposes
//! `#[timed]`, which wraps a function body in a thread-local flux
//! `Timer` so processing time is emitted to a per-function shmem
//! queue. Storage and Drop-side recording live in
//! `silver_metrics`; this crate is only the attribute-macro
//! glue.
//! Proc-macro support for the silver crates. Exposes `#[timed]` (wraps a
//! function body in a thread-local flux `Timer` so processing time is
//! emitted to a per-function shmem queue) and `#[perf]` (same shape, but
//! records instructions retired + CPU cycles via rdpmc onto `perf-{fn}`
//! queues). Storage and Drop-side recording live in `silver_metrics`;
//! this crate is only the attribute-macro glue.

use proc_macro::TokenStream;
use quote::quote;
use syn::{ItemFn, LitStr, parse_macro_input};
use syn::{
Ident, ItemFn, LitInt, LitStr, Token,
parse::{Parse, ParseStream},
parse_macro_input,
};

/// Wrap a function body in a per-function thread-local flux `Timer`.
///
Expand Down Expand Up @@ -47,3 +51,100 @@ pub fn timed(attr: TokenStream, item: TokenStream) -> TokenStream {

expanded.into()
}

/// Arguments to `#[perf]`: an optional name literal, then an optional
/// `sample = N`.
struct PerfArgs {
name: Option<LitStr>,
sample: u64,
}

impl Parse for PerfArgs {
fn parse(input: ParseStream) -> syn::Result<Self> {
let mut name = None;
let mut sample = 1u64;
if input.peek(LitStr) {
name = Some(input.parse()?);
if input.peek(Token![,]) {
input.parse::<Token![,]>()?;
}
}
if !input.is_empty() {
let ident: Ident = input.parse()?;
if ident != "sample" {
return Err(syn::Error::new(ident.span(), "expected `sample = N`"));
}
input.parse::<Token![=]>()?;
let lit: LitInt = input.parse()?;
sample = lit.base10_parse()?;
if sample == 0 {
return Err(syn::Error::new(lit.span(), "sample must be >= 1"));
}
}
Ok(Self { name, sample })
}
}

/// Record instructions retired + CPU cycles for a function via a
/// Drop guard, emitted per call onto the `perf-{name}` shmem queue.
///
/// Naming matches `#[timed]`: default is
/// `concat!(module_path!(), "::", fn_name)`; pass a string literal to
/// override (`#[perf("custom_name")]`).
///
/// `sample = N` measures one in N calls (per thread, via a
/// thread-local call counter) — use on hot paths where two rdpmc
/// reads per call is too much. Default 1 = every call. Combinable:
/// `#[perf("custom_name", sample = 1000)]`.
///
/// Collection requires the `perf` feature on `silver_metrics` (rdpmc)
/// and `kernel.perf_event_paranoid <= 2` at runtime; otherwise the
/// guard is inert.
#[proc_macro_attribute]
pub fn perf(attr: TokenStream, item: TokenStream) -> TokenStream {
let input = parse_macro_input!(item as ItemFn);
let args = parse_macro_input!(attr as PerfArgs);

let name_expr = match &args.name {
Some(lit) => {
let s = lit.value();
quote! { #s }
}
None => {
let func_name_str = input.sig.ident.to_string();
quote! { concat!(module_path!(), "::", #func_name_str) }
}
};

let ItemFn { attrs, vis, sig, block } = input;

let guard = if args.sample <= 1 {
quote! {
let __perf_guard = ::silver_metrics::PerfGuard::new(
#name_expr,
);
}
} else {
let sample = args.sample;
quote! {
::std::thread_local! {
static __PERF_SKIP: ::core::cell::Cell<u64> =
const { ::core::cell::Cell::new(0) };
}
let __perf_guard = __PERF_SKIP.with(|c| {
let n = c.get();
c.set(n.wrapping_add(1));
(n % #sample == 0).then(|| ::silver_metrics::PerfGuard::new(#name_expr))
});
}
};

let expanded = quote! {
#(#attrs)* #vis #sig {
#guard
#block
}
};

expanded.into()
}
11 changes: 11 additions & 0 deletions crates/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ flux.workspace = true
libc.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing = { workspace = true, optional = true }

[lints]
workspace = true

[features]
# #[perf] collection: per-function instructions-retired + CPU cycles via
# rdpmc (hw_counter), emitted on perf-{fn} queues. Needs
# kernel.perf_event_paranoid <= 2 at runtime (<= 1 to include kernel-mode
# work in the counts):
# sudo sysctl kernel.perf_event_paranoid=1
# Disabled with a warn otherwise. `PerfSample` and the #[perf] macro
# compile regardless — collection alone is gated.
perf = ["dep:tracing"]
Loading
Loading