Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Provide the common audio primitives so that all WaveKat crates speak the same la
- Common constants (sample rates, format standards)
- Shared error types (only when genuinely shared)
- Format normalisation operations on `AudioFrame` (i16→f32, sample-rate conversion) — gated behind optional features
- The `AudioSource` / `AudioSink` traits — the producer/consumer seam any audio pipeline composes against. Concrete impls (cpal mic, AI agent, RTP receive) live in their consuming crates; the trait shape lives here so they all interoperate.
- Telephony codecs (`codec::g711`: PCMU + PCMA). Codecs are consumer-layer for `wavekat-sip` (which stays codec-agnostic), but multiple downstream crates need them — keep one canonical implementation here.

## What Does NOT Belong Here

Expand Down Expand Up @@ -59,7 +61,11 @@ wavekat-core/
│ └── wavekat-core/ # library crate
│ ├── src/
│ │ ├── lib.rs # public API, re-exports
│ │ ├── audio.rs # AudioFrame, IntoSamples, resample
│ │ ├── audio.rs # AudioFrame, IntoSamples, resample, wav
│ │ ├── audio_io.rs # AudioSource / AudioSink traits
│ │ ├── codec/ # telephony codecs (g711 today)
│ │ │ ├── mod.rs
│ │ │ └── g711.rs # PCMU + PCMA encode/decode, G711Codec enum
│ │ └── error.rs # CoreError
│ └── Cargo.toml
├── docs/ # design documents
Expand Down
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Shared types for the WaveKat audio processing ecosystem.
|------|-------------|
| `AudioFrame` | Audio samples with sample rate, accepts `i16` and `f32` in slice, Vec, or array form |
| `IntoSamples` | Trait for transparent sample format conversion |
| `AudioSource` / `AudioSink` | Async producer/consumer traits — the seam every WaveKat audio pipeline composes against |
| `codec::g711` | G.711 μ-law (PCMU) and A-law (PCMA) — telephony codecs for SIP/RTP |

## Quick Start

Expand Down Expand Up @@ -65,6 +67,63 @@ Your audio (any format)
+---> wavekat-asr (future)
```

## Audio Sources and Sinks

`AudioSource` and `AudioSink` are the producer/consumer seam every WaveKat
audio pipeline composes against. Concrete impls (cpal-backed mic/speaker,
agent-driven sources, RTP-driven sinks, …) live in the consuming crates so
that adding a new producer or consumer is "implement the trait" rather than
"rewrite the RTP path."

```rust
use wavekat_core::{AudioFrame, AudioSink, AudioSource};

struct FileSource { /* … */ }
struct RtpSink { /* … */ }

impl AudioSource for FileSource {
async fn next_frame(&mut self) -> Option<AudioFrame<'static>> {
// None signals end-of-stream; callers stop draining.
todo!()
}
}

impl AudioSink for RtpSink {
async fn write_frame(&mut self, frame: AudioFrame<'_>) {
// Implementations may drop on backpressure rather than block —
// stalling the RTP path is worse than dropping a frame.
todo!()
}
}
```

## G.711 Telephony Codec

PCMU (μ-law) and PCMA (A-law) — the two static codecs every SIP endpoint
speaks. One 16-bit PCM sample ↔ one 8-bit codeword; a 20 ms RTP frame at
8 kHz is 160 samples / 160 bytes.

```rust
use wavekat_core::codec::g711::{
G711Codec, G711_FRAME_SAMPLES, G711_SAMPLE_RATE,
};

// Resolve the codec from a SIP/RTP payload type.
let codec = G711Codec::from_payload_type(0).unwrap(); // 0 = PCMU, 8 = PCMA

// Encode a 20 ms frame of PCM into G.711 bytes.
let pcm: Vec<i16> = vec![0; G711_FRAME_SAMPLES];
let mut bytes = Vec::with_capacity(G711_FRAME_SAMPLES);
codec.encode(&pcm, &mut bytes);
assert_eq!(bytes.len(), G711_FRAME_SAMPLES);

// Decode the other direction.
let mut decoded = Vec::with_capacity(bytes.len());
codec.decode(&bytes, &mut decoded);

assert_eq!(G711_SAMPLE_RATE, 8000);
```

## Optional Features

### `wav`
Expand Down
1 change: 1 addition & 0 deletions crates/wavekat-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ hound = { version = "3.5", optional = true }
rubato = { version = "2.0", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
184 changes: 184 additions & 0 deletions crates/wavekat-core/src/audio_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//! Audio source/sink traits.
//!
//! These are the seam the WaveKat audio pipeline is drawn against:
//! whatever produces audio (a microphone, a TTS engine, a WAV file)
//! implements [`AudioSource`]; whatever consumes it (a speaker, an
//! RTP encoder, an ASR worker) implements [`AudioSink`]. Concrete
//! impls live in the consuming crates — cpal-backed mic/speaker in
//! `wavekat-voice`, a future agent-driven impl in `wavekat-agent`,
//! and so on — so that adding a new producer or consumer is "implement
//! the trait" rather than "rewrite the RTP path."
//!
//! The traits speak in [`AudioFrame<'static>`]: sample-rate-tagged
//! frames so consumers can resample to whatever rate the codec wants
//! without either side of the trait having to know the codec exists.

use core::future::Future;

use crate::AudioFrame;

/// Produces owned [`AudioFrame`]s. `next_frame().await` returns the
/// next frame when one is available, or `None` once the source has
/// run out (file ended, device closed, dialogue terminated). Each
/// frame's [`AudioFrame::sample_rate`] is set by the implementation —
/// consumers resample as needed.
pub trait AudioSource: Send {
fn next_frame(&mut self) -> impl Future<Output = Option<AudioFrame<'static>>> + Send;
}

/// Consumes audio frames. Implementations may drop frames on
/// backpressure rather than block the caller; the alternative —
/// stalling — is worse on the RTP receive path, where it stalls the
/// whole pipeline.
pub trait AudioSink: Send {
fn write_frame(&mut self, frame: AudioFrame<'_>) -> impl Future<Output = ()> + Send;
}

#[cfg(test)]
mod tests {
use super::*;

/// In-memory sink: collects frames in a Vec.
#[derive(Default)]
struct VecSink {
frames: Vec<AudioFrame<'static>>,
}

impl AudioSink for VecSink {
async fn write_frame(&mut self, frame: AudioFrame<'_>) {
self.frames.push(frame.into_owned());
}
}

/// Source that yields a single frame then ends.
struct OnceSource {
frame: Option<AudioFrame<'static>>,
}

impl AudioSource for OnceSource {
async fn next_frame(&mut self) -> Option<AudioFrame<'static>> {
self.frame.take()
}
}

/// Source that drains a queue of pre-loaded frames, then signals
/// exhaustion with `None`. Mirrors what a file-backed or
/// agent-backed source looks like in practice.
struct QueueSource {
frames: std::collections::VecDeque<AudioFrame<'static>>,
}

impl AudioSource for QueueSource {
async fn next_frame(&mut self) -> Option<AudioFrame<'static>> {
self.frames.pop_front()
}
}

/// Sink that drops every frame past `cap`. Models the "drop on
/// backpressure" pattern the trait docs call out.
struct CapSink {
cap: usize,
frames: Vec<AudioFrame<'static>>,
dropped: usize,
}

impl AudioSink for CapSink {
async fn write_frame(&mut self, frame: AudioFrame<'_>) {
if self.frames.len() >= self.cap {
self.dropped += 1;
return;
}
self.frames.push(frame.into_owned());
}
}

#[tokio::test]
async fn traits_compose_end_to_end() {
let mut source = OnceSource {
frame: Some(AudioFrame::from_vec(vec![0.5, -0.5], 8000)),
};
let mut sink = VecSink::default();

let frame = source.next_frame().await.expect("frame");
sink.write_frame(frame).await;
assert!(source.next_frame().await.is_none());

assert_eq!(sink.frames.len(), 1);
assert_eq!(sink.frames[0].samples(), &[0.5, -0.5]);
assert_eq!(sink.frames[0].sample_rate(), 8000);
}

#[tokio::test]
async fn source_drains_in_order_then_returns_none() {
let mut source = QueueSource {
frames: vec![
AudioFrame::from_vec(vec![0.1], 8000),
AudioFrame::from_vec(vec![0.2], 8000),
AudioFrame::from_vec(vec![0.3], 8000),
]
.into(),
};
let mut sink = VecSink::default();
while let Some(f) = source.next_frame().await {
sink.write_frame(f).await;
}
assert_eq!(sink.frames.len(), 3);
assert_eq!(sink.frames[0].samples(), &[0.1]);
assert_eq!(sink.frames[1].samples(), &[0.2]);
assert_eq!(sink.frames[2].samples(), &[0.3]);

// Past exhaustion the source must keep returning None — callers
// rely on this to unblock their drain loop.
assert!(source.next_frame().await.is_none());
assert!(source.next_frame().await.is_none());
}

#[tokio::test]
async fn sink_with_capacity_drops_overflow() {
let mut sink = CapSink {
cap: 2,
frames: Vec::new(),
dropped: 0,
};
for i in 0..5 {
sink.write_frame(AudioFrame::from_vec(vec![i as f32], 8000))
.await;
}
assert_eq!(sink.frames.len(), 2);
assert_eq!(sink.dropped, 3);
// Cap policy is FIFO-keep / tail-drop here; the first two went
// through, the rest got dropped.
assert_eq!(sink.frames[0].samples(), &[0.0]);
assert_eq!(sink.frames[1].samples(), &[1.0]);
}

#[tokio::test]
async fn frame_sample_rate_round_trips_through_sink() {
// Different impls emit at their native rates — the sink must
// preserve `sample_rate` so the consumer can resample
// downstream without losing the original rate.
let mut sink = VecSink::default();
sink.write_frame(AudioFrame::from_vec(vec![0.0], 8000))
.await;
sink.write_frame(AudioFrame::from_vec(vec![0.0], 16_000))
.await;
sink.write_frame(AudioFrame::from_vec(vec![0.0], 48_000))
.await;
let rates: Vec<u32> = sink.frames.iter().map(|f| f.sample_rate()).collect();
assert_eq!(rates, vec![8000, 16_000, 48_000]);
}

/// Compile-time check that the trait bounds (the trait is `Send`,
/// and so are the returned futures) actually compose. If anything
/// drops the `Send` bound, this test stops compiling — which is
/// what we want, because the RTP path holds these impls across
/// `.await` points in a multi-threaded runtime.
#[test]
fn impls_are_send() {
fn assert_send<T: Send>(_: &T) {}
let source = OnceSource { frame: None };
let sink = VecSink::default();
assert_send(&source);
assert_send(&sink);
}
}
Loading
Loading