From ae35ead5aef5ad3038e32e64736fe7a955ac37dd Mon Sep 17 00:00:00 2001 From: Eason WaveKat Date: Thu, 14 May 2026 18:14:41 +1200 Subject: [PATCH 1/3] feat: add StreamingResampler for real-time pipelines AudioFrame::resample builds a fresh stateless rubato per call. For real-time consumers that feed it short frames in sequence (20 ms G.711 packets off an RTP socket are the motivating case), each call re-pays the sinc reconstruction's edge cost: it sees an abrupt step from zero to the chunk's first sample, smears the result, and produces audible boundary artifacts at the frame rate (50 Hz for 20 ms packets). Real playback of resampled telephony audio sounds like continuous noise/buzz over the voice. StreamingResampler builds rubato once with `new(source, target, chunk_size)` and reuses its internal filter state for every `process` call. State carries across calls, so the only edge cost is paid once at the start of the stream. The sinc parameters and rubato construction are factored into a `build_sinc_resampler` helper shared with `AudioFrame::resample`, so the two paths can't drift on quality settings. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/wavekat-core/src/audio.rs | 398 +++++++++++++++++++++++++++++-- crates/wavekat-core/src/lib.rs | 2 + 2 files changed, 382 insertions(+), 18 deletions(-) diff --git a/crates/wavekat-core/src/audio.rs b/crates/wavekat-core/src/audio.rs index 92b4242..69a4001 100644 --- a/crates/wavekat-core/src/audio.rs +++ b/crates/wavekat-core/src/audio.rs @@ -125,10 +125,7 @@ impl AudioFrame<'_> { /// ``` pub fn resample(&self, target_rate: u32) -> Result<AudioFrame<'static>, crate::CoreError> { use rubato::audioadapter_buffers::direct::InterleavedSlice; - use rubato::{ - Async, FixedAsync, Resampler, SincInterpolationParameters, SincInterpolationType, - WindowFunction, - }; + use rubato::Resampler; if self.sample_rate == target_rate { return Ok(self.clone().into_owned()); @@ -138,24 +135,11 @@ impl AudioFrame<'_> { return Ok(AudioFrame::from_vec(Vec::new(), target_rate)); } - let ratio = target_rate as f64 / self.sample_rate as f64; let nbr_input_frames = self.samples.len(); - - let params = SincInterpolationParameters { - sinc_len: 256, - f_cutoff: 0.95, - interpolation: SincInterpolationType::Cubic, - oversampling_factor: 128, - window: WindowFunction::BlackmanHarris2, - }; - // Match chunk size to input when shorter than the default — avoids // wasting work padding a 160-sample G.711 frame up to 1024 samples. let chunk_size = nbr_input_frames.min(1024); - - let mut resampler = - Async::<f32>::new_sinc(ratio, 1.0, &params, chunk_size, 1, FixedAsync::Input) - .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + let mut resampler = build_sinc_resampler(self.sample_rate, target_rate, chunk_size)?; // Ask rubato exactly how much output space `process_all_into_buffer` // needs — it accounts for the per-chunk pad-up, the resampler's @@ -177,6 +161,201 @@ impl AudioFrame<'_> { } } +/// Shared rubato builder used by both [`AudioFrame::resample`] and +/// [`StreamingResampler`]. 
Keeps the sinc parameters (and the version +/// bumps that come with rubato API churn) in one place. +#[cfg(feature = "resample")] +fn build_sinc_resampler( + source_rate: u32, + target_rate: u32, + chunk_size: usize, +) -> Result<rubato::Async<f32>, crate::CoreError> { + use rubato::{ + Async, FixedAsync, SincInterpolationParameters, SincInterpolationType, WindowFunction, + }; + + if source_rate == 0 || target_rate == 0 { + return Err(crate::CoreError::Audio( + "sample rate must be non-zero".into(), + )); + } + if chunk_size == 0 { + return Err(crate::CoreError::Audio( + "chunk_size must be non-zero".into(), + )); + } + + let params = SincInterpolationParameters { + sinc_len: 256, + f_cutoff: 0.95, + interpolation: SincInterpolationType::Cubic, + oversampling_factor: 128, + window: WindowFunction::BlackmanHarris2, + }; + let ratio = target_rate as f64 / source_rate as f64; + Async::<f32>::new_sinc(ratio, 1.0, &params, chunk_size, 1, FixedAsync::Input) + .map_err(|e| crate::CoreError::Audio(e.to_string())) +} + +/// Stateful streaming resampler. +/// +/// [`AudioFrame::resample`] is convenient but constructs a fresh rubato +/// resampler per call. For real-time pipelines that hand the resampler +/// short frames (e.g. 20 ms G.711 packets off an RTP socket) the per-call +/// resampler has no state to carry across frame boundaries, and sinc +/// reconstruction produces audible edge artifacts at the frame rate — +/// 50 Hz for 20 ms packets, perceived as continuous noise/buzz over the +/// voice. `StreamingResampler` builds rubato once at stream open and +/// reuses its internal filter state for every call, so output samples +/// stitch together cleanly. +/// +/// Build it with [`StreamingResampler::new`], then call +/// [`process`](Self::process) for each arriving block of audio. Samples +/// accumulate inside the resampler until a full `chunk_size` is ready, +/// then a chunk's worth of output is appended to the caller's buffer. 
+/// +/// If `source_rate == target_rate`, `process` becomes a pure copy and +/// `chunk_size` is ignored. +/// +/// # Example +/// +/// ``` +/// use wavekat_core::StreamingResampler; +/// +/// // 8 kHz → 44.1 kHz, 160-sample input chunks (matches 20 ms G.711). +/// let mut resampler = StreamingResampler::new(8000, 44100, 160).unwrap(); +/// +/// let mut out = Vec::new(); +/// for _packet in 0..5 { +/// let input = vec![0.0f32; 160]; // 20 ms of silence per packet +/// resampler.process(&input, &mut out).unwrap(); +/// } +/// // Five 160-sample inputs at 8 kHz expand to roughly 5 × 882 samples +/// // at 44.1 kHz (the exact count depends on rubato's edge handling). +/// assert!(out.len() > 4000); +/// ``` +#[cfg(feature = "resample")] +pub struct StreamingResampler { + // `None` when source_rate == target_rate (pass-through fast path). + inner: Option<rubato::Async<f32>>, + source_rate: u32, + target_rate: u32, + chunk_size: usize, + // Accumulates partial input across calls until we have `chunk_size` + // samples for the next rubato step. + input_buf: Vec<f32>, + // Reusable scratch sized to `output_frames_max()` so we don't + // re-allocate on every chunk. + output_buf: Vec<f32>, +} + +#[cfg(feature = "resample")] +impl StreamingResampler { + /// Build a streaming resampler. + /// + /// `chunk_size` is how many input samples are processed per internal + /// rubato step. Match it to the natural arrival size of your input + /// — e.g. 160 for 20 ms G.711 frames at 8 kHz. Smaller chunks mean + /// lower latency; larger chunks are marginally more efficient. + /// + /// Returns [`CoreError::Audio`] if the resampler cannot be built + /// (zero rate, zero chunk size, or rubato rejects the ratio). + pub fn new( + source_rate: u32, + target_rate: u32, + chunk_size: usize, + ) -> Result<Self, crate::CoreError> { + if source_rate == target_rate { + // Pass-through still validates the rates so calling code + // can't smuggle a zero rate past us. 
+ if source_rate == 0 { + return Err(crate::CoreError::Audio( + "sample rate must be non-zero".into(), + )); + } + return Ok(Self { + inner: None, + source_rate, + target_rate, + chunk_size, + input_buf: Vec::new(), + output_buf: Vec::new(), + }); + } + + let inner = build_sinc_resampler(source_rate, target_rate, chunk_size)?; + let out_max = { + use rubato::Resampler; + inner.output_frames_max() + }; + Ok(Self { + inner: Some(inner), + source_rate, + target_rate, + chunk_size, + input_buf: Vec::with_capacity(chunk_size), + output_buf: vec![0.0; out_max], + }) + } + + /// Source sample rate this resampler was built for. + pub fn source_rate(&self) -> u32 { + self.source_rate + } + + /// Target sample rate this resampler emits. + pub fn target_rate(&self) -> u32 { + self.target_rate + } + + /// Input chunk size — how many samples per internal step. + pub fn chunk_size(&self) -> usize { + self.chunk_size + } + + /// Resample `input` and append the output samples to `out`. + /// + /// Input is buffered internally until a full `chunk_size` has been + /// received; partial chunks remain buffered until the next call. + /// State is carried across calls so there are no boundary artifacts + /// — feeding two adjacent 160-sample chunks is equivalent to + /// feeding one 320-sample chunk (modulo the resampler's group + /// delay, paid once at the start of the stream). 
+ pub fn process(&mut self, input: &[f32], out: &mut Vec<f32>) -> Result<(), crate::CoreError> { + let Some(inner) = self.inner.as_mut() else { + out.extend_from_slice(input); + return Ok(()); + }; + use rubato::audioadapter_buffers::direct::InterleavedSlice; + use rubato::Resampler; + + let mut remaining = input; + while !remaining.is_empty() { + let need = self.chunk_size - self.input_buf.len(); + let take = need.min(remaining.len()); + self.input_buf.extend_from_slice(&remaining[..take]); + remaining = &remaining[take..]; + + if self.input_buf.len() < self.chunk_size { + break; + } + + let in_adapter = InterleavedSlice::new(&self.input_buf[..], 1, self.chunk_size) + .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + let out_buf_len = self.output_buf.len(); + let mut out_adapter = + InterleavedSlice::new_mut(&mut self.output_buf[..], 1, out_buf_len) + .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + let (_in_used, out_produced) = inner + .process_into_buffer(&in_adapter, &mut out_adapter, None) + .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + out.extend_from_slice(&self.output_buf[..out_produced]); + self.input_buf.clear(); + } + Ok(()) + } +} + #[cfg(feature = "wav")] impl AudioFrame<'_> { /// Write this frame to a WAV file at `path`. @@ -560,4 +739,187 @@ mod tests { "expected ~{freq} Hz, measured {measured_freq} Hz" ); } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_same_rate_is_passthrough() { + // No-op short-circuit: no resampler is built, no work is done, + // samples pass through verbatim. Guards against accidentally + // putting a same-rate stream through rubato (which adds group + // delay we don't want). 
+ use crate::StreamingResampler; + let mut r = StreamingResampler::new(16000, 16000, 160).unwrap(); + let input = vec![0.1, -0.2, 0.3, -0.4]; + let mut out = Vec::new(); + r.process(&input, &mut out).unwrap(); + assert_eq!(out, input); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_short_input_chunked_calls() { + // The exact shape `wavekat-voice`'s RTP receive path drives: + // repeated 160-sample inputs at 8 kHz → 44.1 kHz. Each call + // produces ~882 output samples; total over N calls is ~N × 882 + // (the first chunk may emit slightly less while rubato fills + // its filter delay). + use crate::StreamingResampler; + let mut r = StreamingResampler::new(8000, 44100, 160).unwrap(); + let mut out = Vec::new(); + for _ in 0..10 { + let input = vec![0.0f32; 160]; + r.process(&input, &mut out).unwrap(); + } + // 10 × 160 input @ 8k = 200 ms; @ 44.1k that's ~8820 samples. + // Allow generous tolerance for rubato's initial transient. + let expected = (10 * 160 * 44100 / 8000) as i64; + let actual = out.len() as i64; + assert!( + (actual - expected).unsigned_abs() < 2000, + "expected ~{expected} samples, got {actual}" + ); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_buffers_across_partial_calls() { + // Splitting an input across two `process` calls must produce + // the same output as one big call. Catches a regression where + // partial input is dropped on the floor instead of buffered. + use crate::StreamingResampler; + let input: Vec<f32> = (0..320).map(|i| (i as f32) * 0.01).collect(); + + let mut split_out = Vec::new(); + let mut r1 = StreamingResampler::new(8000, 16000, 160).unwrap(); + r1.process(&input[..50], &mut split_out).unwrap(); + // No full chunk yet — buffered. 
+ assert!(split_out.is_empty(), "no output before a full chunk"); + r1.process(&input[50..], &mut split_out).unwrap(); + + let mut whole_out = Vec::new(); + let mut r2 = StreamingResampler::new(8000, 16000, 160).unwrap(); + r2.process(&input, &mut whole_out).unwrap(); + + assert_eq!( + split_out.len(), + whole_out.len(), + "split call must produce same number of samples as one-shot" + ); + // The samples themselves must match too — same rubato state + // either way. + for (i, (a, b)) in split_out.iter().zip(whole_out.iter()).enumerate() { + assert!( + (a - b).abs() < 1e-6, + "split vs whole differ at {i}: {a} vs {b}" + ); + } + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_avoids_per_frame_edge_artifacts() { + // The motivating regression: a stateless per-call resampler + // (i.e. `AudioFrame::resample` invoked on each 160-sample + // chunk) produces edge artifacts at every chunk boundary, + // because rubato assumes silence before t=0 and after t=N for + // each isolated chunk — sinc reconstruction near the edges + // sees an abrupt step. + // + // We don't compare against a reference signal (group-delay + // offsets across approaches make sample-index alignment + // unreliable). Instead we check the output's own *smoothness*: + // a band-limited signal at the input rate, upsampled, produces + // a band-limited output, so consecutive-sample deltas are + // bounded by `2π × freq / sr_out`. Edge artifacts show up as + // spikes in that consecutive delta — much larger than the + // smooth bound. + use crate::StreamingResampler; + let sr_in: u32 = 8000; + let sr_out: u32 = 44100; + let chunks = 30; + let chunk_size = 160; + + // Mid-band sine that exercises the sinc filter without + // touching the anti-aliasing edge. + let freq = 600.0_f32; + let signal: Vec<f32> = (0..chunks * chunk_size) + .map(|i| (2.0 * std::f32::consts::PI * freq * i as f32 / sr_in as f32).sin()) + .collect(); + + // Streaming: state carried across calls. 
+ let mut streaming = StreamingResampler::new(sr_in, sr_out, chunk_size).unwrap(); + let mut streaming_out: Vec<f32> = Vec::new(); + for c in 0..chunks { + streaming + .process( + &signal[c * chunk_size..(c + 1) * chunk_size], + &mut streaming_out, + ) + .unwrap(); + } + + // Stateless per-chunk: fresh resampler every call (the bug). + let mut stateless_out: Vec<f32> = Vec::new(); + for c in 0..chunks { + let chunk = + AudioFrame::from_vec(signal[c * chunk_size..(c + 1) * chunk_size].to_vec(), sr_in); + let resampled = chunk.resample(sr_out).unwrap(); + stateless_out.extend_from_slice(resampled.samples()); + } + + // Skip the initial group-delay transient and the trailing + // tail; we want steady-state behavior. + let skip = 1500; + let tail = 500; + + // Smooth bound: for a 600 Hz sine sampled at 44.1 kHz, the + // maximum delta between adjacent samples is ~ 2π × 600 / 44100 + // ≈ 0.085. Allow generous headroom (4×) before we call a delta + // "spiky." + let expected_max_delta = 2.0 * std::f32::consts::PI * freq / sr_out as f32; + let spike_threshold = expected_max_delta * 4.0; + + let count_spikes = |samples: &[f32], skip: usize, tail: usize| -> usize { + if samples.len() < skip + tail { + return 0; + } + samples[skip..samples.len() - tail] + .windows(2) + .filter(|w| (w[1] - w[0]).abs() > spike_threshold) + .count() + }; + + let streaming_spikes = count_spikes(&streaming_out, skip, tail); + let stateless_spikes = count_spikes(&stateless_out, skip, tail); + + // Streaming output should be smooth: essentially zero spikes + // in steady state. + assert!( + streaming_spikes < 10, + "streaming output should be smooth, found {streaming_spikes} sample-delta spikes (threshold {spike_threshold})" + ); + // Stateless per-chunk output should have many spikes — one + // per chunk boundary, at minimum. We have ~25 chunks in the + // compared range, so expect at least 25 spikes. 
+ assert!( + stateless_spikes > streaming_spikes * 5, + "stateless per-chunk should have far more spikes than streaming; got stateless={stateless_spikes}, streaming={streaming_spikes}" + ); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_rejects_zero_rate() { + use crate::StreamingResampler; + assert!(StreamingResampler::new(0, 16000, 160).is_err()); + assert!(StreamingResampler::new(16000, 0, 160).is_err()); + assert!(StreamingResampler::new(0, 0, 160).is_err()); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_rejects_zero_chunk_size() { + use crate::StreamingResampler; + assert!(StreamingResampler::new(8000, 16000, 0).is_err()); + } } diff --git a/crates/wavekat-core/src/lib.rs b/crates/wavekat-core/src/lib.rs index f0d22af..bfe3ba1 100644 --- a/crates/wavekat-core/src/lib.rs +++ b/crates/wavekat-core/src/lib.rs @@ -29,6 +29,8 @@ pub mod audio_io; pub mod codec; mod error; +#[cfg(feature = "resample")] +pub use audio::StreamingResampler; pub use audio::{AudioFrame, IntoSamples}; pub use audio_io::{AudioSink, AudioSource}; pub use error::CoreError; From efe8c7dc72c3e320665cf51e6ec7eb3681b5ebb2 Mon Sep 17 00:00:00 2001 From: Eason WaveKat Date: Thu, 14 May 2026 18:21:43 +1200 Subject: [PATCH 2/3] test: cover StreamingResampler accessors Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/wavekat-core/src/audio.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/wavekat-core/src/audio.rs b/crates/wavekat-core/src/audio.rs index 69a4001..7afe308 100644 --- a/crates/wavekat-core/src/audio.rs +++ b/crates/wavekat-core/src/audio.rs @@ -755,6 +755,16 @@ mod tests { assert_eq!(out, input); } + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_accessors_report_construction_args() { + use crate::StreamingResampler; + let r = StreamingResampler::new(8000, 44100, 160).unwrap(); + assert_eq!(r.source_rate(), 8000); + assert_eq!(r.target_rate(), 44100); + assert_eq!(r.chunk_size(), 160); + } + 
#[cfg(feature = "resample")] #[test] fn streaming_resampler_short_input_chunked_calls() { From 4c51ba03a279e5204c5840ddd07bb579c219ac98 Mon Sep 17 00:00:00 2001 From: Eason WaveKat Date: Thu, 14 May 2026 18:26:13 +1200 Subject: [PATCH 3/3] test: drop unreachable guard in spike-counter helper Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/wavekat-core/src/audio.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/wavekat-core/src/audio.rs b/crates/wavekat-core/src/audio.rs index 7afe308..8804525 100644 --- a/crates/wavekat-core/src/audio.rs +++ b/crates/wavekat-core/src/audio.rs @@ -890,9 +890,6 @@ mod tests { let spike_threshold = expected_max_delta * 4.0; let count_spikes = |samples: &[f32], skip: usize, tail: usize| -> usize { - if samples.len() < skip + tail { - return 0; - } samples[skip..samples.len() - tail] .windows(2) .filter(|w| (w[1] - w[0]).abs() > spike_threshold)