Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/salty-turkeys-stick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Ignore data track promise rejections after a subscription readable stream is discarded
5 changes: 4 additions & 1 deletion src/room/data-track/RemoteDataTrack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ export default class RemoteDataTrack implements IRemoteTrack, IDataTrack {
*/
subscribe(options?: DataTrackSubscribeOptions): ReadableStream<DataTrackFrame> {
try {
const [stream] = this.manager.openSubscriptionStream(
const [stream, sfuSubscriptionComplete] = this.manager.openSubscriptionStream(
this.info.sid,
options?.signal,
options?.bufferSize,
);
// Prevent uncaught promise rejections from bubbling up if rejections occur after the
// readable stream is discarded.
sfuSubscriptionComplete.catch(() => {});
return stream;
} catch (err) {
// NOTE: Rethrow errors to break Throws<...> type boundary
Expand Down
70 changes: 70 additions & 0 deletions src/room/data-track/incoming/IncomingDataTrackManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -866,5 +866,75 @@ describe('DataTrackIncomingManager', () => {
// 8. Make sure the in flight stream is now complete
await expect(reader.read()).resolves.toStrictEqual({ value: undefined, done: true });
});

it(`should not produce an unhandled promise rejection when RemoteDataTrack.subscribe()'s signal is aborted`, async () => {
const manager = new IncomingDataTrackManager();
const managerEvents = subscribeToEvents<DataTrackIncomingManagerCallbacks>(manager, [
'sfuUpdateSubscription',
'trackPublished',
]);

const sid = 'data track sid';

// 1. Register the data track so we can get a RemoteDataTrack via trackPublished.
await manager.receiveSfuPublicationUpdates(
new Map([
[
'identity',
[{ sid, pubHandle: DataTrackHandle.fromNumber(5), name: 'test', usesE2ee: false }],
],
]),
);
const { track } = await managerEvents.waitFor('trackPublished');

// 2. Listen for unhandled rejections (coming from sfuSubscriptionComplete) and throw
// them so the test will terminate.
const onUnhandled = (reason: unknown) => {
throw reason;
};
process.on('unhandledRejection', onUnhandled);

try {
const controller = new AbortController();
const stream = track.subscribe({ signal: controller.signal });

// 3. Consume the stream the way a user would, catching the rejection.
const caughtByUser: unknown[] = [];
const consumerDone = (async () => {
try {
const reader = stream.getReader();
while (true) {
const { done } = await reader.read();
if (done) {
return;
}
}
} catch (err) {
caughtByUser.push(err);
}
})();

// Wait until subscribeRequest has kicked off so we abort during the
// 'pending' state — the path that rejects sfuSubscriptionComplete.
await managerEvents.waitFor('sfuUpdateSubscription');

// 4. Abort the subscription
controller.abort();
await consumerDone;

// Drain microtasks so any unhandledRejection has a chance to fire.
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));

// 5. Make sure that no `unhandledrejection`s occur and get bubbled up as user
// facing errors.

// But, the error should still get raised by the user so they can catch it / do with it as
// they please.
expect(caughtByUser).toHaveLength(1);
} finally {
process.off('unhandledRejection', onUnhandled);
}
});
});
});
Loading