From d84120a846596d4488d9d5718e7d88c89d785037 Mon Sep 17 00:00:00 2001 From: Carapache Date: Sat, 21 Feb 2026 02:47:38 +0000 Subject: [PATCH 01/10] feat(voice): add livekit inbound receive events and subscriptions --- packages/voice/src/LiveKitRtcConnection.ts | 159 ++++++++++++++++++++- packages/voice/src/index.ts | 2 + 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 253af2d..09a9442 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -7,6 +7,7 @@ import { GatewayVoiceStateUpdateDispatchData, } from '@fluxerjs/types'; import { + AudioStream, Room, RoomEvent, AudioSource, @@ -18,6 +19,9 @@ import { VideoBufferType, VideoFrame, VideoSource, + type RemoteParticipant, + type RemoteTrack, + TrackKind, } from '@livekit/rtc-node'; import { buildLiveKitUrlForRtcSdk } from './livekit.js'; import { parseOpusPacketBoundaries, concatUint8Arrays } from './opusUtils.js'; @@ -177,6 +181,25 @@ export type LiveKitRtcConnectionEvents = VoiceConnectionEvents & { serverLeave: []; /** Emitted when voice state should be synced (self_stream/self_video). VoiceManager listens. */ requestVoiceStateSync: [payload: { self_stream?: boolean; self_video?: boolean }]; + /** Emitted when a remote participant starts speaking. */ + speakerStart: [payload: { participantId: string }]; + /** Emitted when a remote participant stops speaking. */ + speakerStop: [payload: { participantId: string }]; + /** Emitted for each decoded inbound audio frame. */ + audioFrame: [frame: LiveKitAudioFrame]; +}; + +export type LiveKitAudioFrame = { + participantId: string; + trackSid?: string; + sampleRate: number; + channels: number; + samples: Int16Array; +}; + +export type LiveKitReceiveSubscription = { + participantId: string; + stop: () => void; }; /** @@ -245,6 +268,9 @@ export class LiveKitRtcConnection extends EventEmitter { private lastServerEndpoint: string | null = null; private lastServerToken: string | null = null; private _disconnectEmitted = false; + private readonly receiveSubscriptions = new Map(); + private readonly participantTrackSids = new Map(); + private readonly activeSpeakers = new Set(); /** * @param client - The Fluxer client instance @@ -305,6 +331,97 @@ export class LiveKitRtcConnection extends EventEmitter { return this._volume ?? 100; } + private isAudioTrack(track: RemoteTrack): boolean { + return track.kind === TrackKind.KIND_AUDIO; + } + + private getParticipantId(participant: RemoteParticipant): string { + return participant.identity; + } + + private subscribeParticipantTrack(participant: RemoteParticipant, track: RemoteTrack): void { + if (!this.isAudioTrack(track)) return; + const participantId = this.getParticipantId(participant); + const current = this.receiveSubscriptions.get(participantId); + if (current) current.stop(); + + const audioStream = new AudioStream(track, { + sampleRate: SAMPLE_RATE, + numChannels: CHANNELS, + frameSizeMs: 10, + }); + let stopped = false; + + const pump = async () => { + try { + const reader = audioStream.getReader(); + while (!stopped) { + const { done, value } = await reader.read(); + if (done || !value) break; + this.emit('audioFrame', { + participantId, + trackSid: track.sid, + sampleRate: value.sampleRate, + channels: value.channels, + samples: value.data, + }); + } + } catch (err) { + if (!stopped) { + this.emit('error', err instanceof Error ? err : new Error(String(err))); + } + } + }; + + const stop = () => { + stopped = true; + audioStream.cancel().catch(() => {}); + this.receiveSubscriptions.delete(participantId); + }; + + this.receiveSubscriptions.set(participantId, { participantId, stop }); + this.participantTrackSids.set(participantId, track.sid ?? ''); + void pump(); + } + + subscribeParticipantAudio( + participantId: string, + options: { autoResubscribe?: boolean } = {}, + ): LiveKitReceiveSubscription { + const stop = () => { + this.receiveSubscriptions.get(participantId)?.stop(); + this.receiveSubscriptions.delete(participantId); + this.participantTrackSids.delete(participantId); + }; + + const room = this.room; + if (!room || !room.isConnected) return { participantId, stop }; + + const participant = room.remoteParticipants.get(participantId); + if (!participant) return { participantId, stop }; + + for (const pub of participant.trackPublications.values()) { + const maybeTrack = (pub as { track?: RemoteTrack }).track; + if (maybeTrack && this.isAudioTrack(maybeTrack)) { + this.subscribeParticipantTrack(participant, maybeTrack); + break; + } + } + + if (options.autoResubscribe === false && !this.receiveSubscriptions.has(participantId)) { + return { participantId, stop }; + } + + return { participantId, stop }; + } + + private clearReceiveSubscriptions(): void { + for (const sub of this.receiveSubscriptions.values()) sub.stop(); + this.receiveSubscriptions.clear(); + this.participantTrackSids.clear(); + this.activeSpeakers.clear(); + } + playOpus(_stream: NodeJS.ReadableStream): void { this.emit( 'error', @@ -353,7 +470,46 @@ export class LiveKitRtcConnection extends EventEmitter { this.debug('Room reconnected'); }); - await room.connect(url, token, { autoSubscribe: false, dynacast: false }); + room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => { + if (!this.isAudioTrack(track)) return; + this.subscribeParticipantTrack(participant, track); + }); + + room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => { + if (!this.isAudioTrack(track)) return; + const participantId = this.getParticipantId(participant); + this.receiveSubscriptions.get(participantId)?.stop(); + this.receiveSubscriptions.delete(participantId); + this.participantTrackSids.delete(participantId); + }); + + room.on(RoomEvent.ParticipantDisconnected, (participant) => { + const participantId = this.getParticipantId(participant); + this.receiveSubscriptions.get(participantId)?.stop(); + this.receiveSubscriptions.delete(participantId); + this.participantTrackSids.delete(participantId); + if (this.activeSpeakers.delete(participantId)) { + this.emit('speakerStop', { participantId }); + } + }); + + room.on(RoomEvent.ActiveSpeakersChanged, (speakers) => { + const next = new Set(speakers.map((speaker) => speaker.identity)); + for (const participantId of next) { + if (!this.activeSpeakers.has(participantId)) { + this.emit('speakerStart', { participantId }); + } + } + for (const participantId of this.activeSpeakers) { + if (!next.has(participantId)) { + this.emit('speakerStop', { participantId }); + } + } + this.activeSpeakers.clear(); + for (const participantId of next) this.activeSpeakers.add(participantId); + }); + + await room.connect(url, token, { autoSubscribe: true, dynacast: false }); this.lastServerEndpoint = raw; this.lastServerToken = token; this.debug('connected to room'); @@ -1523,6 +1679,7 @@ export class LiveKitRtcConnection extends EventEmitter { stop(): void { this._playing = false; this.stopVideo(); + this.clearReceiveSubscriptions(); if (this.currentStream?.destroy) this.currentStream.destroy(); this.currentStream = null; if (this.audioTrack) { diff --git a/packages/voice/src/index.ts b/packages/voice/src/index.ts index de5d9af..5509a73 100644 --- a/packages/voice/src/index.ts +++ b/packages/voice/src/index.ts @@ -3,6 +3,8 @@ export { VoiceConnection, type VoiceConnectionEvents } from './VoiceConnection.j export { LiveKitRtcConnection, type LiveKitRtcConnectionEvents, + type LiveKitAudioFrame, + type LiveKitReceiveSubscription, type VideoPlayOptions, } from './LiveKitRtcConnection.js'; import { Client } from '@fluxerjs/core'; From 992780404045342f40b9189133c88ebd83705d04 Mon Sep 17 00:00:00 2001 From: Carapache Date: Sat, 21 Feb 2026 02:47:38 +0000 Subject: [PATCH 02/10] feat(voice): add channel participant receive helpers --- packages/voice/src/VoiceManager.ts | 37 +++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/packages/voice/src/VoiceManager.ts b/packages/voice/src/VoiceManager.ts index 2555d37..c08e4d2 100644 --- a/packages/voice/src/VoiceManager.ts +++ b/packages/voice/src/VoiceManager.ts @@ -9,7 +9,10 @@ import { GatewayVoiceStateUpdateDispatchData, } from '@fluxerjs/types'; import { VoiceConnection } from './VoiceConnection.js'; -import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; +import { + LiveKitRtcConnection, + type LiveKitReceiveSubscription, +} from './LiveKitRtcConnection.js'; import { isLiveKitEndpoint } from './livekit.js'; import { Collection } from '@fluxerjs/collection'; @@ -93,6 +96,38 @@ export class VoiceManager extends EventEmitter { return guildMap.get(userId) ?? null; } + /** + * List participant user IDs currently in a specific voice channel. + */ + listParticipantsInChannel(guildId: string, channelId: string): string[] { + const guildMap = this.voiceStates.get(guildId); + if (!guildMap) return []; + const participants: string[] = []; + for (const [userId, voiceChannelId] of guildMap.entries()) { + if (voiceChannelId === channelId) participants.push(userId); + } + return participants; + } + + /** + * Subscribe to inbound audio for all known participants currently in a voice channel. + * Only supported for LiveKit connections. + */ + subscribeChannelParticipants( + channelId: string, + opts?: { autoResubscribe?: boolean }, + ): LiveKitReceiveSubscription[] { + const conn = this.connections.get(channelId); + if (!(conn instanceof LiveKitRtcConnection)) return []; + const guildId = conn.channel.guildId; + const participants = this.listParticipantsInChannel(guildId, channelId).filter( + (participantId) => participantId !== this.client.user?.id, + ); + return participants.map((participantId) => + conn.subscribeParticipantAudio(participantId, opts), + ); + } + private handleVoiceStateUpdate(data: GatewayVoiceStateUpdateDispatchData): void { const guildId = data.guild_id ?? ''; if (!guildId) return; From 260b13dfdd50640a399a037a560440a4db6a4700 Mon Sep 17 00:00:00 2001 From: Carapache Date: Sat, 21 Feb 2026 02:47:38 +0000 Subject: [PATCH 03/10] test(voice): cover inbound receive and manager helpers --- .../src/LiveKitRtcConnection.receive.test.ts | 24 ++++++++++ .../voice/src/VoiceManager.receive.test.ts | 46 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 packages/voice/src/LiveKitRtcConnection.receive.test.ts create mode 100644 packages/voice/src/VoiceManager.receive.test.ts diff --git a/packages/voice/src/LiveKitRtcConnection.receive.test.ts b/packages/voice/src/LiveKitRtcConnection.receive.test.ts new file mode 100644 index 0000000..adfb5c8 --- /dev/null +++ b/packages/voice/src/LiveKitRtcConnection.receive.test.ts @@ -0,0 +1,24 @@ +import { describe, it, expect, vi } from 'vitest'; +import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; + +function makeClient() { + return { + on: vi.fn(), + emit: vi.fn(), + sendToGateway: vi.fn(), + user: { id: 'bot' }, + rest: { post: vi.fn() }, + }; +} + +describe('LiveKitRtcConnection receive api', () => { + it('returns inert subscription when room is not connected', () => { + const channel = { id: 'c1', guildId: 'g1' } as never; + const conn = new LiveKitRtcConnection(makeClient() as never, channel, 'bot'); + + const sub = conn.subscribeParticipantAudio('u1'); + + expect(sub.participantId).toBe('u1'); + expect(() => sub.stop()).not.toThrow(); + }); +}); diff --git a/packages/voice/src/VoiceManager.receive.test.ts b/packages/voice/src/VoiceManager.receive.test.ts new file mode 100644 index 0000000..c6710d8 --- /dev/null +++ b/packages/voice/src/VoiceManager.receive.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect, vi } from 'vitest'; +import { VoiceManager } from './VoiceManager.js'; +import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; + +function makeClient() { + return { + on: vi.fn(), + emit: vi.fn(), + sendToGateway: vi.fn(), + user: { id: 'bot' }, + rest: { post: vi.fn() }, + }; +} + +describe('VoiceManager receive helpers', () => { + it('lists participants in a channel from voice state map', () => { + const manager = new VoiceManager(makeClient() as never); + + manager.voiceStates.set('g1', new Map([['u1', 'c1'], ['u2', 'c1'], ['u3', 'c2']])); + + expect(manager.listParticipantsInChannel('g1', 'c1')).toEqual(['u1', 'u2']); + expect(manager.listParticipantsInChannel('g1', 'missing')).toEqual([]); + }); + + it('subscribes known channel participants for livekit connections', () => { + const client = makeClient(); + const manager = new VoiceManager(client as never); + + const channel = { id: 'c1', guildId: 'g1' } as never; + const conn = new LiveKitRtcConnection(client as never, channel, 'bot'); + const subscribeSpy = vi + .spyOn(conn, 'subscribeParticipantAudio') + .mockImplementation((participantId) => ({ participantId, stop: vi.fn() })); + + manager.voiceStates.set('g1', new Map([['bot', 'c1'], ['u1', 'c1'], ['u2', 'c1'], ['u3', 'c9']])); + (manager as unknown as { connections: Map }).connections.set( + 'c1', + conn, + ); + + const subs = manager.subscribeChannelParticipants('c1'); + + expect(subscribeSpy).toHaveBeenCalledTimes(2); + expect(subs.map((s) => s.participantId)).toEqual(['u1', 'u2']); + }); +}); From e2b0cef7d7677bb158e51077b11ecdf4b204fa5f Mon Sep 17 00:00:00 2001 From: Carapache Date: Sat, 21 Feb 2026 02:47:38 +0000 Subject: [PATCH 04/10] docs(voice): document inbound transcription flow --- packages/voice/README.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/packages/voice/README.md b/packages/voice/README.md index 60f3eaa..49c1685 100644 --- a/packages/voice/README.md +++ b/packages/voice/README.md @@ -11,12 +11,30 @@ pnpm add @fluxerjs/voice @fluxerjs/core ## Usage ```javascript -import { getVoiceManager } from '@fluxerjs/voice'; +import { getVoiceManager, LiveKitRtcConnection } from '@fluxerjs/voice'; const voiceManager = getVoiceManager(client); const connection = await voiceManager.join(channel); await connection.play(streamUrl); +// Inbound transcription / speech-to-text pipeline +if (connection instanceof LiveKitRtcConnection) { + const subs = voiceManager.subscribeChannelParticipants(channel.id); + connection.on('speakerStart', ({ participantId }) => { + console.log('speaker start', participantId); + }); + connection.on('speakerStop', ({ participantId }) => { + console.log('speaker stop', participantId); + }); + connection.on('audioFrame', (frame) => { + // frame.samples is Int16 PCM suitable for WAV/STT pipelines + console.log(frame.participantId, frame.sampleRate, frame.channels, frame.samples.length); + }); + + // cleanup subscriptions when done + for (const sub of subs) sub.stop(); +} + connection.stop(); voiceManager.leave(guildId); ``` From ce8d27ad78b22e240584d96ee06fa14b384c6441 Mon Sep 17 00:00:00 2001 From: Carapache Date: Sat, 21 Feb 2026 02:47:38 +0000 Subject: [PATCH 05/10] chore(changeset): bump voice for inbound receive api --- .changeset/mean-crabs-learn.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/mean-crabs-learn.md diff --git a/.changeset/mean-crabs-learn.md b/.changeset/mean-crabs-learn.md new file mode 100644 index 0000000..30de0c5 --- /dev/null +++ b/.changeset/mean-crabs-learn.md @@ -0,0 +1,5 @@ +--- +"@fluxerjs/voice": minor +--- + +Add LiveKit inbound audio receive APIs for participant subscriptions, `audioFrame` events, and speaking lifecycle events. Also add voice manager helpers for channel participant subscriptions and update docs/examples for transcription-oriented usage. From baa13782b2198020e5f0c0fc6bb2bee2b7abff27 Mon Sep 17 00:00:00 2001 From: Carapache Date: Sat, 21 Feb 2026 03:01:19 +0000 Subject: [PATCH 06/10] refactor(voice): simplify autoResubscribe guard --- packages/voice/src/LiveKitRtcConnection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 09a9442..9044ae0 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -408,7 +408,7 @@ export class LiveKitRtcConnection extends EventEmitter { } } - if (options.autoResubscribe === false && !this.receiveSubscriptions.has(participantId)) { + if (!options.autoResubscribe && !this.receiveSubscriptions.has(participantId)) { return { participantId, stop }; } From 98529793752946293a9b8108980c73d145d5824e Mon Sep 17 00:00:00 2001 From: Lucas Connell <38635411+lucasconnellm@users.noreply.github.com> Date: Sat, 21 Feb 2026 11:54:30 -0600 Subject: [PATCH 07/10] fix(voice): gate inbound audio on explicit subscription intent --- packages/voice/src/LiveKitRtcConnection.ts | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 9044ae0..18dc146 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -269,6 +269,7 @@ export class LiveKitRtcConnection extends EventEmitter { private lastServerToken: string | null = null; private _disconnectEmitted = false; private readonly receiveSubscriptions = new Map(); + private readonly requestedSubscriptions = new Map(); private readonly participantTrackSids = new Map(); private readonly activeSpeakers = new Set(); @@ -339,9 +340,14 @@ export class LiveKitRtcConnection extends EventEmitter { return participant.identity; } - private subscribeParticipantTrack(participant: RemoteParticipant, track: RemoteTrack): void { + private subscribeParticipantTrack( + participant: RemoteParticipant, + track: RemoteTrack, + options: { autoSubscribe?: boolean } = {}, + ): void { if (!this.isAudioTrack(track)) return; const participantId = this.getParticipantId(participant); + if (!options.autoSubscribe && !this.requestedSubscriptions.has(participantId)) return; const current = this.receiveSubscriptions.get(participantId); if (current) current.stop(); @@ -388,10 +394,12 @@ export class LiveKitRtcConnection extends EventEmitter { participantId: string, options: { autoResubscribe?: boolean } = {}, ): LiveKitReceiveSubscription { + const autoResubscribe = options.autoResubscribe === true; const stop = () => { this.receiveSubscriptions.get(participantId)?.stop(); this.receiveSubscriptions.delete(participantId); this.participantTrackSids.delete(participantId); + if (!autoResubscribe) this.requestedSubscriptions.delete(participantId); }; const room = this.room; @@ -399,6 +407,7 @@ export class LiveKitRtcConnection extends EventEmitter { const participant = room.remoteParticipants.get(participantId); if (!participant) return { participantId, stop }; + this.requestedSubscriptions.set(participantId, autoResubscribe); for (const pub of participant.trackPublications.values()) { const maybeTrack = (pub as { track?: RemoteTrack }).track; @@ -408,7 +417,8 @@ export class LiveKitRtcConnection extends EventEmitter { } } - if (!options.autoResubscribe && !this.receiveSubscriptions.has(participantId)) { + if (!autoResubscribe && !this.receiveSubscriptions.has(participantId)) { + this.requestedSubscriptions.delete(participantId); return { participantId, stop }; } @@ -418,6 +428,7 @@ export class LiveKitRtcConnection extends EventEmitter { private clearReceiveSubscriptions(): void { for (const sub of this.receiveSubscriptions.values()) sub.stop(); this.receiveSubscriptions.clear(); + this.requestedSubscriptions.clear(); this.participantTrackSids.clear(); this.activeSpeakers.clear(); } @@ -480,6 +491,9 @@ export class LiveKitRtcConnection extends EventEmitter { const participantId = this.getParticipantId(participant); this.receiveSubscriptions.get(participantId)?.stop(); this.receiveSubscriptions.delete(participantId); + if (this.requestedSubscriptions.get(participantId) !== true) { + this.requestedSubscriptions.delete(participantId); + } this.participantTrackSids.delete(participantId); }); @@ -487,6 +501,9 @@ export class LiveKitRtcConnection extends EventEmitter { const participantId = this.getParticipantId(participant); this.receiveSubscriptions.get(participantId)?.stop(); this.receiveSubscriptions.delete(participantId); + if (this.requestedSubscriptions.get(participantId) !== true) { + this.requestedSubscriptions.delete(participantId); + } this.participantTrackSids.delete(participantId); if (this.activeSpeakers.delete(participantId)) { this.emit('speakerStop', { participantId }); From ca675b88eddd7f1b95b55c41ff748c57201d9010 Mon Sep 17 00:00:00 2001 From: Lucas Connell <38635411+lucasconnellm@users.noreply.github.com> Date: Sat, 21 Feb 2026 12:27:12 -0600 Subject: [PATCH 08/10] fix(voice): preserve receive intent until explicit stop --- packages/voice/src/LiveKitRtcConnection.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 18dc146..61137bb 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -399,15 +399,15 @@ export class LiveKitRtcConnection extends EventEmitter { this.receiveSubscriptions.get(participantId)?.stop(); this.receiveSubscriptions.delete(participantId); this.participantTrackSids.delete(participantId); - if (!autoResubscribe) this.requestedSubscriptions.delete(participantId); + this.requestedSubscriptions.delete(participantId); }; + this.requestedSubscriptions.set(participantId, autoResubscribe); const room = this.room; if (!room || !room.isConnected) return { participantId, stop }; const participant = room.remoteParticipants.get(participantId); if (!participant) return { participantId, stop }; - this.requestedSubscriptions.set(participantId, autoResubscribe); for (const pub of participant.trackPublications.values()) { const maybeTrack = (pub as { track?: RemoteTrack }).track; @@ -417,11 +417,6 @@ export class LiveKitRtcConnection extends EventEmitter { } } - if (!autoResubscribe && !this.receiveSubscriptions.has(participantId)) { - this.requestedSubscriptions.delete(participantId); - return { participantId, stop }; - } - return { participantId, stop }; } From be4f39d095e71f971439b2ba9de35bb330b0d3ca Mon Sep 17 00:00:00 2001 From: Lucas Connell <38635411+lucasconnellm@users.noreply.github.com> Date: Sat, 21 Feb 2026 12:44:32 -0600 Subject: [PATCH 09/10] fix(voice): harden receive pump cancellation and reader cleanup --- packages/voice/src/LiveKitRtcConnection.ts | 29 ++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 61137bb..3555b7a 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -35,6 +35,7 @@ import type { VideoFrame as WebCodecsVideoFrame } from 'node-webcodecs'; const SAMPLE_RATE = 48000; const CHANNELS = 1; +const RECEIVE_READ_TIMEOUT_MS = 100; /** avcC box structure from mp4box (AVCConfigurationBox). */ interface AvcCBox { @@ -357,12 +358,22 @@ export class LiveKitRtcConnection extends EventEmitter { frameSizeMs: 10, }); let stopped = false; + let reader: ReturnType | null = null; const pump = async () => { try { - const reader = audioStream.getReader(); + reader = audioStream.getReader(); while (!stopped) { - const { done, value } = await reader.read(); + let readTimeout: NodeJS.Timeout | null = null; + const next = await Promise.race([ + reader.read(), + new Promise((resolve) => { + readTimeout = setTimeout(() => resolve(null), RECEIVE_READ_TIMEOUT_MS); + }), + ]); + if (readTimeout) clearTimeout(readTimeout); + if (next === null) continue; + const { done, value } = next; if (done || !value) break; this.emit('audioFrame', { participantId, @@ -376,11 +387,25 @@ export class LiveKitRtcConnection extends EventEmitter { if (!stopped) { this.emit('error', err instanceof Error ? err : new Error(String(err))); } + } finally { + if (reader) { + try { + reader.releaseLock(); + } catch {} + reader = null; + } } }; const stop = () => { + if (stopped) return; stopped = true; + if (reader) { + reader.cancel().catch(() => {}); + try { + reader.releaseLock(); + } catch {} + } audioStream.cancel().catch(() => {}); this.receiveSubscriptions.delete(participantId); }; From 8793570685d669767724dcce55a942b8271c9e5c Mon Sep 17 00:00:00 2001 From: Lucas Connell <38635411+lucasconnellm@users.noreply.github.com> Date: Sat, 21 Feb 2026 13:13:31 -0600 Subject: [PATCH 10/10] chore(voice): satisfy no-empty in receive reader cleanup --- packages/voice/src/LiveKitRtcConnection.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 3555b7a..e24fcf6 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -391,7 +391,9 @@ export class LiveKitRtcConnection extends EventEmitter { if (reader) { try { reader.releaseLock(); - } catch {} + } catch { + // Reader may already be released. + } reader = null; } } @@ -404,7 +406,9 @@ export class LiveKitRtcConnection extends EventEmitter { reader.cancel().catch(() => {}); try { reader.releaseLock(); - } catch {} + } catch { + // Reader may already be released. + } } audioStream.cancel().catch(() => {}); this.receiveSubscriptions.delete(participantId);