Skip to content

LiveKit Receiving Logic#21

Merged
blstmo merged 10 commits intofluxerjs:mainfrom
lucasconnellm:feat/livekit-receive-all-phases
Feb 22, 2026
Merged

LiveKit Receiving Logic#21
blstmo merged 10 commits intofluxerjs:mainfrom
lucasconnellm:feat/livekit-receive-all-phases

Conversation

@lucasconnellm
Copy link
Contributor

@lucasconnellm lucasconnellm commented Feb 21, 2026

Description

Adds inbound LiveKit receive support to @fluxerjs/voice so consumers can subscribe to participant audio and build speech pipelines (e.g. STT + agent + TTS loops).

This is focused on enabling voice receive parity for downstream integrations while keeping existing playback behavior intact.

Type of change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update

Checklist

  • My code follows the project's style guidelines (run pnpm run lint)
  • I have run pnpm run build successfully
  • I have run pnpm run test successfully

Summary by CodeRabbit

  • New Features

    • Inbound audio receive APIs for LiveKit participants
    • New lifecycle events: speakerStart, speakerStop, and audioFrame
    • Helpers to list channel participants and subscribe to participant audio
    • Public types for audio frames and receive subscriptions
  • Documentation

    • Updated examples for transcription-oriented inbound-audio usage
  • Tests

    • Added tests for inbound audio receive and subscription behaviors
  • Chores

    • Minor package version bump for voice package

@vercel
Copy link

vercel bot commented Feb 21, 2026

@apachecara is attempting to deploy a commit to the blstmo's projects Team on Vercel.

A member of the Team first needs to authorize it.

@coderabbitai
Copy link

coderabbitai bot commented Feb 21, 2026

No actionable comments were generated in the recent review. 🎉


📝 Walkthrough

Walkthrough

Adds LiveKit inbound audio receive: per-participant receive subscriptions, audioFrame events carrying PCM samples, speakerStart/speakerStop events, VoiceManager helpers to list/subscribe channel participants, tests, README example, and two exported types for frames and subscriptions.

Changes

Cohort / File(s) Summary
Changelog & Docs
\.changeset/mean-crabs-learn.md, packages/voice/README.md
Added changeset and README example showing LiveKitRtcConnection-based inbound transcription flow, usage of speakerStart/speakerStop/audioFrame handlers, participant subscription and cleanup.
LiveKit RTC connection
packages/voice/src/LiveKitRtcConnection.ts
Implemented receive-side audio: new events (speakerStart, speakerStop, audioFrame), types (LiveKitAudioFrame, LiveKitReceiveSubscription), per-participant subscription tracking, subscribeParticipantTrack/subscribeParticipantAudio, room event wiring (TrackSubscribed, TrackUnsubscribed, ParticipantDisconnected, ActiveSpeakersChanged), RECEIVE_READ_TIMEOUT_MS, and clearReceiveSubscriptions on stop.
VoiceManager helpers
packages/voice/src/VoiceManager.ts
Added listParticipantsInChannel(guildId, channelId) and subscribeChannelParticipants(channelId, opts?) to enumerate voice channel members and batch-subscribe inbound audio via LiveKit connections.
Public types export
packages/voice/src/index.ts
Re-exported type LiveKitAudioFrame and type LiveKitReceiveSubscription.
Tests
packages/voice/src/LiveKitRtcConnection.receive.test.ts, packages/voice/src/VoiceManager.receive.test.ts
New unit tests for subscribeParticipantAudio behavior and VoiceManager participant listing / batch subscription logic.

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant VoiceMgr as VoiceManager
    participant LiveKitConn as LiveKitRtcConnection
    participant Room as LiveKit Room
    participant Remote as RemoteParticipant/Track

    App->>VoiceMgr: subscribeChannelParticipants(channelId)
    VoiceMgr->>VoiceMgr: listParticipantsInChannel(guildId, channelId)
    VoiceMgr->>LiveKitConn: subscribeParticipantAudio(participantId)
    LiveKitConn->>Room: (autoSubscribe: true) receive TrackSubscribed
    Room-->>LiveKitConn: TrackSubscribed / Remote Track
    Remote->>LiveKitConn: audio frames (samples)
    LiveKitConn->>LiveKitConn: emit audioFrame {participantId, samples, sampleRate, channels, trackSid}
    LiveKitConn-->>App: audioFrame event
    Room->>LiveKitConn: ActiveSpeakersChanged
    LiveKitConn->>LiveKitConn: emit speakerStart / speakerStop
    App->>LiveKitConn: subscription.stop()
    LiveKitConn->>Room: unsubscribe / cleanup
    Room-->>LiveKitConn: TrackUnsubscribed / ParticipantDisconnected
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I twitched my nose and tuned my ear,
Subscriptions hummed and frames drew near,
Speakers wake, then softly stop,
PCM hops in rhythmic hop,
A rabbit cheers — receive, don't drop.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'LiveKit Receiving Logic' accurately summarizes the main change: adding inbound LiveKit receive support with participant audio subscription capabilities.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 387-423: subscribeParticipantAudio currently doesn't prevent
automatic audio work because TrackSubscribed always calls
subscribeParticipantTrack with autoSubscribe:true; update the logic to track
explicit subscription intent (e.g., store a flag in this.receiveSubscriptions or
a new this.requestedSubscriptions set keyed by participantId inside
subscribeParticipantAudio and unsubscribe flows) and modify the
TrackSubscribed/subscribeParticipantTrack codepath to only start
decoding/processing (the "pump") when the participantId is present in that
intent store or when autoSubscribe is deliberately enabled; also ensure
unsubscribe respects options.autoResubscribe by clearing the intent store only
when autoResubscribe is false so that automatic re-subscription behavior is
preserved.

Comment on lines +387 to +423
subscribeParticipantAudio(
participantId: string,
options: { autoResubscribe?: boolean } = {},
): LiveKitReceiveSubscription {
const stop = () => {
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
this.participantTrackSids.delete(participantId);
};

const room = this.room;
if (!room || !room.isConnected) return { participantId, stop };

const participant = room.remoteParticipants.get(participantId);
if (!participant) return { participantId, stop };

for (const pub of participant.trackPublications.values()) {
const maybeTrack = (pub as { track?: RemoteTrack }).track;
if (maybeTrack && this.isAudioTrack(maybeTrack)) {
this.subscribeParticipantTrack(participant, maybeTrack);
break;
}
}

if (!options.autoResubscribe && !this.receiveSubscriptions.has(participantId)) {
return { participantId, stop };
}

return { participantId, stop };
}

private clearReceiveSubscriptions(): void {
for (const sub of this.receiveSubscriptions.values()) sub.stop();
this.receiveSubscriptions.clear();
this.participantTrackSids.clear();
this.activeSpeakers.clear();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

subscribeParticipantAudio doesn’t actually gate subscriptions.

Because TrackSubscribed always calls subscribeParticipantTrack (and autoSubscribe: true is set), inbound audio decoding starts for every participant even when no subscription was requested. This makes subscribeParticipantAudio and autoResubscribe effectively no-ops and can drive unexpected CPU usage + audioFrame emissions in large channels.

Consider tracking explicit subscription intent and only starting the pump for those participants (and honor autoResubscribe on unsubscribe):

🔧 Suggested gating approach
@@
-  private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+  private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+  private readonly receiveIntents = new Map<string, { autoResubscribe: boolean }>();
@@
   subscribeParticipantAudio(
     participantId: string,
     options: { autoResubscribe?: boolean } = {},
   ): LiveKitReceiveSubscription {
+    this.receiveIntents.set(participantId, {
+      autoResubscribe: options.autoResubscribe ?? true,
+    });
     const stop = () => {
+      this.receiveIntents.delete(participantId);
       this.receiveSubscriptions.get(participantId)?.stop();
       this.receiveSubscriptions.delete(participantId);
       this.participantTrackSids.delete(participantId);
     };
@@
       room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => {
         if (!this.isAudioTrack(track)) return;
+        const participantId = this.getParticipantId(participant);
+        if (!this.receiveIntents.has(participantId)) return;
         this.subscribeParticipantTrack(participant, track);
       });
@@
       room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => {
         if (!this.isAudioTrack(track)) return;
         const participantId = this.getParticipantId(participant);
         this.receiveSubscriptions.get(participantId)?.stop();
         this.receiveSubscriptions.delete(participantId);
         this.participantTrackSids.delete(participantId);
+        if (!this.receiveIntents.get(participantId)?.autoResubscribe) {
+          this.receiveIntents.delete(participantId);
+        }
       });
@@
   private clearReceiveSubscriptions(): void {
     for (const sub of this.receiveSubscriptions.values()) sub.stop();
     this.receiveSubscriptions.clear();
     this.participantTrackSids.clear();
     this.activeSpeakers.clear();
+    this.receiveIntents.clear();
   }

Also applies to: 473-512

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/voice/src/LiveKitRtcConnection.ts` around lines 387 - 423,
subscribeParticipantAudio currently doesn't prevent automatic audio work because
TrackSubscribed always calls subscribeParticipantTrack with autoSubscribe:true;
update the logic to track explicit subscription intent (e.g., store a flag in
this.receiveSubscriptions or a new this.requestedSubscriptions set keyed by
participantId inside subscribeParticipantAudio and unsubscribe flows) and modify
the TrackSubscribed/subscribeParticipantTrack codepath to only start
decoding/processing (the "pump") when the participantId is present in that
intent store or when autoSubscribe is deliberately enabled; also ensure
unsubscribe respects options.autoResubscribe by clearing the intent store only
when autoResubscribe is false so that automatic re-subscription behavior is
preserved.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 393-423: subscribeParticipantAudio currently clears the
subscription intent too early and stop() doesn't always clear intent; ensure you
set requestedSubscriptions.set(participantId, autoResubscribe) up front and do
not delete that intent when there is no track at subscribe time so a later
TrackSubscribed can trigger subscribeParticipantTrack; also update stop() to
always remove requestedSubscriptions.delete(participantId) (regardless of
autoResubscribe) so calling stop fully cancels intent; finally confirm the
TrackSubscribed handler checks requestedSubscriptions before resurrecting
subscriptions.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
packages/voice/src/LiveKitRtcConnection.ts (1)

361-380: Async pump loop may hold reference after cancellation.

The pump() function reads from audioStream.getReader() in an infinite loop. If audioStream.cancel() throws or the reader doesn't properly signal done, the loop could hang indefinitely. The stopped flag only prevents emission but doesn't break the reader wait.

Consider adding a timeout or ensuring the reader is released:

♻️ Optional: Add reader cleanup in stop()
+    let reader: ReturnType<typeof audioStream.getReader> | null = null;
+
     const pump = async () => {
       try {
-        const reader = audioStream.getReader();
+        reader = audioStream.getReader();
         while (!stopped) {
           const { done, value } = await reader.read();
           if (done || !value) break;
           // ...
         }
       } catch (err) {
         // ...
+      } finally {
+        reader = null;
       }
     };

     const stop = () => {
       stopped = true;
+      // Release reader if pump is awaiting read()
+      try { reader?.releaseLock(); } catch { /* ignore */ }
       audioStream.cancel().catch(() => {});
       this.receiveSubscriptions.delete(participantId);
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/voice/src/LiveKitRtcConnection.ts` around lines 361 - 380, The
pump() async loop can hang waiting on reader.read() after cancellation; update
the code to ensure the ReadableStreamDefaultReader is always released and read()
doesn't block indefinitely by (1) storing the reader returned from
audioStream.getReader() in a variable visible to stop(), (2) calling
reader.cancel() and reader.releaseLock() from the stop() path (or wrapping them
in try/catch) and (3) adding a short read timeout or Promise.race around
reader.read() inside pump() so the loop can observe the stopped flag and exit;
refer to pump, audioStream.getReader, stopped, and implement reader cleanup in
the stop() logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 393-421: subscribeParticipantAudio now correctly sets
requestedSubscriptions before returning and stop() always clears it, so keep the
current flow but remove the duplicate review marker; ensure the TrackSubscribed
handler still guards on requestedSubscriptions (the handler referenced as
TrackSubscribed) and that subscribeParticipantAudio, stop,
requestedSubscriptions, and participantTrackSids remain consistent
(requestedSubscriptions.set(participantId, autoResubscribe) occurs before any
early return and stop() deletes requestedSubscriptions), then remove the stray
"[duplicate_comment]" / duplicate review annotation from the PR text.

---

Nitpick comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 361-380: The pump() async loop can hang waiting on reader.read()
after cancellation; update the code to ensure the ReadableStreamDefaultReader is
always released and read() doesn't block indefinitely by (1) storing the reader
returned from audioStream.getReader() in a variable visible to stop(), (2)
calling reader.cancel() and reader.releaseLock() from the stop() path (or
wrapping them in try/catch) and (3) adding a short read timeout or Promise.race
around reader.read() inside pump() so the loop can observe the stopped flag and
exit; refer to pump, audioStream.getReader, stopped, and implement reader
cleanup in the stop() logic.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 344-415: subscribeParticipantTrack contains empty catch blocks
inside the pump finally and stop functions (around the reader.releaseLock and
reader.cancel calls) which violate ESLint no-empty; update those catch blocks to
include minimal handling such as logging the error or a deliberate no-op
comment/statement. Specifically, in subscribeParticipantTrack modify the
anonymous catch for reader.releaseLock() in the pump finally and the catch after
reader.cancel() in stop to either call this.emit('error', err instanceof Error ?
err : new Error(String(err))) or at minimum add a no-op like void 0 or a short
comment to satisfy the rule while keeping behavior unchanged; ensure you
reference the reader variable and the stop/pump closures when making the change.

@blstmo
Copy link
Contributor

blstmo commented Feb 22, 2026

@CodeRabbit review

@coderabbitai
Copy link

coderabbitai bot commented Feb 22, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@blstmo blstmo merged commit e34c8cb into fluxerjs:main Feb 22, 2026
1 of 2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants