-
Notifications
You must be signed in to change notification settings - Fork 4.2k
fix(ai): resolve race condition in parallel tool execution #11907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "ai": patch | ||
| --- | ||
|
|
||
| fix(ai): resolve race condition in parallel tool execution causing stream errors |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -150,13 +150,23 @@ export function runToolsTransformation<TOOLS extends ToolSet>({ | |
| const toolCallsByToolCallId = new Map<string, TypedToolCall<TOOLS>>(); | ||
|
|
||
| let canClose = false; | ||
| let closed = false; // Prevent race condition when multiple tools complete simultaneously | ||
| let finishChunk: | ||
| | (SingleRequestTextStreamPart<TOOLS> & { type: 'finish' }) | ||
| | undefined = undefined; | ||
|
|
||
| function attemptClose() { | ||
| // Prevent re-entry: if already closed, nothing to do | ||
| if (closed) { | ||
| return; | ||
| } | ||
|
|
||
| // close the tool results controller if no more outstanding tool calls | ||
| if (canClose && outstandingToolResults.size === 0) { | ||
| // Mark as closed BEFORE doing any work to prevent race conditions | ||
| // where multiple finally() blocks call attemptClose() simultaneously | ||
| closed = true; | ||
|
|
||
| // we delay sending the finish chunk until all tool results (incl. delayed ones) | ||
| // are received to ensure that the frontend receives tool results before a message | ||
| // finish event arrives. | ||
|
|
@@ -309,7 +319,9 @@ export function runToolsTransformation<TOOLS extends ToolSet>({ | |
|
|
||
| // Only execute tools that are not provider-executed: | ||
| if (tool.execute != null && toolCall.providerExecuted !== true) { | ||
| const toolExecutionId = generateId(); // use our own id to guarantee uniqueness | ||
| // Use toolCallId which is unique per tool call from the LLM | ||
| // (generateId() was returning the same value for multiple tools in a batch) | ||
| const toolExecutionId = toolCall.toolCallId; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. has this fix been ai generated?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| outstandingToolResults.add(toolExecutionId); | ||
|
|
||
| // Note: we don't await the tool execution here (by leaving out 'await' on recordSpan), | ||
|
|
@@ -324,17 +336,26 @@ export function runToolsTransformation<TOOLS extends ToolSet>({ | |
| abortSignal, | ||
| experimental_context, | ||
| onPreliminaryToolResult: result => { | ||
| toolResultsStreamController!.enqueue(result); | ||
| // Guard against enqueueing after stream is closed (parallel tool race) | ||
| if (!closed) { | ||
| toolResultsStreamController!.enqueue(result); | ||
| } | ||
| }, | ||
| }) | ||
| .then(result => { | ||
| toolResultsStreamController!.enqueue(result); | ||
| // Guard against enqueueing after stream is closed (parallel tool race) | ||
| if (!closed) { | ||
| toolResultsStreamController!.enqueue(result); | ||
| } | ||
| }) | ||
| .catch(error => { | ||
| toolResultsStreamController!.enqueue({ | ||
| type: 'error', | ||
| error, | ||
| }); | ||
| // Guard against enqueueing after stream is closed (parallel tool race) | ||
| if (!closed) { | ||
| toolResultsStreamController!.enqueue({ | ||
| type: 'error', | ||
| error, | ||
| }); | ||
| } | ||
| }) | ||
| .finally(() => { | ||
| outstandingToolResults.delete(toolExecutionId); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unguarded stream controller enqueue calls throw errors when stream is closed externally (via AbortSignal), causing uncaught exceptions and silently dropping results