diff --git a/.changeset/mean-crabs-learn.md b/.changeset/mean-crabs-learn.md new file mode 100644 index 0000000..30de0c5 --- /dev/null +++ b/.changeset/mean-crabs-learn.md @@ -0,0 +1,5 @@ +--- +"@fluxerjs/voice": minor +--- + +Add LiveKit inbound audio receive APIs for participant subscriptions, `audioFrame` events, and speaking lifecycle events. Also add voice manager helpers for channel participant subscriptions and update docs/examples for transcription-oriented usage. diff --git a/packages/voice/README.md b/packages/voice/README.md index 60f3eaa..49c1685 100644 --- a/packages/voice/README.md +++ b/packages/voice/README.md @@ -11,12 +11,30 @@ pnpm add @fluxerjs/voice @fluxerjs/core ## Usage ```javascript -import { getVoiceManager } from '@fluxerjs/voice'; +import { getVoiceManager, LiveKitRtcConnection } from '@fluxerjs/voice'; const voiceManager = getVoiceManager(client); const connection = await voiceManager.join(channel); await connection.play(streamUrl); +// Inbound transcription / speech-to-text pipeline +if (connection instanceof LiveKitRtcConnection) { + const subs = voiceManager.subscribeChannelParticipants(channel.id); + connection.on('speakerStart', ({ participantId }) => { + console.log('speaker start', participantId); + }); + connection.on('speakerStop', ({ participantId }) => { + console.log('speaker stop', participantId); + }); + connection.on('audioFrame', (frame) => { + // frame.samples is Int16 PCM suitable for WAV/STT pipelines + console.log(frame.participantId, frame.sampleRate, frame.channels, frame.samples.length); + }); + + // cleanup subscriptions when done + for (const sub of subs) sub.stop(); +} + connection.stop(); voiceManager.leave(guildId); ``` diff --git a/packages/voice/src/LiveKitRtcConnection.receive.test.ts b/packages/voice/src/LiveKitRtcConnection.receive.test.ts new file mode 100644 index 0000000..adfb5c8 --- /dev/null +++ b/packages/voice/src/LiveKitRtcConnection.receive.test.ts @@ -0,0 +1,24 @@ +import { describe, it, expect, vi } from 'vitest'; +import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; + +function makeClient() { + return { + on: vi.fn(), + emit: vi.fn(), + sendToGateway: vi.fn(), + user: { id: 'bot' }, + rest: { post: vi.fn() }, + }; +} + +describe('LiveKitRtcConnection receive api', () => { + it('returns inert subscription when room is not connected', () => { + const channel = { id: 'c1', guildId: 'g1' } as never; + const conn = new LiveKitRtcConnection(makeClient() as never, channel, 'bot'); + + const sub = conn.subscribeParticipantAudio('u1'); + + expect(sub.participantId).toBe('u1'); + expect(() => sub.stop()).not.toThrow(); + }); +}); diff --git a/packages/voice/src/LiveKitRtcConnection.ts b/packages/voice/src/LiveKitRtcConnection.ts index 253af2d..e24fcf6 100644 --- a/packages/voice/src/LiveKitRtcConnection.ts +++ b/packages/voice/src/LiveKitRtcConnection.ts @@ -7,6 +7,7 @@ import { GatewayVoiceStateUpdateDispatchData, } from '@fluxerjs/types'; import { + AudioStream, Room, RoomEvent, AudioSource, @@ -18,6 +19,9 @@ import { VideoBufferType, VideoFrame, VideoSource, + type RemoteParticipant, + type RemoteTrack, + TrackKind, } from '@livekit/rtc-node'; import { buildLiveKitUrlForRtcSdk } from './livekit.js'; import { parseOpusPacketBoundaries, concatUint8Arrays } from './opusUtils.js'; @@ -31,6 +35,7 @@ import type { VideoFrame as WebCodecsVideoFrame } from 'node-webcodecs'; const SAMPLE_RATE = 48000; const CHANNELS = 1; +const RECEIVE_READ_TIMEOUT_MS = 100; /** avcC box structure from mp4box (AVCConfigurationBox). */ interface AvcCBox { @@ -177,6 +182,25 @@ export type LiveKitRtcConnectionEvents = VoiceConnectionEvents & { serverLeave: []; /** Emitted when voice state should be synced (self_stream/self_video). VoiceManager listens. */ requestVoiceStateSync: [payload: { self_stream?: boolean; self_video?: boolean }]; + /** Emitted when a remote participant starts speaking. */ + speakerStart: [payload: { participantId: string }]; + /** Emitted when a remote participant stops speaking. */ + speakerStop: [payload: { participantId: string }]; + /** Emitted for each decoded inbound audio frame. */ + audioFrame: [frame: LiveKitAudioFrame]; +}; + +export type LiveKitAudioFrame = { + participantId: string; + trackSid?: string; + sampleRate: number; + channels: number; + samples: Int16Array; +}; + +export type LiveKitReceiveSubscription = { + participantId: string; + stop: () => void; }; /** @@ -245,6 +269,10 @@ export class LiveKitRtcConnection extends EventEmitter { private lastServerEndpoint: string | null = null; private lastServerToken: string | null = null; private _disconnectEmitted = false; + private readonly receiveSubscriptions = new Map(); + private readonly requestedSubscriptions = new Map(); + private readonly participantTrackSids = new Map(); + private readonly activeSpeakers = new Set(); /** * @param client - The Fluxer client instance @@ -305,6 +333,130 @@ export class LiveKitRtcConnection extends EventEmitter { return this._volume ?? 100; } + private isAudioTrack(track: RemoteTrack): boolean { + return track.kind === TrackKind.KIND_AUDIO; + } + + private getParticipantId(participant: RemoteParticipant): string { + return participant.identity; + } + + private subscribeParticipantTrack( + participant: RemoteParticipant, + track: RemoteTrack, + options: { autoSubscribe?: boolean } = {}, + ): void { + if (!this.isAudioTrack(track)) return; + const participantId = this.getParticipantId(participant); + if (!options.autoSubscribe && !this.requestedSubscriptions.has(participantId)) return; + const current = this.receiveSubscriptions.get(participantId); + if (current) current.stop(); + + const audioStream = new AudioStream(track, { + sampleRate: SAMPLE_RATE, + numChannels: CHANNELS, + frameSizeMs: 10, + }); + let stopped = false; + let reader: ReturnType | null = null; + + const pump = async () => { + try { + reader = audioStream.getReader(); + while (!stopped) { + let readTimeout: NodeJS.Timeout | null = null; + const next = await Promise.race([ + reader.read(), + new Promise((resolve) => { + readTimeout = setTimeout(() => resolve(null), RECEIVE_READ_TIMEOUT_MS); + }), + ]); + if (readTimeout) clearTimeout(readTimeout); + if (next === null) continue; + const { done, value } = next; + if (done || !value) break; + this.emit('audioFrame', { + participantId, + trackSid: track.sid, + sampleRate: value.sampleRate, + channels: value.channels, + samples: value.data, + }); + } + } catch (err) { + if (!stopped) { + this.emit('error', err instanceof Error ? err : new Error(String(err))); + } + } finally { + if (reader) { + try { + reader.releaseLock(); + } catch { + // Reader may already be released. + } + reader = null; + } + } + }; + + const stop = () => { + if (stopped) return; + stopped = true; + if (reader) { + reader.cancel().catch(() => {}); + try { + reader.releaseLock(); + } catch { + // Reader may already be released. + } + } + audioStream.cancel().catch(() => {}); + this.receiveSubscriptions.delete(participantId); + }; + + this.receiveSubscriptions.set(participantId, { participantId, stop }); + this.participantTrackSids.set(participantId, track.sid ?? ''); + void pump(); + } + + subscribeParticipantAudio( + participantId: string, + options: { autoResubscribe?: boolean } = {}, + ): LiveKitReceiveSubscription { + const autoResubscribe = options.autoResubscribe === true; + const stop = () => { + this.receiveSubscriptions.get(participantId)?.stop(); + this.receiveSubscriptions.delete(participantId); + this.participantTrackSids.delete(participantId); + this.requestedSubscriptions.delete(participantId); + }; + this.requestedSubscriptions.set(participantId, autoResubscribe); + + const room = this.room; + if (!room || !room.isConnected) return { participantId, stop }; + + const participant = room.remoteParticipants.get(participantId); + if (!participant) return { participantId, stop }; + + for (const pub of participant.trackPublications.values()) { + const maybeTrack = (pub as { track?: RemoteTrack }).track; + if (maybeTrack && this.isAudioTrack(maybeTrack)) { + this.subscribeParticipantTrack(participant, maybeTrack); + break; + } + } + + return { participantId, stop }; + } + + private clearReceiveSubscriptions(): void { + for (const sub of this.receiveSubscriptions.values()) sub.stop(); + this.receiveSubscriptions.clear(); + this.requestedSubscriptions.clear(); + this.participantTrackSids.clear(); + this.activeSpeakers.clear(); + } + playOpus(_stream: NodeJS.ReadableStream): void { this.emit( 'error', @@ -353,7 +505,52 @@ export class LiveKitRtcConnection extends EventEmitter { this.debug('Room reconnected'); }); - await room.connect(url, token, { autoSubscribe: false, dynacast: false }); + room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => { + if (!this.isAudioTrack(track)) return; + this.subscribeParticipantTrack(participant, track); + }); + + room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => { + if (!this.isAudioTrack(track)) return; + const participantId = this.getParticipantId(participant); + this.receiveSubscriptions.get(participantId)?.stop(); + this.receiveSubscriptions.delete(participantId); + if (this.requestedSubscriptions.get(participantId) !== true) { + this.requestedSubscriptions.delete(participantId); + } + this.participantTrackSids.delete(participantId); + }); + + room.on(RoomEvent.ParticipantDisconnected, (participant) => { + const participantId = this.getParticipantId(participant); + this.receiveSubscriptions.get(participantId)?.stop(); + this.receiveSubscriptions.delete(participantId); + if (this.requestedSubscriptions.get(participantId) !== true) { + this.requestedSubscriptions.delete(participantId); + } + this.participantTrackSids.delete(participantId); + if (this.activeSpeakers.delete(participantId)) { + this.emit('speakerStop', { participantId }); + } + }); + + room.on(RoomEvent.ActiveSpeakersChanged, (speakers) => { + const next = new Set(speakers.map((speaker) => speaker.identity)); + for (const participantId of next) { + if (!this.activeSpeakers.has(participantId)) { + this.emit('speakerStart', { participantId }); + } + } + for (const participantId of this.activeSpeakers) { + if (!next.has(participantId)) { + this.emit('speakerStop', { participantId }); + } + } + this.activeSpeakers.clear(); + for (const participantId of next) this.activeSpeakers.add(participantId); + }); + + await room.connect(url, token, { autoSubscribe: true, dynacast: false }); this.lastServerEndpoint = raw; this.lastServerToken = token; this.debug('connected to room'); @@ -1523,6 +1720,7 @@ export class LiveKitRtcConnection extends EventEmitter { stop(): void { this._playing = false; this.stopVideo(); + this.clearReceiveSubscriptions(); if (this.currentStream?.destroy) this.currentStream.destroy(); this.currentStream = null; if (this.audioTrack) { diff --git a/packages/voice/src/VoiceManager.receive.test.ts b/packages/voice/src/VoiceManager.receive.test.ts new file mode 100644 index 0000000..c6710d8 --- /dev/null +++ b/packages/voice/src/VoiceManager.receive.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect, vi } from 'vitest'; +import { VoiceManager } from './VoiceManager.js'; +import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; + +function makeClient() { + return { + on: vi.fn(), + emit: vi.fn(), + sendToGateway: vi.fn(), + user: { id: 'bot' }, + rest: { post: vi.fn() }, + }; +} + +describe('VoiceManager receive helpers', () => { + it('lists participants in a channel from voice state map', () => { + const manager = new VoiceManager(makeClient() as never); + + manager.voiceStates.set('g1', new Map([['u1', 'c1'], ['u2', 'c1'], ['u3', 'c2']])); + + expect(manager.listParticipantsInChannel('g1', 'c1')).toEqual(['u1', 'u2']); + expect(manager.listParticipantsInChannel('g1', 'missing')).toEqual([]); + }); + + it('subscribes known channel participants for livekit connections', () => { + const client = makeClient(); + const manager = new VoiceManager(client as never); + + const channel = { id: 'c1', guildId: 'g1' } as never; + const conn = new LiveKitRtcConnection(client as never, channel, 'bot'); + const subscribeSpy = vi + .spyOn(conn, 'subscribeParticipantAudio') + .mockImplementation((participantId) => ({ participantId, stop: vi.fn() })); + + manager.voiceStates.set('g1', new Map([['bot', 'c1'], ['u1', 'c1'], ['u2', 'c1'], ['u3', 'c9']])); + (manager as unknown as { connections: Map }).connections.set( + 'c1', + conn, + ); + + const subs = manager.subscribeChannelParticipants('c1'); + + expect(subscribeSpy).toHaveBeenCalledTimes(2); + expect(subs.map((s) => s.participantId)).toEqual(['u1', 'u2']); + }); +}); diff --git a/packages/voice/src/VoiceManager.ts b/packages/voice/src/VoiceManager.ts index 2555d37..c08e4d2 100644 --- a/packages/voice/src/VoiceManager.ts +++ b/packages/voice/src/VoiceManager.ts @@ -9,7 +9,10 @@ import { GatewayVoiceStateUpdateDispatchData, } from '@fluxerjs/types'; import { VoiceConnection } from './VoiceConnection.js'; -import { LiveKitRtcConnection } from './LiveKitRtcConnection.js'; +import { + LiveKitRtcConnection, + type LiveKitReceiveSubscription, +} from './LiveKitRtcConnection.js'; import { isLiveKitEndpoint } from './livekit.js'; import { Collection } from '@fluxerjs/collection'; @@ -93,6 +96,38 @@ export class VoiceManager extends EventEmitter { return guildMap.get(userId) ?? null; } + /** + * List participant user IDs currently in a specific voice channel. + */ + listParticipantsInChannel(guildId: string, channelId: string): string[] { + const guildMap = this.voiceStates.get(guildId); + if (!guildMap) return []; + const participants: string[] = []; + for (const [userId, voiceChannelId] of guildMap.entries()) { + if (voiceChannelId === channelId) participants.push(userId); + } + return participants; + } + + /** + * Subscribe to inbound audio for all known participants currently in a voice channel. + * Only supported for LiveKit connections. + */ + subscribeChannelParticipants( + channelId: string, + opts?: { autoResubscribe?: boolean }, + ): LiveKitReceiveSubscription[] { + const conn = this.connections.get(channelId); + if (!(conn instanceof LiveKitRtcConnection)) return []; + const guildId = conn.channel.guildId; + const participants = this.listParticipantsInChannel(guildId, channelId).filter( + (participantId) => participantId !== this.client.user?.id, + ); + return participants.map((participantId) => + conn.subscribeParticipantAudio(participantId, opts), + ); + } + private handleVoiceStateUpdate(data: GatewayVoiceStateUpdateDispatchData): void { const guildId = data.guild_id ?? ''; if (!guildId) return; diff --git a/packages/voice/src/index.ts b/packages/voice/src/index.ts index de5d9af..5509a73 100644 --- a/packages/voice/src/index.ts +++ b/packages/voice/src/index.ts @@ -3,6 +3,8 @@ export { VoiceConnection, type VoiceConnectionEvents } from './VoiceConnection.j export { LiveKitRtcConnection, type LiveKitRtcConnectionEvents, + type LiveKitAudioFrame, + type LiveKitReceiveSubscription, type VideoPlayOptions, } from './LiveKitRtcConnection.js'; import { Client } from '@fluxerjs/core';