-
-
Notifications
You must be signed in to change notification settings - Fork 6
LiveKit Receiving Logic #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d84120a
9927804
260b13d
e2b0cef
ce8d27a
baa1378
9852979
ca675b8
be4f39d
8793570
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@fluxerjs/voice": minor | ||
| --- | ||
|
|
||
| Add LiveKit inbound audio receive APIs for participant subscriptions, `audioFrame` events, and speaking lifecycle events. Also add voice manager helpers for channel participant subscriptions and update docs/examples for transcription-oriented usage. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| import { describe, it, expect, vi } from 'vitest'; | ||
| import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; | ||
|
|
||
| function makeClient() { | ||
| return { | ||
| on: vi.fn(), | ||
| emit: vi.fn(), | ||
| sendToGateway: vi.fn(), | ||
| user: { id: 'bot' }, | ||
| rest: { post: vi.fn() }, | ||
| }; | ||
| } | ||
|
|
||
| describe('LiveKitRtcConnection receive api', () => { | ||
| it('returns inert subscription when room is not connected', () => { | ||
| const channel = { id: 'c1', guildId: 'g1' } as never; | ||
| const conn = new LiveKitRtcConnection(makeClient() as never, channel, 'bot'); | ||
|
|
||
| const sub = conn.subscribeParticipantAudio('u1'); | ||
|
|
||
| expect(sub.participantId).toBe('u1'); | ||
| expect(() => sub.stop()).not.toThrow(); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ import { | |
| GatewayVoiceStateUpdateDispatchData, | ||
| } from '@fluxerjs/types'; | ||
| import { | ||
| AudioStream, | ||
| Room, | ||
| RoomEvent, | ||
| AudioSource, | ||
|
|
@@ -18,6 +19,9 @@ import { | |
| VideoBufferType, | ||
| VideoFrame, | ||
| VideoSource, | ||
| type RemoteParticipant, | ||
| type RemoteTrack, | ||
| TrackKind, | ||
| } from '@livekit/rtc-node'; | ||
| import { buildLiveKitUrlForRtcSdk } from './livekit.js'; | ||
| import { parseOpusPacketBoundaries, concatUint8Arrays } from './opusUtils.js'; | ||
|
|
@@ -31,6 +35,7 @@ import type { VideoFrame as WebCodecsVideoFrame } from 'node-webcodecs'; | |
|
|
||
| const SAMPLE_RATE = 48000; | ||
| const CHANNELS = 1; | ||
| const RECEIVE_READ_TIMEOUT_MS = 100; | ||
|
|
||
| /** avcC box structure from mp4box (AVCConfigurationBox). */ | ||
| interface AvcCBox { | ||
|
|
@@ -177,6 +182,25 @@ export type LiveKitRtcConnectionEvents = VoiceConnectionEvents & { | |
| serverLeave: []; | ||
| /** Emitted when voice state should be synced (self_stream/self_video). VoiceManager listens. */ | ||
| requestVoiceStateSync: [payload: { self_stream?: boolean; self_video?: boolean }]; | ||
| /** Emitted when a remote participant starts speaking. */ | ||
| speakerStart: [payload: { participantId: string }]; | ||
| /** Emitted when a remote participant stops speaking. */ | ||
| speakerStop: [payload: { participantId: string }]; | ||
| /** Emitted for each decoded inbound audio frame. */ | ||
| audioFrame: [frame: LiveKitAudioFrame]; | ||
| }; | ||
|
|
||
| export type LiveKitAudioFrame = { | ||
| participantId: string; | ||
| trackSid?: string; | ||
| sampleRate: number; | ||
| channels: number; | ||
| samples: Int16Array; | ||
| }; | ||
|
|
||
| export type LiveKitReceiveSubscription = { | ||
| participantId: string; | ||
| stop: () => void; | ||
| }; | ||
|
|
||
| /** | ||
|
|
@@ -245,6 +269,10 @@ export class LiveKitRtcConnection extends EventEmitter { | |
| private lastServerEndpoint: string | null = null; | ||
| private lastServerToken: string | null = null; | ||
| private _disconnectEmitted = false; | ||
| private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>(); | ||
| private readonly requestedSubscriptions = new Map<string, boolean>(); | ||
| private readonly participantTrackSids = new Map<string, string>(); | ||
| private readonly activeSpeakers = new Set<string>(); | ||
|
|
||
| /** | ||
| * @param client - The Fluxer client instance | ||
|
|
@@ -305,6 +333,130 @@ export class LiveKitRtcConnection extends EventEmitter { | |
| return this._volume ?? 100; | ||
| } | ||
|
|
||
| private isAudioTrack(track: RemoteTrack): boolean { | ||
| return track.kind === TrackKind.KIND_AUDIO; | ||
| } | ||
|
|
||
| private getParticipantId(participant: RemoteParticipant): string { | ||
| return participant.identity; | ||
| } | ||
|
|
||
| private subscribeParticipantTrack( | ||
| participant: RemoteParticipant, | ||
| track: RemoteTrack, | ||
| options: { autoSubscribe?: boolean } = {}, | ||
| ): void { | ||
| if (!this.isAudioTrack(track)) return; | ||
| const participantId = this.getParticipantId(participant); | ||
| if (!options.autoSubscribe && !this.requestedSubscriptions.has(participantId)) return; | ||
| const current = this.receiveSubscriptions.get(participantId); | ||
| if (current) current.stop(); | ||
|
|
||
| const audioStream = new AudioStream(track, { | ||
| sampleRate: SAMPLE_RATE, | ||
| numChannels: CHANNELS, | ||
| frameSizeMs: 10, | ||
| }); | ||
| let stopped = false; | ||
| let reader: ReturnType<AudioStream['getReader']> | null = null; | ||
|
|
||
| const pump = async () => { | ||
| try { | ||
| reader = audioStream.getReader(); | ||
| while (!stopped) { | ||
| let readTimeout: NodeJS.Timeout | null = null; | ||
| const next = await Promise.race([ | ||
| reader.read(), | ||
| new Promise<null>((resolve) => { | ||
| readTimeout = setTimeout(() => resolve(null), RECEIVE_READ_TIMEOUT_MS); | ||
| }), | ||
| ]); | ||
| if (readTimeout) clearTimeout(readTimeout); | ||
| if (next === null) continue; | ||
| const { done, value } = next; | ||
| if (done || !value) break; | ||
| this.emit('audioFrame', { | ||
| participantId, | ||
| trackSid: track.sid, | ||
| sampleRate: value.sampleRate, | ||
| channels: value.channels, | ||
| samples: value.data, | ||
| }); | ||
| } | ||
| } catch (err) { | ||
| if (!stopped) { | ||
| this.emit('error', err instanceof Error ? err : new Error(String(err))); | ||
| } | ||
| } finally { | ||
| if (reader) { | ||
| try { | ||
| reader.releaseLock(); | ||
| } catch { | ||
| // Reader may already be released. | ||
| } | ||
| reader = null; | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| const stop = () => { | ||
| if (stopped) return; | ||
| stopped = true; | ||
| if (reader) { | ||
| reader.cancel().catch(() => {}); | ||
| try { | ||
| reader.releaseLock(); | ||
| } catch { | ||
| // Reader may already be released. | ||
| } | ||
| } | ||
| audioStream.cancel().catch(() => {}); | ||
| this.receiveSubscriptions.delete(participantId); | ||
| }; | ||
|
|
||
| this.receiveSubscriptions.set(participantId, { participantId, stop }); | ||
| this.participantTrackSids.set(participantId, track.sid ?? ''); | ||
| void pump(); | ||
| } | ||
|
|
||
| subscribeParticipantAudio( | ||
| participantId: string, | ||
| options: { autoResubscribe?: boolean } = {}, | ||
| ): LiveKitReceiveSubscription { | ||
| const autoResubscribe = options.autoResubscribe === true; | ||
| const stop = () => { | ||
| this.receiveSubscriptions.get(participantId)?.stop(); | ||
| this.receiveSubscriptions.delete(participantId); | ||
| this.participantTrackSids.delete(participantId); | ||
| this.requestedSubscriptions.delete(participantId); | ||
| }; | ||
| this.requestedSubscriptions.set(participantId, autoResubscribe); | ||
|
|
||
| const room = this.room; | ||
| if (!room || !room.isConnected) return { participantId, stop }; | ||
|
|
||
| const participant = room.remoteParticipants.get(participantId); | ||
| if (!participant) return { participantId, stop }; | ||
|
|
||
| for (const pub of participant.trackPublications.values()) { | ||
| const maybeTrack = (pub as { track?: RemoteTrack }).track; | ||
| if (maybeTrack && this.isAudioTrack(maybeTrack)) { | ||
| this.subscribeParticipantTrack(participant, maybeTrack); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| return { participantId, stop }; | ||
| } | ||
|
|
||
| private clearReceiveSubscriptions(): void { | ||
| for (const sub of this.receiveSubscriptions.values()) sub.stop(); | ||
| this.receiveSubscriptions.clear(); | ||
| this.requestedSubscriptions.clear(); | ||
| this.participantTrackSids.clear(); | ||
| this.activeSpeakers.clear(); | ||
| } | ||
|
Comment on lines
+422
to
+458
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Because Consider tracking explicit subscription intent and only starting the pump for those participants (and honor 🔧 Suggested gating approach@@
- private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+ private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+ private readonly receiveIntents = new Map<string, { autoResubscribe: boolean }>();
@@
subscribeParticipantAudio(
participantId: string,
options: { autoResubscribe?: boolean } = {},
): LiveKitReceiveSubscription {
+ this.receiveIntents.set(participantId, {
+ autoResubscribe: options.autoResubscribe ?? true,
+ });
const stop = () => {
+ this.receiveIntents.delete(participantId);
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
this.participantTrackSids.delete(participantId);
};
@@
room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => {
if (!this.isAudioTrack(track)) return;
+ const participantId = this.getParticipantId(participant);
+ if (!this.receiveIntents.has(participantId)) return;
this.subscribeParticipantTrack(participant, track);
});
@@
room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => {
if (!this.isAudioTrack(track)) return;
const participantId = this.getParticipantId(participant);
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
this.participantTrackSids.delete(participantId);
+ if (!this.receiveIntents.get(participantId)?.autoResubscribe) {
+ this.receiveIntents.delete(participantId);
+ }
});
@@
private clearReceiveSubscriptions(): void {
for (const sub of this.receiveSubscriptions.values()) sub.stop();
this.receiveSubscriptions.clear();
this.participantTrackSids.clear();
this.activeSpeakers.clear();
+ this.receiveIntents.clear();
}Also applies to: 473-512 🤖 Prompt for AI Agents |
||
|
|
||
| playOpus(_stream: NodeJS.ReadableStream): void { | ||
| this.emit( | ||
| 'error', | ||
|
|
@@ -353,7 +505,52 @@ export class LiveKitRtcConnection extends EventEmitter { | |
| this.debug('Room reconnected'); | ||
| }); | ||
|
|
||
| await room.connect(url, token, { autoSubscribe: false, dynacast: false }); | ||
| room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => { | ||
| if (!this.isAudioTrack(track)) return; | ||
| this.subscribeParticipantTrack(participant, track); | ||
| }); | ||
|
|
||
| room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => { | ||
| if (!this.isAudioTrack(track)) return; | ||
| const participantId = this.getParticipantId(participant); | ||
| this.receiveSubscriptions.get(participantId)?.stop(); | ||
| this.receiveSubscriptions.delete(participantId); | ||
| if (this.requestedSubscriptions.get(participantId) !== true) { | ||
| this.requestedSubscriptions.delete(participantId); | ||
| } | ||
| this.participantTrackSids.delete(participantId); | ||
| }); | ||
|
|
||
| room.on(RoomEvent.ParticipantDisconnected, (participant) => { | ||
| const participantId = this.getParticipantId(participant); | ||
| this.receiveSubscriptions.get(participantId)?.stop(); | ||
| this.receiveSubscriptions.delete(participantId); | ||
| if (this.requestedSubscriptions.get(participantId) !== true) { | ||
| this.requestedSubscriptions.delete(participantId); | ||
| } | ||
| this.participantTrackSids.delete(participantId); | ||
| if (this.activeSpeakers.delete(participantId)) { | ||
| this.emit('speakerStop', { participantId }); | ||
| } | ||
| }); | ||
|
|
||
| room.on(RoomEvent.ActiveSpeakersChanged, (speakers) => { | ||
| const next = new Set(speakers.map((speaker) => speaker.identity)); | ||
| for (const participantId of next) { | ||
| if (!this.activeSpeakers.has(participantId)) { | ||
| this.emit('speakerStart', { participantId }); | ||
| } | ||
| } | ||
| for (const participantId of this.activeSpeakers) { | ||
| if (!next.has(participantId)) { | ||
| this.emit('speakerStop', { participantId }); | ||
| } | ||
| } | ||
| this.activeSpeakers.clear(); | ||
| for (const participantId of next) this.activeSpeakers.add(participantId); | ||
| }); | ||
|
|
||
| await room.connect(url, token, { autoSubscribe: true, dynacast: false }); | ||
| this.lastServerEndpoint = raw; | ||
| this.lastServerToken = token; | ||
| this.debug('connected to room'); | ||
|
|
@@ -1523,6 +1720,7 @@ export class LiveKitRtcConnection extends EventEmitter { | |
| stop(): void { | ||
| this._playing = false; | ||
| this.stopVideo(); | ||
| this.clearReceiveSubscriptions(); | ||
| if (this.currentStream?.destroy) this.currentStream.destroy(); | ||
| this.currentStream = null; | ||
| if (this.audioTrack) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| import { describe, it, expect, vi } from 'vitest'; | ||
| import { VoiceManager } from './VoiceManager.js'; | ||
| import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; | ||
|
|
||
| function makeClient() { | ||
| return { | ||
| on: vi.fn(), | ||
| emit: vi.fn(), | ||
| sendToGateway: vi.fn(), | ||
| user: { id: 'bot' }, | ||
| rest: { post: vi.fn() }, | ||
| }; | ||
| } | ||
|
|
||
| describe('VoiceManager receive helpers', () => { | ||
| it('lists participants in a channel from voice state map', () => { | ||
| const manager = new VoiceManager(makeClient() as never); | ||
|
|
||
| manager.voiceStates.set('g1', new Map([['u1', 'c1'], ['u2', 'c1'], ['u3', 'c2']])); | ||
|
|
||
| expect(manager.listParticipantsInChannel('g1', 'c1')).toEqual(['u1', 'u2']); | ||
| expect(manager.listParticipantsInChannel('g1', 'missing')).toEqual([]); | ||
| }); | ||
|
|
||
| it('subscribes known channel participants for livekit connections', () => { | ||
| const client = makeClient(); | ||
| const manager = new VoiceManager(client as never); | ||
|
|
||
| const channel = { id: 'c1', guildId: 'g1' } as never; | ||
| const conn = new LiveKitRtcConnection(client as never, channel, 'bot'); | ||
| const subscribeSpy = vi | ||
| .spyOn(conn, 'subscribeParticipantAudio') | ||
| .mockImplementation((participantId) => ({ participantId, stop: vi.fn() })); | ||
|
|
||
| manager.voiceStates.set('g1', new Map([['bot', 'c1'], ['u1', 'c1'], ['u2', 'c1'], ['u3', 'c9']])); | ||
| (manager as unknown as { connections: Map<string, LiveKitRtcConnection> }).connections.set( | ||
| 'c1', | ||
| conn, | ||
| ); | ||
|
|
||
| const subs = manager.subscribeChannelParticipants('c1'); | ||
|
|
||
| expect(subscribeSpy).toHaveBeenCalledTimes(2); | ||
| expect(subs.map((s) => s.participantId)).toEqual(['u1', 'u2']); | ||
| }); | ||
| }); |
Uh oh!
There was an error while loading. Please reload this page.