diff --git a/crates/wavekat-core/src/audio.rs b/crates/wavekat-core/src/audio.rs index 92b4242..8804525 100644 --- a/crates/wavekat-core/src/audio.rs +++ b/crates/wavekat-core/src/audio.rs @@ -125,10 +125,7 @@ impl AudioFrame<'_> { /// ``` pub fn resample(&self, target_rate: u32) -> Result, crate::CoreError> { use rubato::audioadapter_buffers::direct::InterleavedSlice; - use rubato::{ - Async, FixedAsync, Resampler, SincInterpolationParameters, SincInterpolationType, - WindowFunction, - }; + use rubato::Resampler; if self.sample_rate == target_rate { return Ok(self.clone().into_owned()); @@ -138,24 +135,11 @@ impl AudioFrame<'_> { return Ok(AudioFrame::from_vec(Vec::new(), target_rate)); } - let ratio = target_rate as f64 / self.sample_rate as f64; let nbr_input_frames = self.samples.len(); - - let params = SincInterpolationParameters { - sinc_len: 256, - f_cutoff: 0.95, - interpolation: SincInterpolationType::Cubic, - oversampling_factor: 128, - window: WindowFunction::BlackmanHarris2, - }; - // Match chunk size to input when shorter than the default — avoids // wasting work padding a 160-sample G.711 frame up to 1024 samples. let chunk_size = nbr_input_frames.min(1024); - - let mut resampler = - Async::::new_sinc(ratio, 1.0, ¶ms, chunk_size, 1, FixedAsync::Input) - .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + let mut resampler = build_sinc_resampler(self.sample_rate, target_rate, chunk_size)?; // Ask rubato exactly how much output space `process_all_into_buffer` // needs — it accounts for the per-chunk pad-up, the resampler's @@ -177,6 +161,201 @@ impl AudioFrame<'_> { } } +/// Shared rubato builder used by both [`AudioFrame::resample`] and +/// [`StreamingResampler`]. Keeps the sinc parameters (and the version +/// bumps that come with rubato API churn) in one place. +#[cfg(feature = "resample")] +fn build_sinc_resampler( + source_rate: u32, + target_rate: u32, + chunk_size: usize, +) -> Result, crate::CoreError> { + use rubato::{ + Async, FixedAsync, SincInterpolationParameters, SincInterpolationType, WindowFunction, + }; + + if source_rate == 0 || target_rate == 0 { + return Err(crate::CoreError::Audio( + "sample rate must be non-zero".into(), + )); + } + if chunk_size == 0 { + return Err(crate::CoreError::Audio( + "chunk_size must be non-zero".into(), + )); + } + + let params = SincInterpolationParameters { + sinc_len: 256, + f_cutoff: 0.95, + interpolation: SincInterpolationType::Cubic, + oversampling_factor: 128, + window: WindowFunction::BlackmanHarris2, + }; + let ratio = target_rate as f64 / source_rate as f64; + Async::::new_sinc(ratio, 1.0, ¶ms, chunk_size, 1, FixedAsync::Input) + .map_err(|e| crate::CoreError::Audio(e.to_string())) +} + +/// Stateful streaming resampler. +/// +/// [`AudioFrame::resample`] is convenient but constructs a fresh rubato +/// resampler per call. For real-time pipelines that hand the resampler +/// short frames (e.g. 20 ms G.711 packets off an RTP socket) the per-call +/// resampler has no state to carry across frame boundaries, and sinc +/// reconstruction produces audible edge artifacts at the frame rate — +/// 50 Hz for 20 ms packets, perceived as continuous noise/buzz over the +/// voice. `StreamingResampler` builds rubato once at stream open and +/// reuses its internal filter state for every call, so output samples +/// stitch together cleanly. +/// +/// Build it with [`StreamingResampler::new`], then call +/// [`process`](Self::process) for each arriving block of audio. Samples +/// accumulate inside the resampler until a full `chunk_size` is ready, +/// then a chunk's worth of output is appended to the caller's buffer. +/// +/// If `source_rate == target_rate`, `process` becomes a pure copy and +/// `chunk_size` is ignored. +/// +/// # Example +/// +/// ``` +/// use wavekat_core::StreamingResampler; +/// +/// // 8 kHz → 44.1 kHz, 160-sample input chunks (matches 20 ms G.711). +/// let mut resampler = StreamingResampler::new(8000, 44100, 160).unwrap(); +/// +/// let mut out = Vec::new(); +/// for _packet in 0..5 { +/// let input = vec![0.0f32; 160]; // 20 ms of silence per packet +/// resampler.process(&input, &mut out).unwrap(); +/// } +/// // Five 160-sample inputs at 8 kHz expand to roughly 5 × 882 samples +/// // at 44.1 kHz (the exact count depends on rubato's edge handling). +/// assert!(out.len() > 4000); +/// ``` +#[cfg(feature = "resample")] +pub struct StreamingResampler { + // `None` when source_rate == target_rate (pass-through fast path). + inner: Option>, + source_rate: u32, + target_rate: u32, + chunk_size: usize, + // Accumulates partial input across calls until we have `chunk_size` + // samples for the next rubato step. + input_buf: Vec, + // Reusable scratch sized to `output_frames_max()` so we don't + // re-allocate on every chunk. + output_buf: Vec, +} + +#[cfg(feature = "resample")] +impl StreamingResampler { + /// Build a streaming resampler. + /// + /// `chunk_size` is how many input samples are processed per internal + /// rubato step. Match it to the natural arrival size of your input + /// — e.g. 160 for 20 ms G.711 frames at 8 kHz. Smaller chunks mean + /// lower latency; larger chunks are marginally more efficient. + /// + /// Returns [`CoreError::Audio`] if the resampler cannot be built + /// (zero rate, zero chunk size, or rubato rejects the ratio). + pub fn new( + source_rate: u32, + target_rate: u32, + chunk_size: usize, + ) -> Result { + if source_rate == target_rate { + // Pass-through still validates the rates so calling code + // can't smuggle a zero rate past us. + if source_rate == 0 { + return Err(crate::CoreError::Audio( + "sample rate must be non-zero".into(), + )); + } + return Ok(Self { + inner: None, + source_rate, + target_rate, + chunk_size, + input_buf: Vec::new(), + output_buf: Vec::new(), + }); + } + + let inner = build_sinc_resampler(source_rate, target_rate, chunk_size)?; + let out_max = { + use rubato::Resampler; + inner.output_frames_max() + }; + Ok(Self { + inner: Some(inner), + source_rate, + target_rate, + chunk_size, + input_buf: Vec::with_capacity(chunk_size), + output_buf: vec![0.0; out_max], + }) + } + + /// Source sample rate this resampler was built for. + pub fn source_rate(&self) -> u32 { + self.source_rate + } + + /// Target sample rate this resampler emits. + pub fn target_rate(&self) -> u32 { + self.target_rate + } + + /// Input chunk size — how many samples per internal step. + pub fn chunk_size(&self) -> usize { + self.chunk_size + } + + /// Resample `input` and append the output samples to `out`. + /// + /// Input is buffered internally until a full `chunk_size` has been + /// received; partial chunks remain buffered until the next call. + /// State is carried across calls so there are no boundary artifacts + /// — feeding two adjacent 160-sample chunks is equivalent to + /// feeding one 320-sample chunk (modulo the resampler's group + /// delay, paid once at the start of the stream). + pub fn process(&mut self, input: &[f32], out: &mut Vec) -> Result<(), crate::CoreError> { + let Some(inner) = self.inner.as_mut() else { + out.extend_from_slice(input); + return Ok(()); + }; + use rubato::audioadapter_buffers::direct::InterleavedSlice; + use rubato::Resampler; + + let mut remaining = input; + while !remaining.is_empty() { + let need = self.chunk_size - self.input_buf.len(); + let take = need.min(remaining.len()); + self.input_buf.extend_from_slice(&remaining[..take]); + remaining = &remaining[take..]; + + if self.input_buf.len() < self.chunk_size { + break; + } + + let in_adapter = InterleavedSlice::new(&self.input_buf[..], 1, self.chunk_size) + .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + let out_buf_len = self.output_buf.len(); + let mut out_adapter = + InterleavedSlice::new_mut(&mut self.output_buf[..], 1, out_buf_len) + .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + let (_in_used, out_produced) = inner + .process_into_buffer(&in_adapter, &mut out_adapter, None) + .map_err(|e| crate::CoreError::Audio(e.to_string()))?; + out.extend_from_slice(&self.output_buf[..out_produced]); + self.input_buf.clear(); + } + Ok(()) + } +} + #[cfg(feature = "wav")] impl AudioFrame<'_> { /// Write this frame to a WAV file at `path`. @@ -560,4 +739,194 @@ mod tests { "expected ~{freq} Hz, measured {measured_freq} Hz" ); } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_same_rate_is_passthrough() { + // No-op short-circuit: no resampler is built, no work is done, + // samples pass through verbatim. Guards against accidentally + // putting a same-rate stream through rubato (which adds group + // delay we don't want). + use crate::StreamingResampler; + let mut r = StreamingResampler::new(16000, 16000, 160).unwrap(); + let input = vec![0.1, -0.2, 0.3, -0.4]; + let mut out = Vec::new(); + r.process(&input, &mut out).unwrap(); + assert_eq!(out, input); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_accessors_report_construction_args() { + use crate::StreamingResampler; + let r = StreamingResampler::new(8000, 44100, 160).unwrap(); + assert_eq!(r.source_rate(), 8000); + assert_eq!(r.target_rate(), 44100); + assert_eq!(r.chunk_size(), 160); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_short_input_chunked_calls() { + // The exact shape `wavekat-voice`'s RTP receive path drives: + // repeated 160-sample inputs at 8 kHz → 44.1 kHz. Each call + // produces ~882 output samples; total over N calls is ~N × 882 + // (the first chunk may emit slightly less while rubato fills + // its filter delay). + use crate::StreamingResampler; + let mut r = StreamingResampler::new(8000, 44100, 160).unwrap(); + let mut out = Vec::new(); + for _ in 0..10 { + let input = vec![0.0f32; 160]; + r.process(&input, &mut out).unwrap(); + } + // 10 × 160 input @ 8k = 200 ms; @ 44.1k that's ~8820 samples. + // Allow generous tolerance for rubato's initial transient. + let expected = (10 * 160 * 44100 / 8000) as i64; + let actual = out.len() as i64; + assert!( + (actual - expected).unsigned_abs() < 2000, + "expected ~{expected} samples, got {actual}" + ); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_buffers_across_partial_calls() { + // Splitting an input across two `process` calls must produce + // the same output as one big call. Catches a regression where + // partial input is dropped on the floor instead of buffered. + use crate::StreamingResampler; + let input: Vec = (0..320).map(|i| (i as f32) * 0.01).collect(); + + let mut split_out = Vec::new(); + let mut r1 = StreamingResampler::new(8000, 16000, 160).unwrap(); + r1.process(&input[..50], &mut split_out).unwrap(); + // No full chunk yet — buffered. + assert!(split_out.is_empty(), "no output before a full chunk"); + r1.process(&input[50..], &mut split_out).unwrap(); + + let mut whole_out = Vec::new(); + let mut r2 = StreamingResampler::new(8000, 16000, 160).unwrap(); + r2.process(&input, &mut whole_out).unwrap(); + + assert_eq!( + split_out.len(), + whole_out.len(), + "split call must produce same number of samples as one-shot" + ); + // The samples themselves must match too — same rubato state + // either way. + for (i, (a, b)) in split_out.iter().zip(whole_out.iter()).enumerate() { + assert!( + (a - b).abs() < 1e-6, + "split vs whole differ at {i}: {a} vs {b}" + ); + } + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_avoids_per_frame_edge_artifacts() { + // The motivating regression: a stateless per-call resampler + // (i.e. `AudioFrame::resample` invoked on each 160-sample + // chunk) produces edge artifacts at every chunk boundary, + // because rubato assumes silence before t=0 and after t=N for + // each isolated chunk — sinc reconstruction near the edges + // sees an abrupt step. + // + // We don't compare against a reference signal (group-delay + // offsets across approaches make sample-index alignment + // unreliable). Instead we check the output's own *smoothness*: + // a band-limited signal at the input rate, upsampled, produces + // a band-limited output, so consecutive-sample deltas are + // bounded by `2π × freq / sr_out`. Edge artifacts show up as + // spikes in that consecutive delta — much larger than the + // smooth bound. + use crate::StreamingResampler; + let sr_in: u32 = 8000; + let sr_out: u32 = 44100; + let chunks = 30; + let chunk_size = 160; + + // Mid-band sine that exercises the sinc filter without + // touching the anti-aliasing edge. + let freq = 600.0_f32; + let signal: Vec = (0..chunks * chunk_size) + .map(|i| (2.0 * std::f32::consts::PI * freq * i as f32 / sr_in as f32).sin()) + .collect(); + + // Streaming: state carried across calls. + let mut streaming = StreamingResampler::new(sr_in, sr_out, chunk_size).unwrap(); + let mut streaming_out: Vec = Vec::new(); + for c in 0..chunks { + streaming + .process( + &signal[c * chunk_size..(c + 1) * chunk_size], + &mut streaming_out, + ) + .unwrap(); + } + + // Stateless per-chunk: fresh resampler every call (the bug). + let mut stateless_out: Vec = Vec::new(); + for c in 0..chunks { + let chunk = + AudioFrame::from_vec(signal[c * chunk_size..(c + 1) * chunk_size].to_vec(), sr_in); + let resampled = chunk.resample(sr_out).unwrap(); + stateless_out.extend_from_slice(resampled.samples()); + } + + // Skip the initial group-delay transient and the trailing + // tail; we want steady-state behavior. + let skip = 1500; + let tail = 500; + + // Smooth bound: for a 600 Hz sine sampled at 44.1 kHz, the + // maximum delta between adjacent samples is ~ 2π × 600 / 44100 + // ≈ 0.085. Allow generous headroom (4×) before we call a delta + // "spiky." + let expected_max_delta = 2.0 * std::f32::consts::PI * freq / sr_out as f32; + let spike_threshold = expected_max_delta * 4.0; + + let count_spikes = |samples: &[f32], skip: usize, tail: usize| -> usize { + samples[skip..samples.len() - tail] + .windows(2) + .filter(|w| (w[1] - w[0]).abs() > spike_threshold) + .count() + }; + + let streaming_spikes = count_spikes(&streaming_out, skip, tail); + let stateless_spikes = count_spikes(&stateless_out, skip, tail); + + // Streaming output should be smooth: essentially zero spikes + // in steady state. + assert!( + streaming_spikes < 10, + "streaming output should be smooth, found {streaming_spikes} sample-delta spikes (threshold {spike_threshold})" + ); + // Stateless per-chunk output should have many spikes — one + // per chunk boundary, at minimum. We have ~25 chunks in the + // compared range, so expect at least 25 spikes. + assert!( + stateless_spikes > streaming_spikes * 5, + "stateless per-chunk should have far more spikes than streaming; got stateless={stateless_spikes}, streaming={streaming_spikes}" + ); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_rejects_zero_rate() { + use crate::StreamingResampler; + assert!(StreamingResampler::new(0, 16000, 160).is_err()); + assert!(StreamingResampler::new(16000, 0, 160).is_err()); + assert!(StreamingResampler::new(0, 0, 160).is_err()); + } + + #[cfg(feature = "resample")] + #[test] + fn streaming_resampler_rejects_zero_chunk_size() { + use crate::StreamingResampler; + assert!(StreamingResampler::new(8000, 16000, 0).is_err()); + } } diff --git a/crates/wavekat-core/src/lib.rs b/crates/wavekat-core/src/lib.rs index f0d22af..bfe3ba1 100644 --- a/crates/wavekat-core/src/lib.rs +++ b/crates/wavekat-core/src/lib.rs @@ -29,6 +29,8 @@ pub mod audio_io; pub mod codec; mod error; +#[cfg(feature = "resample")] +pub use audio::StreamingResampler; pub use audio::{AudioFrame, IntoSamples}; pub use audio_io::{AudioSink, AudioSource}; pub use error::CoreError;