Conversation
|
@apachecara is attempting to deploy a commit to the blstmo's projects Team on Vercel. A member of the Team first needs to authorize it. |
|
No actionable comments were generated in the recent review. 🎉 📝 WalkthroughWalkthroughAdds LiveKit inbound audio receive: per-participant receive subscriptions, audioFrame events carrying PCM samples, speakerStart/speakerStop events, VoiceManager helpers to list/subscribe channel participants, tests, README example, and two exported types for frames and subscriptions. Changes
Sequence DiagramsequenceDiagram
participant App as Application
participant VoiceMgr as VoiceManager
participant LiveKitConn as LiveKitRtcConnection
participant Room as LiveKit Room
participant Remote as RemoteParticipant/Track
App->>VoiceMgr: subscribeChannelParticipants(channelId)
VoiceMgr->>VoiceMgr: listParticipantsInChannel(guildId, channelId)
VoiceMgr->>LiveKitConn: subscribeParticipantAudio(participantId)
LiveKitConn->>Room: (autoSubscribe: true) receive TrackSubscribed
Room-->>LiveKitConn: TrackSubscribed / Remote Track
Remote->>LiveKitConn: audio frames (samples)
LiveKitConn->>LiveKitConn: emit audioFrame {participantId, samples, sampleRate, channels, trackSid}
LiveKitConn-->>App: audioFrame event
Room->>LiveKitConn: ActiveSpeakersChanged
LiveKitConn->>LiveKitConn: emit speakerStart / speakerStop
App->>LiveKitConn: subscription.stop()
LiveKitConn->>Room: unsubscribe / cleanup
Room-->>LiveKitConn: TrackUnsubscribed / ParticipantDisconnected
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 387-423: subscribeParticipantAudio currently doesn't prevent
automatic audio work because TrackSubscribed always calls
subscribeParticipantTrack with autoSubscribe:true; update the logic to track
explicit subscription intent (e.g., store a flag in this.receiveSubscriptions or
a new this.requestedSubscriptions set keyed by participantId inside
subscribeParticipantAudio and unsubscribe flows) and modify the
TrackSubscribed/subscribeParticipantTrack codepath to only start
decoding/processing (the "pump") when the participantId is present in that
intent store or when autoSubscribe is deliberately enabled; also ensure
unsubscribe respects options.autoResubscribe by clearing the intent store only
when autoResubscribe is false so that automatic re-subscription behavior is
preserved.
| subscribeParticipantAudio( | ||
| participantId: string, | ||
| options: { autoResubscribe?: boolean } = {}, | ||
| ): LiveKitReceiveSubscription { | ||
| const stop = () => { | ||
| this.receiveSubscriptions.get(participantId)?.stop(); | ||
| this.receiveSubscriptions.delete(participantId); | ||
| this.participantTrackSids.delete(participantId); | ||
| }; | ||
|
|
||
| const room = this.room; | ||
| if (!room || !room.isConnected) return { participantId, stop }; | ||
|
|
||
| const participant = room.remoteParticipants.get(participantId); | ||
| if (!participant) return { participantId, stop }; | ||
|
|
||
| for (const pub of participant.trackPublications.values()) { | ||
| const maybeTrack = (pub as { track?: RemoteTrack }).track; | ||
| if (maybeTrack && this.isAudioTrack(maybeTrack)) { | ||
| this.subscribeParticipantTrack(participant, maybeTrack); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| if (!options.autoResubscribe && !this.receiveSubscriptions.has(participantId)) { | ||
| return { participantId, stop }; | ||
| } | ||
|
|
||
| return { participantId, stop }; | ||
| } | ||
|
|
||
| private clearReceiveSubscriptions(): void { | ||
| for (const sub of this.receiveSubscriptions.values()) sub.stop(); | ||
| this.receiveSubscriptions.clear(); | ||
| this.participantTrackSids.clear(); | ||
| this.activeSpeakers.clear(); | ||
| } |
There was a problem hiding this comment.
subscribeParticipantAudio doesn’t actually gate subscriptions.
Because TrackSubscribed always calls subscribeParticipantTrack (and autoSubscribe: true is set), inbound audio decoding starts for every participant even when no subscription was requested. This makes subscribeParticipantAudio and autoResubscribe effectively no-ops and can drive unexpected CPU usage + audioFrame emissions in large channels.
Consider tracking explicit subscription intent and only starting the pump for those participants (and honor autoResubscribe on unsubscribe):
🔧 Suggested gating approach
@@
- private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+ private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+ private readonly receiveIntents = new Map<string, { autoResubscribe: boolean }>();
@@
subscribeParticipantAudio(
participantId: string,
options: { autoResubscribe?: boolean } = {},
): LiveKitReceiveSubscription {
+ this.receiveIntents.set(participantId, {
+ autoResubscribe: options.autoResubscribe ?? true,
+ });
const stop = () => {
+ this.receiveIntents.delete(participantId);
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
this.participantTrackSids.delete(participantId);
};
@@
room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => {
if (!this.isAudioTrack(track)) return;
+ const participantId = this.getParticipantId(participant);
+ if (!this.receiveIntents.has(participantId)) return;
this.subscribeParticipantTrack(participant, track);
});
@@
room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => {
if (!this.isAudioTrack(track)) return;
const participantId = this.getParticipantId(participant);
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
this.participantTrackSids.delete(participantId);
+ if (!this.receiveIntents.get(participantId)?.autoResubscribe) {
+ this.receiveIntents.delete(participantId);
+ }
});
@@
private clearReceiveSubscriptions(): void {
for (const sub of this.receiveSubscriptions.values()) sub.stop();
this.receiveSubscriptions.clear();
this.participantTrackSids.clear();
this.activeSpeakers.clear();
+ this.receiveIntents.clear();
}Also applies to: 473-512
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/voice/src/LiveKitRtcConnection.ts` around lines 387 - 423,
subscribeParticipantAudio currently doesn't prevent automatic audio work because
TrackSubscribed always calls subscribeParticipantTrack with autoSubscribe:true;
update the logic to track explicit subscription intent (e.g., store a flag in
this.receiveSubscriptions or a new this.requestedSubscriptions set keyed by
participantId inside subscribeParticipantAudio and unsubscribe flows) and modify
the TrackSubscribed/subscribeParticipantTrack codepath to only start
decoding/processing (the "pump") when the participantId is present in that
intent store or when autoSubscribe is deliberately enabled; also ensure
unsubscribe respects options.autoResubscribe by clearing the intent store only
when autoResubscribe is false so that automatic re-subscription behavior is
preserved.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 393-423: subscribeParticipantAudio currently clears the
subscription intent too early and stop() doesn't always clear intent; ensure you
set requestedSubscriptions.set(participantId, autoResubscribe) up front and do
not delete that intent when there is no track at subscribe time so a later
TrackSubscribed can trigger subscribeParticipantTrack; also update stop() to
always remove requestedSubscriptions.delete(participantId) (regardless of
autoResubscribe) so calling stop fully cancels intent; finally confirm the
TrackSubscribed handler checks requestedSubscriptions before resurrecting
subscriptions.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
packages/voice/src/LiveKitRtcConnection.ts (1)
361-380: Async pump loop may hold reference after cancellation.The
pump()function reads fromaudioStream.getReader()in an infinite loop. IfaudioStream.cancel()throws or the reader doesn't properly signaldone, the loop could hang indefinitely. Thestoppedflag only prevents emission but doesn't break the reader wait.Consider adding a timeout or ensuring the reader is released:
♻️ Optional: Add reader cleanup in stop()
+ let reader: ReturnType<typeof audioStream.getReader> | null = null; + const pump = async () => { try { - const reader = audioStream.getReader(); + reader = audioStream.getReader(); while (!stopped) { const { done, value } = await reader.read(); if (done || !value) break; // ... } } catch (err) { // ... + } finally { + reader = null; } }; const stop = () => { stopped = true; + // Release reader if pump is awaiting read() + try { reader?.releaseLock(); } catch { /* ignore */ } audioStream.cancel().catch(() => {}); this.receiveSubscriptions.delete(participantId); };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/voice/src/LiveKitRtcConnection.ts` around lines 361 - 380, The pump() async loop can hang waiting on reader.read() after cancellation; update the code to ensure the ReadableStreamDefaultReader is always released and read() doesn't block indefinitely by (1) storing the reader returned from audioStream.getReader() in a variable visible to stop(), (2) calling reader.cancel() and reader.releaseLock() from the stop() path (or wrapping them in try/catch) and (3) adding a short read timeout or Promise.race around reader.read() inside pump() so the loop can observe the stopped flag and exit; refer to pump, audioStream.getReader, stopped, and implement reader cleanup in the stop() logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 393-421: subscribeParticipantAudio now correctly sets
requestedSubscriptions before returning and stop() always clears it, so keep the
current flow but remove the duplicate review marker; ensure the TrackSubscribed
handler still guards on requestedSubscriptions (the handler referenced as
TrackSubscribed) and that subscribeParticipantAudio, stop,
requestedSubscriptions, and participantTrackSids remain consistent
(requestedSubscriptions.set(participantId, autoResubscribe) occurs before any
early return and stop() deletes requestedSubscriptions), then remove the stray
"[duplicate_comment]" / duplicate review annotation from the PR text.
---
Nitpick comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 361-380: The pump() async loop can hang waiting on reader.read()
after cancellation; update the code to ensure the ReadableStreamDefaultReader is
always released and read() doesn't block indefinitely by (1) storing the reader
returned from audioStream.getReader() in a variable visible to stop(), (2)
calling reader.cancel() and reader.releaseLock() from the stop() path (or
wrapping them in try/catch) and (3) adding a short read timeout or Promise.race
around reader.read() inside pump() so the loop can observe the stopped flag and
exit; refer to pump, audioStream.getReader, stopped, and implement reader
cleanup in the stop() logic.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/voice/src/LiveKitRtcConnection.ts`:
- Around line 344-415: subscribeParticipantTrack contains empty catch blocks
inside the pump finally and stop functions (around the reader.releaseLock and
reader.cancel calls) which violate ESLint no-empty; update those catch blocks to
include minimal handling such as logging the error or a deliberate no-op
comment/statement. Specifically, in subscribeParticipantTrack modify the
anonymous catch for reader.releaseLock() in the pump finally and the catch after
reader.cancel() in stop to either call this.emit('error', err instanceof Error ?
err : new Error(String(err))) or at minimum add a no-op like void 0 or a short
comment to satisfy the rule while keeping behavior unchanged; ensure you
reference the reader variable and the stop/pump closures when making the change.
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
Description
Adds inbound LiveKit receive support to @fluxerjs/voice so consumers can subscribe to participant audio and build speech pipelines (e.g. STT + agent + TTS loops).
This is focused on enabling voice receive parity for downstream integrations while keeping existing playback behavior intact.
Type of change
Checklist
pnpm run lint)pnpm run buildsuccessfullypnpm run testsuccessfullySummary by CodeRabbit
New Features
Documentation
Tests
Chores