Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mean-crabs-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fluxerjs/voice": minor
---

Add LiveKit inbound audio receive APIs for participant subscriptions, `audioFrame` events, and speaking lifecycle events. Also add voice manager helpers for channel participant subscriptions and update docs/examples for transcription-oriented usage.
20 changes: 19 additions & 1 deletion packages/voice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,30 @@ pnpm add @fluxerjs/voice @fluxerjs/core
## Usage

```javascript
import { getVoiceManager } from '@fluxerjs/voice';
import { getVoiceManager, LiveKitRtcConnection } from '@fluxerjs/voice';

const voiceManager = getVoiceManager(client);
const connection = await voiceManager.join(channel);
await connection.play(streamUrl);

// Inbound transcription / speech-to-text pipeline
if (connection instanceof LiveKitRtcConnection) {
const subs = voiceManager.subscribeChannelParticipants(channel.id);
connection.on('speakerStart', ({ participantId }) => {
console.log('speaker start', participantId);
});
connection.on('speakerStop', ({ participantId }) => {
console.log('speaker stop', participantId);
});
connection.on('audioFrame', (frame) => {
// frame.samples is Int16 PCM suitable for WAV/STT pipelines
console.log(frame.participantId, frame.sampleRate, frame.channels, frame.samples.length);
});

// cleanup subscriptions when done
for (const sub of subs) sub.stop();
}

connection.stop();
voiceManager.leave(guildId);
```
Expand Down
24 changes: 24 additions & 0 deletions packages/voice/src/LiveKitRtcConnection.receive.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { describe, it, expect, vi } from 'vitest';
import { LiveKitRtcConnection } from './LiveKitRtcConnection.js';

function makeClient() {
return {
on: vi.fn(),
emit: vi.fn(),
sendToGateway: vi.fn(),
user: { id: 'bot' },
rest: { post: vi.fn() },
};
}

describe('LiveKitRtcConnection receive api', () => {
it('returns inert subscription when room is not connected', () => {
const channel = { id: 'c1', guildId: 'g1' } as never;
const conn = new LiveKitRtcConnection(makeClient() as never, channel, 'bot');

const sub = conn.subscribeParticipantAudio('u1');

expect(sub.participantId).toBe('u1');
expect(() => sub.stop()).not.toThrow();
});
});
200 changes: 199 additions & 1 deletion packages/voice/src/LiveKitRtcConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
GatewayVoiceStateUpdateDispatchData,
} from '@fluxerjs/types';
import {
AudioStream,
Room,
RoomEvent,
AudioSource,
Expand All @@ -18,6 +19,9 @@ import {
VideoBufferType,
VideoFrame,
VideoSource,
type RemoteParticipant,
type RemoteTrack,
TrackKind,
} from '@livekit/rtc-node';
import { buildLiveKitUrlForRtcSdk } from './livekit.js';
import { parseOpusPacketBoundaries, concatUint8Arrays } from './opusUtils.js';
Expand All @@ -31,6 +35,7 @@ import type { VideoFrame as WebCodecsVideoFrame } from 'node-webcodecs';

const SAMPLE_RATE = 48000;
const CHANNELS = 1;
const RECEIVE_READ_TIMEOUT_MS = 100;

/** avcC box structure from mp4box (AVCConfigurationBox). */
interface AvcCBox {
Expand Down Expand Up @@ -177,6 +182,25 @@ export type LiveKitRtcConnectionEvents = VoiceConnectionEvents & {
serverLeave: [];
/** Emitted when voice state should be synced (self_stream/self_video). VoiceManager listens. */
requestVoiceStateSync: [payload: { self_stream?: boolean; self_video?: boolean }];
/** Emitted when a remote participant starts speaking. */
speakerStart: [payload: { participantId: string }];
/** Emitted when a remote participant stops speaking. */
speakerStop: [payload: { participantId: string }];
/** Emitted for each decoded inbound audio frame. */
audioFrame: [frame: LiveKitAudioFrame];
};

export type LiveKitAudioFrame = {
participantId: string;
trackSid?: string;
sampleRate: number;
channels: number;
samples: Int16Array;
};

export type LiveKitReceiveSubscription = {
participantId: string;
stop: () => void;
};

/**
Expand Down Expand Up @@ -245,6 +269,10 @@ export class LiveKitRtcConnection extends EventEmitter {
private lastServerEndpoint: string | null = null;
private lastServerToken: string | null = null;
private _disconnectEmitted = false;
private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
private readonly requestedSubscriptions = new Map<string, boolean>();
private readonly participantTrackSids = new Map<string, string>();
private readonly activeSpeakers = new Set<string>();

/**
* @param client - The Fluxer client instance
Expand Down Expand Up @@ -305,6 +333,130 @@ export class LiveKitRtcConnection extends EventEmitter {
return this._volume ?? 100;
}

private isAudioTrack(track: RemoteTrack): boolean {
return track.kind === TrackKind.KIND_AUDIO;
}

private getParticipantId(participant: RemoteParticipant): string {
return participant.identity;
}

private subscribeParticipantTrack(
participant: RemoteParticipant,
track: RemoteTrack,
options: { autoSubscribe?: boolean } = {},
): void {
if (!this.isAudioTrack(track)) return;
const participantId = this.getParticipantId(participant);
if (!options.autoSubscribe && !this.requestedSubscriptions.has(participantId)) return;
const current = this.receiveSubscriptions.get(participantId);
if (current) current.stop();

const audioStream = new AudioStream(track, {
sampleRate: SAMPLE_RATE,
numChannels: CHANNELS,
frameSizeMs: 10,
});
let stopped = false;
let reader: ReturnType<AudioStream['getReader']> | null = null;

const pump = async () => {
try {
reader = audioStream.getReader();
while (!stopped) {
let readTimeout: NodeJS.Timeout | null = null;
const next = await Promise.race([
reader.read(),
new Promise<null>((resolve) => {
readTimeout = setTimeout(() => resolve(null), RECEIVE_READ_TIMEOUT_MS);
}),
]);
if (readTimeout) clearTimeout(readTimeout);
if (next === null) continue;
const { done, value } = next;
if (done || !value) break;
this.emit('audioFrame', {
participantId,
trackSid: track.sid,
sampleRate: value.sampleRate,
channels: value.channels,
samples: value.data,
});
}
} catch (err) {
if (!stopped) {
this.emit('error', err instanceof Error ? err : new Error(String(err)));
}
} finally {
if (reader) {
try {
reader.releaseLock();
} catch {
// Reader may already be released.
}
reader = null;
}
}
};

const stop = () => {
if (stopped) return;
stopped = true;
if (reader) {
reader.cancel().catch(() => {});
try {
reader.releaseLock();
} catch {
// Reader may already be released.
}
}
audioStream.cancel().catch(() => {});
this.receiveSubscriptions.delete(participantId);
};

this.receiveSubscriptions.set(participantId, { participantId, stop });
this.participantTrackSids.set(participantId, track.sid ?? '');
void pump();
}

subscribeParticipantAudio(
participantId: string,
options: { autoResubscribe?: boolean } = {},
): LiveKitReceiveSubscription {
const autoResubscribe = options.autoResubscribe === true;
const stop = () => {
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
this.participantTrackSids.delete(participantId);
this.requestedSubscriptions.delete(participantId);
};
this.requestedSubscriptions.set(participantId, autoResubscribe);

const room = this.room;
if (!room || !room.isConnected) return { participantId, stop };

const participant = room.remoteParticipants.get(participantId);
if (!participant) return { participantId, stop };

for (const pub of participant.trackPublications.values()) {
const maybeTrack = (pub as { track?: RemoteTrack }).track;
if (maybeTrack && this.isAudioTrack(maybeTrack)) {
this.subscribeParticipantTrack(participant, maybeTrack);
break;
}
}

return { participantId, stop };
}

private clearReceiveSubscriptions(): void {
for (const sub of this.receiveSubscriptions.values()) sub.stop();
this.receiveSubscriptions.clear();
this.requestedSubscriptions.clear();
this.participantTrackSids.clear();
this.activeSpeakers.clear();
}
Comment on lines +422 to +458
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

subscribeParticipantAudio doesn’t actually gate subscriptions.

Because TrackSubscribed always calls subscribeParticipantTrack (and autoSubscribe: true is set), inbound audio decoding starts for every participant even when no subscription was requested. This makes subscribeParticipantAudio and autoResubscribe effectively no-ops and can drive unexpected CPU usage + audioFrame emissions in large channels.

Consider tracking explicit subscription intent and only starting the pump for those participants (and honor autoResubscribe on unsubscribe):

🔧 Suggested gating approach
@@
-  private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+  private readonly receiveSubscriptions = new Map<string, LiveKitReceiveSubscription>();
+  private readonly receiveIntents = new Map<string, { autoResubscribe: boolean }>();
@@
   subscribeParticipantAudio(
     participantId: string,
     options: { autoResubscribe?: boolean } = {},
   ): LiveKitReceiveSubscription {
+    this.receiveIntents.set(participantId, {
+      autoResubscribe: options.autoResubscribe ?? true,
+    });
     const stop = () => {
+      this.receiveIntents.delete(participantId);
       this.receiveSubscriptions.get(participantId)?.stop();
       this.receiveSubscriptions.delete(participantId);
       this.participantTrackSids.delete(participantId);
     };
@@
       room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => {
         if (!this.isAudioTrack(track)) return;
+        const participantId = this.getParticipantId(participant);
+        if (!this.receiveIntents.has(participantId)) return;
         this.subscribeParticipantTrack(participant, track);
       });
@@
       room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => {
         if (!this.isAudioTrack(track)) return;
         const participantId = this.getParticipantId(participant);
         this.receiveSubscriptions.get(participantId)?.stop();
         this.receiveSubscriptions.delete(participantId);
         this.participantTrackSids.delete(participantId);
+        if (!this.receiveIntents.get(participantId)?.autoResubscribe) {
+          this.receiveIntents.delete(participantId);
+        }
       });
@@
   private clearReceiveSubscriptions(): void {
     for (const sub of this.receiveSubscriptions.values()) sub.stop();
     this.receiveSubscriptions.clear();
     this.participantTrackSids.clear();
     this.activeSpeakers.clear();
+    this.receiveIntents.clear();
   }

Also applies to: 473-512

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/voice/src/LiveKitRtcConnection.ts` around lines 387 - 423,
subscribeParticipantAudio currently doesn't prevent automatic audio work because
TrackSubscribed always calls subscribeParticipantTrack with autoSubscribe:true;
update the logic to track explicit subscription intent (e.g., store a flag in
this.receiveSubscriptions or a new this.requestedSubscriptions set keyed by
participantId inside subscribeParticipantAudio and unsubscribe flows) and modify
the TrackSubscribed/subscribeParticipantTrack codepath to only start
decoding/processing (the "pump") when the participantId is present in that
intent store or when autoSubscribe is deliberately enabled; also ensure
unsubscribe respects options.autoResubscribe by clearing the intent store only
when autoResubscribe is false so that automatic re-subscription behavior is
preserved.


playOpus(_stream: NodeJS.ReadableStream): void {
this.emit(
'error',
Expand Down Expand Up @@ -353,7 +505,52 @@ export class LiveKitRtcConnection extends EventEmitter {
this.debug('Room reconnected');
});

await room.connect(url, token, { autoSubscribe: false, dynacast: false });
room.on(RoomEvent.TrackSubscribed, (track, _publication, participant) => {
if (!this.isAudioTrack(track)) return;
this.subscribeParticipantTrack(participant, track);
});

room.on(RoomEvent.TrackUnsubscribed, (track, _publication, participant) => {
if (!this.isAudioTrack(track)) return;
const participantId = this.getParticipantId(participant);
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
if (this.requestedSubscriptions.get(participantId) !== true) {
this.requestedSubscriptions.delete(participantId);
}
this.participantTrackSids.delete(participantId);
});

room.on(RoomEvent.ParticipantDisconnected, (participant) => {
const participantId = this.getParticipantId(participant);
this.receiveSubscriptions.get(participantId)?.stop();
this.receiveSubscriptions.delete(participantId);
if (this.requestedSubscriptions.get(participantId) !== true) {
this.requestedSubscriptions.delete(participantId);
}
this.participantTrackSids.delete(participantId);
if (this.activeSpeakers.delete(participantId)) {
this.emit('speakerStop', { participantId });
}
});

room.on(RoomEvent.ActiveSpeakersChanged, (speakers) => {
const next = new Set(speakers.map((speaker) => speaker.identity));
for (const participantId of next) {
if (!this.activeSpeakers.has(participantId)) {
this.emit('speakerStart', { participantId });
}
}
for (const participantId of this.activeSpeakers) {
if (!next.has(participantId)) {
this.emit('speakerStop', { participantId });
}
}
this.activeSpeakers.clear();
for (const participantId of next) this.activeSpeakers.add(participantId);
});

await room.connect(url, token, { autoSubscribe: true, dynacast: false });
this.lastServerEndpoint = raw;
this.lastServerToken = token;
this.debug('connected to room');
Expand Down Expand Up @@ -1523,6 +1720,7 @@ export class LiveKitRtcConnection extends EventEmitter {
stop(): void {
this._playing = false;
this.stopVideo();
this.clearReceiveSubscriptions();
if (this.currentStream?.destroy) this.currentStream.destroy();
this.currentStream = null;
if (this.audioTrack) {
Expand Down
46 changes: 46 additions & 0 deletions packages/voice/src/VoiceManager.receive.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { describe, it, expect, vi } from 'vitest';
import { VoiceManager } from './VoiceManager.js';
import { LiveKitRtcConnection } from './LiveKitRtcConnection.js';

function makeClient() {
return {
on: vi.fn(),
emit: vi.fn(),
sendToGateway: vi.fn(),
user: { id: 'bot' },
rest: { post: vi.fn() },
};
}

describe('VoiceManager receive helpers', () => {
it('lists participants in a channel from voice state map', () => {
const manager = new VoiceManager(makeClient() as never);

manager.voiceStates.set('g1', new Map([['u1', 'c1'], ['u2', 'c1'], ['u3', 'c2']]));

expect(manager.listParticipantsInChannel('g1', 'c1')).toEqual(['u1', 'u2']);
expect(manager.listParticipantsInChannel('g1', 'missing')).toEqual([]);
});

it('subscribes known channel participants for livekit connections', () => {
const client = makeClient();
const manager = new VoiceManager(client as never);

const channel = { id: 'c1', guildId: 'g1' } as never;
const conn = new LiveKitRtcConnection(client as never, channel, 'bot');
const subscribeSpy = vi
.spyOn(conn, 'subscribeParticipantAudio')
.mockImplementation((participantId) => ({ participantId, stop: vi.fn() }));

manager.voiceStates.set('g1', new Map([['bot', 'c1'], ['u1', 'c1'], ['u2', 'c1'], ['u3', 'c9']]));
(manager as unknown as { connections: Map<string, LiveKitRtcConnection> }).connections.set(
'c1',
conn,
);

const subs = manager.subscribeChannelParticipants('c1');

expect(subscribeSpy).toHaveBeenCalledTimes(2);
expect(subs.map((s) => s.participantId)).toEqual(['u1', 'u2']);
});
});
Loading