Fix the root cause of the issue 1283 memory leak #1288
📝 Walkthrough: bounded stream statistics, task lifecycle safety, and Langfuse payload protection
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Code Review
This pull request introduces memory-bounding mechanisms for stream processing and Langfuse tracing to prevent OOM and DoS risks. Key changes include the implementation of a BoundedStreamTextAccumulator to retain only head and tail chunks of large streams, binding task aborts to upstream responses, and truncating large payloads before sending them to Langfuse. Feedback on the changes highlights a bug in buildRequestMessagePreview where omitting the system and tools fields causes downstream trace summaries to incorrectly report missing system prompts and zero tool counts, which can be resolved using the provided code suggestion.
function buildRequestMessagePreview(message: Record<string, unknown>): Record<string, unknown> {
  return {
    truncatedForLangfuse: true,
    model: typeof message.model === "string" ? message.model : undefined,
    stream: typeof message.stream === "boolean" ? message.stream : undefined,
    max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined,
    temperature: typeof message.temperature === "number" ? message.temperature : undefined,
    messageCount: Array.isArray(message.messages) ? message.messages.length : undefined,
    contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined,
    toolsCount: Array.isArray(message.tools) ? message.tools.length : undefined,
    hasSystemPrompt:
      (Array.isArray(message.system) && message.system.length > 0) ||
      (typeof message.system === "string" && message.system.length > 0),
  };
}
The newly introduced buildRequestMessagePreview function truncates the request message to avoid holding large objects in memory. However, it completely omits the system and tools fields, only providing hasSystemPrompt and toolsCount.
This causes a silent bug in buildRequestBodySummary (inside trace-proxy-request.ts), which directly accesses msg.system and msg.tools to compute hasSystemPrompt and toolsCount. Because these fields are missing on the preview object, hasSystemPrompt will always evaluate to false and toolsCount will always evaluate to 0 in Langfuse traces for all snapshotted sessions.
To fix this without modifying trace-proxy-request.ts (which is outside the active diff hunk), we can return dummy arrays for system and tools in buildRequestMessagePreview that match the expected checks (Array.isArray and .length).
function buildRequestMessagePreview(message: Record<string, unknown>): Record<string, unknown> {
  if (!message) {
    return { truncatedForLangfuse: true };
  }
  const hasSystemPrompt =
    (Array.isArray(message.system) && message.system.length > 0) ||
    (typeof message.system === "string" && message.system.length > 0);
  const toolsCount = Array.isArray(message.tools) ? message.tools.length : 0;
  return {
    truncatedForLangfuse: true,
    model: typeof message.model === "string" ? message.model : undefined,
    stream: typeof message.stream === "boolean" ? message.stream : undefined,
    max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined,
    temperature: typeof message.temperature === "number" ? message.temperature : undefined,
    messageCount: Array.isArray(message.messages) ? message.messages.length : undefined,
    contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined,
    system: hasSystemPrompt ? [null] : [],
    tools: new Array(toolsCount).fill(null),
  };
}
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e065e2a5e9
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
AsyncTaskManager.register(taskId, processingPromise, {
  taskType: "stream-processing",
  abortController,
});
Avoid timing out active streams after ten minutes
Registering the stream task with the live abortController but no staleTimeoutMs leaves it on AsyncTaskManager's default 10-minute wall-clock timeout; the periodic cleanup will call cancel(), and the new abort binding then aborts the upstream response. A legitimate long-running SSE generation with continuous chunks (not idle) will therefore be cut off and finalized as a timeout once it crosses roughly 10 minutes. Use an infinite/stream-specific stale timeout or base staleness on lack of activity rather than task age.
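The suggestion to base staleness on lack of activity rather than task age can be sketched as follows. This is a minimal illustration under stated assumptions: `ActivityTracker`, its method signatures, and the explicit `now` arguments are hypothetical and are not the repository's `AsyncTaskManager` API.

```typescript
// Hypothetical sketch: staleness is measured from the last activity, not from task creation.
type TrackedTask = {
  abortController: AbortController;
  lastActivityAt: number;
  staleTimeoutMs: number; // Infinity disables stale cleanup for this task
};

class ActivityTracker {
  private readonly tasks = new Map<string, TrackedTask>();

  register(
    taskId: string,
    abortController: AbortController,
    staleTimeoutMs: number,
    now: number,
  ): void {
    this.tasks.set(taskId, { abortController, lastActivityAt: now, staleTimeoutMs });
  }

  // Called whenever a chunk arrives, so a busy stream never looks stale.
  touch(taskId: string, now: number): void {
    const task = this.tasks.get(taskId);
    if (task) task.lastActivityAt = now;
  }

  // Aborts and forgets tasks that have been idle longer than their own timeout.
  sweep(now: number): string[] {
    const evicted: string[] = [];
    this.tasks.forEach((task, taskId) => {
      if (now - task.lastActivityAt > task.staleTimeoutMs) {
        task.abortController.abort();
        this.tasks.delete(taskId); // drop the strong reference as well
        evicted.push(taskId);
      }
    });
    return evicted;
  }
}
```

With this shape, a stream that keeps receiving chunks resets its idle clock on every `touch`, while a task that has gone silent is still aborted and removed from the map.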
Code Review Summary
This PR addresses memory leak issues in the proxy response handling pipeline. The changes introduce bounded stream accumulation, proper task cleanup with strong reference detachment, and abort controller chaining to upstream responses. All changes are intentional and well-tested.
PR Size: L
- Lines changed: 835 (+665, -170)
- Files changed: 7
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - All errors logged, no silent failures
- Type safety - Type assertions are intentional with proper null safety
- Documentation accuracy - Comments accurately describe the design
- Test coverage - Good coverage of new functionality including edge cases
- Code clarity - Well-structured, complex algorithms properly explained
Key Changes Verified
- AsyncTaskManager: Fixed stale cleanup to properly detach from Map and avoid old task finalizers removing new tasks with same taskId
- BoundedStreamTextAccumulator: Well-designed bounded buffer (1MB head + 9MB tail) prevents unbounded memory growth
- AbortController chaining: properly propagates abort signals to upstream response readers
- Langfuse truncation: Response/request body truncation happens before async import to prevent closure-captured references
- Tests: Comprehensive new tests validate task cleanup behavior and large response handling
Minor Notes (No Action Required)
- Type assertion `as unknown as ProxySession` in `buildLangfuseSessionSnapshot` is intentional - all methods used by `traceProxyRequest` are properly stubbed
- Null safety checks in place for `session.forwardedRequestBody` and `session.request.log`
Automated review by Claude AI
return {
  startTime: session.startTime,
  method: session.method,
  headers: new Headers(session.headers),
  request: {
    message: requestMessage,
    log: truncateResponseTextForLangfuse(session.request.log ?? ""),
    note: session.request.note,
    model: session.request.model,
    imageRequestMetadata: null,
  },
  userAgent: session.userAgent,
  provider: session.provider,
  messageContext: session.messageContext,
  ttfbMs: session.ttfbMs,
  forwardStartTime: session.forwardStartTime,
  forwardedRequestBody,
  sessionId: session.sessionId,
  originalFormat: session.originalFormat,
  getMessagesLength: () => messagesLength,
  getEndpoint: () => endpoint,
  getCurrentModel: () => currentModel,
  getProviderChain: () => providerChain,
  getRequestSequence: () => requestSequence,
  getOriginalModel: () => originalModel,
  isModelRedirected: () => modelRedirected,
  getSpecialSettings: () => specialSettings,
  getCacheTtlResolved: () => cacheTtlResolved,
  getContext1mApplied: () => context1mApplied,
} as unknown as ProxySession;
buildRequestBodySummary reads wrong keys from the snapshot object
buildLangfuseSessionSnapshot replaces session.request.message with the output of buildRequestMessagePreview, which stores hasSystemPrompt: bool and toolsCount: number as flat scalar fields. But buildRequestBodySummary in trace-proxy-request.ts reads msg.system (an array) to compute hasSystemPrompt and msg.tools (an array) to compute toolsCount. Because the preview object never sets system or tools, every Langfuse trace will show hasSystemPrompt: false and toolsCount: 0 regardless of what the original request contained, silently corrupting the requestSummary metadata for all requests after this change ships.
Path: src/lib/langfuse/emit-proxy-trace.ts, lines 66-95
  chunkCount: number;
};

// Stream statistics only need the head metadata and the tail usage/final event. Store the
// windows as bytes to avoid unbounded string[] growth, and to avoid a subarray view pinning
// a huge original ArrayBuffer.
class BoundedStreamTextAccumulator {
  private readonly headChunks: Uint8Array[] = [];
  private readonly tailChunks: Uint8Array[] = [];
  private readonly tailChunkBytes: number[] = [];
  private headBufferedBytes = 0;
  private tailBufferedBytes = 0;
  private tailHead = 0;
  private tailMode = false;
  private truncated = false;
Unreachable `else` branch in `pushBytes`
Inside the guard `if (!this.tailMode && this.headBufferedBytes < STREAM_STATS_HEAD_BYTES)`, `remainingHeadBytes` is computed as `STREAM_STATS_HEAD_BYTES - this.headBufferedBytes`. Because the outer condition already guarantees `headBufferedBytes < STREAM_STATS_HEAD_BYTES`, `remainingHeadBytes` is always strictly greater than 0, so the `else { this.tailMode = true; this.pushTailBytes(value); }` branch can never be reached. The code is harmless, but dead code in a performance-sensitive accumulator is misleading.
Path: src/app/v1/_lib/proxy/response-handler.ts, lines 71-84
🧹 Nitpick comments (1)
src/lib/async-task-manager.ts (1)
225-236: 🧹 Nitpick | 🔵 Trivial | 💤 Low value
Reuse the already-defined constant to eliminate the duplication.
The `staleThreshold` on line 220 has the same value as `DEFAULT_STALE_TASK_TIMEOUT_MS` on line 33, so the constant can be reused directly.
♻️ Suggested change
  private cleanupCompletedTasks(): void {
    const now = Date.now();
-   const staleThreshold = 10 * 60 * 1000; // 10 minutes
    for (const [taskId, taskInfo] of this.tasks.entries()) {
      const age = now - taskInfo.createdAt;
-     const staleTimeoutMs = taskInfo.staleTimeoutMs || staleThreshold;
+     const staleTimeoutMs = taskInfo.staleTimeoutMs || DEFAULT_STALE_TASK_TIMEOUT_MS;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib/async-task-manager.ts` around lines 225 - 236, The variable staleThreshold is being used as a fallback value in the assignment at line 226, but it duplicates the constant DEFAULT_STALE_TASK_TIMEOUT_MS that is already defined. Replace all occurrences of staleThreshold with the existing constant DEFAULT_STALE_TASK_TIMEOUT_MS to eliminate the duplication. Update the expression that uses staleThreshold as the fallback value in the staleTimeoutMs assignment to use DEFAULT_STALE_TASK_TIMEOUT_MS instead.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fab7ff45-d099-4368-aaa2-243cb53ce1ed
📒 Files selected for processing (7)
- src/app/v1/_lib/proxy/response-handler.ts
- src/lib/async-task-manager.ts
- src/lib/langfuse/emit-proxy-trace.ts
- src/lib/langfuse/trace-proxy-request.ts
- tests/unit/lib/async-task-manager-edge-runtime.test.ts
- tests/unit/proxy/response-handler-client-abort-drain.test.ts
- tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts
This round makes incremental fixes based on deeper investigation and automated review feedback:
Local verification:
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 92bb359b2b
  statusCode: data.statusCode,
  isStreaming: data.isStreaming,
  responseText: data.responseText,
  responseText,
Stop capturing large Langfuse trace inputs
When Langfuse is enabled for a large response, this passes the truncated responseText, but the async callback still closes over the original data object via the adjacent data.* reads; that object still contains the full data.responseText, so the full response stays strongly reachable until the dynamic import and trace submission finish. Destructure the non-large fields before the import() and avoid capturing data so the truncation actually releases the original payload.
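A minimal sketch of the "destructure before the import" idea, assuming hypothetical `TraceData`, `TracePayload`, `truncateText`, `buildTracePayload`, and `emitTrace` names (the repository's real payload has more fields and its truncation helper may differ). The point it illustrates is that the async callback only ever sees the small, already-truncated object.

```typescript
// Hypothetical shapes: the real trace payload in the repository has more fields.
type TraceData = { statusCode: number; isStreaming: boolean; responseText: string };
type TracePayload = { statusCode: number; isStreaming: boolean; responseText: string };

// Assumed truncation rule for illustration only.
function truncateText(text: string, maxChars: number): string {
  return text.length > maxChars ? `${text.slice(0, maxChars)}...[truncated]` : text;
}

// Copy the small fields and truncate the large one synchronously,
// before any asynchronous boundary is crossed.
function buildTracePayload(data: TraceData, maxChars: number): TracePayload {
  const { statusCode, isStreaming } = data;
  return { statusCode, isStreaming, responseText: truncateText(data.responseText, maxChars) };
}

function emitTrace(
  data: TraceData,
  send: (payload: TracePayload) => Promise<void>,
): Promise<void> {
  const payload = buildTracePayload(data, 1024);
  // The async callback closes over `payload` only, never over `data`.
  return Promise.resolve().then(() => send(payload));
}
```

Some JavaScript engines may represent a sliced string as a view onto its parent string, so a production version might also need to force a copy of the truncated text; this sketch only addresses the closure-capture side of the problem.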
while (
  this.tailBufferedBytes > STREAM_STATS_TAIL_BYTES &&
  this.tailHead < this.tailChunkBytes.length
) {
  this.tailBufferedBytes -= this.tailChunkBytes[this.tailHead] ?? 0;
  this.tailChunks[this.tailHead] = new Uint8Array();
Preserve the full byte tail when evicting chunks
For long streams delivered in uneven large chunks, this evicts the entire oldest chunk whenever the tail is just over the byte limit, so the retained text can shrink by several MB even though most of that chunk is still within the intended 9 MB tail window. If the terminal SSE usage/completion event is split across that boundary, parseUsageFromResponseText can miss the usage and final accounting/debug finalization loses tokens; trim only the overflow bytes from the oldest chunk instead of dropping it wholesale.
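Trimming only the overflow bytes from the oldest chunk, as suggested above, could look roughly like this. `trimTailToLimit` is a hypothetical standalone helper written for illustration; the PR's accumulator uses a ring of chunks with its own bookkeeping.

```typescript
// Hypothetical helper: keeps exactly the last `limitBytes` bytes across chunks
// (or everything, if the chunks are already within the limit).
function trimTailToLimit(chunks: Uint8Array[], limitBytes: number): Uint8Array[] {
  let total = 0;
  for (const chunk of chunks) total += chunk.byteLength;

  let overflow = total - limitBytes;
  const kept: Uint8Array[] = [];
  for (const chunk of chunks) {
    if (overflow >= chunk.byteLength) {
      overflow -= chunk.byteLength; // the whole chunk lies outside the window
      continue;
    }
    if (overflow > 0) {
      // Copy the surviving suffix so the large original buffer can be released.
      kept.push(new Uint8Array(chunk.subarray(overflow)));
      overflow = 0;
      continue;
    }
    kept.push(chunk);
  }
  return kept;
}
```

Because the boundary chunk is cut rather than dropped, the retained tail never shrinks below the configured window, which is what keeps a terminal usage event that straddles the boundary parseable.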
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lib/async-task-manager.ts (1)
210-213: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
The optional instance parameter of `cleanup` leaves a race window in which a new task can be deleted by mistake
`expectedTask` is currently optional, so `cleanup(taskId)` can still unconditionally delete the current Map entry. Combined with the `AsyncTaskManager.cleanup(taskId)` call that still exists in `src/app/v1/_lib/proxy/response-handler.ts` (lines 1934-1946), the `finally` of an old task can still delete a new task with the same `taskId`, leaving the new task untracked (later `cancel` and timeout cleanup stop working).
Suggestion: make the instance check mandatory (`expectedTask` required), or split out a forced-cleanup entry point that is internal-only and migrate the callers, so that the instance-consistency protection cannot be bypassed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib/async-task-manager.ts` around lines 210 - 213, The cleanup method in async-task-manager.ts has a race condition because expectedTask is optional, allowing cleanup(taskId) to unconditionally delete any task with that ID, which can cause a new task with the same taskId to be deleted by an old task's finally block. Make the expectedTask parameter mandatory (remove the question mark making it required) in the cleanup method signature to force all callers to provide the task instance for verification. Also update the calling code in response-handler.ts (around lines 1934-1946) where cleanup is called to pass the actual task instance as the second argument instead of calling it with only taskId.
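The identity check described in this comment can be illustrated with a small sketch. `TaskRegistry` and `TaskEntry` are hypothetical names used only for this example; the actual `AsyncTaskManager` stores more state per task.

```typescript
// Hypothetical registry: an entry is removed only if it is still the same task instance.
type TaskEntry = { promise: Promise<void> };

class TaskRegistry {
  private readonly tasks = new Map<string, TaskEntry>();

  // Registering under an existing id replaces the previous entry.
  register(taskId: string, promise: Promise<void>): TaskEntry {
    const entry: TaskEntry = { promise };
    this.tasks.set(taskId, entry);
    return entry;
  }

  // `expectedTask` is required, so a stale finalizer cannot remove a newer entry.
  cleanup(taskId: string, expectedTask: TaskEntry): boolean {
    if (this.tasks.get(taskId) !== expectedTask) return false;
    return this.tasks.delete(taskId);
  }

  has(taskId: string): boolean {
    return this.tasks.has(taskId);
  }
}
```

Making the second argument mandatory moves the protection into the type signature: a caller that only knows the `taskId` can no longer compile a call that deletes whatever happens to be registered under it.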
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a70fbad5-a33d-42ff-8542-6749140be6d4
📒 Files selected for processing (13)
- src/app/v1/_lib/proxy/response-handler.ts
- src/lib/async-task-manager.ts
- src/lib/langfuse/trace-proxy-request.ts
- tests/unit/langfuse/langfuse-trace.test.ts
- tests/unit/lib/async-task-manager-edge-runtime.test.ts
- tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts
- tests/unit/proxy/response-handler-bill-non-success.test.ts
- tests/unit/proxy/response-handler-client-abort-drain.test.ts
- tests/unit/proxy/response-handler-endpoint-circuit-isolation.test.ts
- tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts
- tests/unit/proxy/response-handler-hedge-loser-priority.test.ts
- tests/unit/proxy/response-handler-lease-decrement.test.ts
- tests/unit/proxy/response-handler-non200.test.ts
✅ Files skipped from review due to trivial changes (1)
- tests/unit/proxy/response-handler-non200.test.ts
🚧 Files skipped from review as they are similar to previous changes (4)
- tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts
- src/lib/langfuse/trace-proxy-request.ts
- tests/unit/proxy/response-handler-client-abort-drain.test.ts
- src/app/v1/_lib/proxy/response-handler.ts
After digging further into the memory leak chain this round, three deeper risk points were addressed:
Verification:
🧹 Nitpick comments (1)
src/lib/async-task-manager.ts (1)
225-234: 🧹 Nitpick | 🔵 Trivial | 💤 Low value
The doc comment does not match the actual logic
The comment describes the old cleanup logic (a fixed 10-minute timeout based on age), but the code now:
- uses `idleAge` (time since the last activity) rather than total lifetime
- takes the timeout threshold from `taskInfo.staleTimeoutMs`, which is configurable per task
Consider updating the comment to reflect the current behavior.
Suggested documentation update
  /**
   * Check for and clean up timed-out tasks
   *
-  * Iterate over all active tasks; for tasks still not completed after more than 10 minutes:
+  * Iterate over all active tasks; for tasks that have been idle longer than staleTimeoutMs:
   * 1. Log a warning
   * 2. Trigger the AbortController to cancel the task
   * 3. Remove the task from the task Map
   *
-  * ⚠️ Note: this does not clean up "completed" tasks; it cleans up tasks that "timed out without completing"
+  * ⚠️ Note: this does not clean up "completed" tasks; it cleans up "idle timed-out" tasks.
+  * Active streams should call touch() periodically to update lastActivityAt so they are not misjudged as hung.
   */
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib/async-task-manager.ts` around lines 225 - 234, Update the JSDoc comment for the task cleanup function to accurately reflect the current implementation. Replace the reference to "超过 10 分钟还未完成的任务" (tasks not completed in over 10 minutes) with a description that specifies it uses idleAge (time since last activity) instead of total task age, and that the timeout threshold is determined by the configurable taskInfo.staleTimeoutMs property per task rather than a fixed 10-minute duration. Ensure the updated comment explains that tasks are cleaned up when they have been idle (inactive) beyond their configured staleTimeoutMs value.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b61f46ed-e80a-4b6a-b4b4-11f4ac71d9d5
📒 Files selected for processing (4)
- src/app/v1/_lib/proxy/response-handler.ts
- src/lib/async-task-manager.ts
- src/lib/langfuse/emit-proxy-trace.ts
- tests/unit/proxy/response-handler-client-abort-drain.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- src/lib/langfuse/emit-proxy-trace.ts
- src/app/v1/_lib/proxy/response-handler.ts
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0edd5828ca
const headText = this.decodeChunks(this.headChunks, 0, this.headBufferedBytes);
const tailText = this.decodeChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes);
Decode contiguous UTF-8 windows as one buffer
For any stream that crosses the 1 MiB head/tail split without being truncated, a multibyte UTF-8 character can land across this boundary. Decoding headChunks and tailChunks with separate TextDecoder instances turns that one character into replacement characters even though the snapshot is marked complete, so the response persisted to session debug storage and sent to Langfuse no longer matches the upstream body. Decode the combined head+tail bytes when truncated is false, or carry decoder state across the split.
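Carrying decoder state across the head/tail split can be sketched with the standard `TextDecoder` streaming mode. `decodeWindows` is a hypothetical helper for illustration and is not the PR's `decodeChunks`.

```typescript
// Decodes several byte windows as one UTF-8 stream, carrying decoder state across windows.
function decodeWindows(windows: Uint8Array[]): string {
  const decoder = new TextDecoder("utf-8");
  let text = "";
  for (const bytes of windows) {
    // `stream: true` buffers an incomplete trailing sequence until the next call.
    text += decoder.decode(bytes, { stream: true });
  }
  // A final call without input flushes anything still buffered.
  return text + decoder.decode();
}
```

This only applies when the head and tail windows are contiguous, that is, when nothing was dropped between them. For a truncated snapshot the two windows are not adjacent in the original byte stream, so they would still need to be decoded independently.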
Continued addressing the valid issues found by the latest automated review:
Local verification:
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: bd0e3a27ac
AsyncTaskManager.register(taskId, processingPromise, {
  taskType: "non-stream-processing",
  abortController,
Avoid aborting active non-stream downloads as stale
When a non-stream response body is still being downloaded for more than 10 minutes (for example with the default requestTimeoutNonStreamingMs = 0 or a very large slow body), this task sits inside responseForLog.text() and never calls AsyncTaskManager.touch(). Registering the live abortController with the manager's default stale timeout lets cleanup abort the shared upstream responseController via bindTaskAbortToUpstreamResponse, cutting off the client even though bytes may still be flowing; the Gemini non-stream passthrough stats registration above has the same pattern. Touch while reading or set a stale timeout that matches the non-stream response timeout instead of the manager default.
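The concern above hinges on a task abort being forwarded to the shared upstream controller. A minimal sketch of such a binding, assuming a hypothetical `bindAbort` helper (this is not the repository's `bindTaskAbortToUpstreamResponse`, whose signature is not shown in this thread):

```typescript
// Hypothetical helper: forwards an abort on the task signal to the upstream
// controller and returns a function that removes the listener again.
function bindAbort(taskSignal: AbortSignal, upstream: AbortController): () => void {
  if (taskSignal.aborted) {
    upstream.abort();
    return () => {};
  }
  const onAbort = (): void => upstream.abort();
  taskSignal.addEventListener("abort", onAbort, { once: true });
  // Unbinding after the task settles avoids keeping the upstream controller alive.
  return () => taskSignal.removeEventListener("abort", onAbort);
}
```

Once such a binding exists, any cleanup path that aborts the task also cuts off the client-facing response, which is why the stale timeout has to reflect real inactivity rather than the manager default.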
After another round of digging, two non-streaming paths on the same root-cause chain were also handled, and the commit has been pushed. Additional fixes in this round:
Re-verification:
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: aa34336193
    return;
  }

  this.headChunks.push(value.slice(0, remainingHeadBytes));
Copy Buffer chunks before retaining accumulator windows
When the upstream body comes through nodeStreamToWebStreamSafe, chunks can be Node Buffers because Buffer satisfies instanceof Uint8Array; on those chunks, slice() returns a view rather than copying. This stored “1 MiB” head slice (and the tail slices below) can therefore keep the entire large upstream chunk backing store alive until finalization, defeating the bounded 10 MiB accumulator for large single-chunk streams. Store an actual copy, e.g. from value.subarray(...), before retaining these windows.
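One way to guarantee a real copy regardless of whether the chunk is a plain `Uint8Array` or a Node.js `Buffer` is to construct a new typed array from the selected range. `copyWindow` is a hypothetical helper name used for illustration.

```typescript
// Returns a copy whose backing store is exactly as large as the requested window.
function copyWindow(value: Uint8Array, start: number, end: number): Uint8Array {
  // `subarray` selects the range as a view; `new Uint8Array(typedArray)` then
  // allocates a fresh buffer and copies those bytes into it.
  return new Uint8Array(value.subarray(start, end));
}
```

The copy costs one allocation per retained window, but it decouples the accumulator from the lifetime of the upstream chunk, so a single very large chunk can be collected as soon as the reader moves on.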
Another, deeper round of fixes and verification for the memory retention chain:
Additional review:
Verification:
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0cbef88260
AsyncTaskManager.register(taskId, processingPromise, {
  taskType: "stream-processing",
  abortController,
});
Respect disabled stream idle timeouts
When provider.streamingIdleTimeoutMs <= 0, this stream path treats the idle watchdog as disabled (Infinity), but the task is still registered with the live abort controller and no staleTimeoutMs, so AsyncTaskManager applies its default 10-minute idle cleanup. Fresh evidence after the activity-based change is that touch() only runs when chunks arrive, so a legitimate SSE stream with a >10 minute silent gap after some output will be aborted by the generic cleanup despite the provider idle timeout being disabled; pass a stream-specific stale timeout or disable stale cleanup for this case.
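One possible way to derive a stream-specific stale timeout from the provider setting is sketched below. Both helpers are hypothetical, and reusing the provider idle timeout as the stale timeout is an assumption made for illustration; the PR may choose a different mapping.

```typescript
// Hypothetical helper: maps a provider idle timeout to a task stale timeout.
// A value <= 0 means "idle watchdog disabled", modelled here as Infinity.
function resolveStaleTimeoutMs(streamingIdleTimeoutMs: number): number {
  return streamingIdleTimeoutMs > 0 ? streamingIdleTimeoutMs : Number.POSITIVE_INFINITY;
}

// A task is stale only when its idle time exceeds the resolved timeout,
// so an Infinity timeout can never be exceeded.
function isStale(idleMs: number, staleTimeoutMs: number): boolean {
  return idleMs > staleTimeoutMs;
}
```

The resolved value could then be passed as `staleTimeoutMs` when registering the task; whether the manager treats `Infinity` as "never stale" depends on how it compares the values and is assumed here.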
This round dug further and fixed a real issue flagged by the latest automated review, which occurs when the stream idle timeout is explicitly disabled. Fixes:
Verified:
@coderabbitai review
✅ Action performed: Review finished.
Codex Review: Didn't find any major issues. Hooray! Reviewed commit:
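Background
Fixes the rapid memory growth in the proxy process reported in #1283. Rather than applying a stopgap based on the issue bot's candidate causes, this PR traces the strong-reference chain backward along the request lifecycle: background task registration, upstream Response reading, streaming stats buffering, Session debug persistence, and asynchronous Langfuse observation.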
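Root causes
- AsyncTaskManager's stale cleanup only called abort() and never deleted the entry from the tasks Map, so the Promise, closures, session, and response buffer remained strongly referenced; in addition, an old task's finally could mistakenly delete a new task registered under the same taskId.
- The AbortController of the background task registered by ResponseHandler was not connected to the real upstream responseController, so stale cleanup could not interrupt a pending response.text() / reader.read().
- string[] accumulated the complete SSE stream; STORE_SESSION_RESPONSE_BODY=false only affects Redis writes and does not affect this in-memory accumulation.
- ProxySession and large response bodies.
Fixes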
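- AsyncTaskManager.register() now accepts a real AbortController; stale cleanup now cancels the task and removes it from the Map, breaking the strong reference; cleanup also checks task identity so that an old finally cannot clean up a new task.
- ResponseHandler binds the task abort to the upstream response controller, so timeout cleanup actually wakes up and ends the upstream read.
Verification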
- bunx vitest run tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts tests/unit/proxy/response-handler-lease-decrement.test.ts tests/unit/proxy/response-handler-non200.test.ts
- bun run typecheck
- bunx biome check on changed files
- bun run lint
- bun run build
- bun run test
- tests/unit/k8s-cch-update-flow.test.ts passes; confirmed to be unrelated to this PR.
Greptile Summary
This PR fixes the root causes of a memory leak in the agent proxy: stale AsyncTaskManager tasks were never removed from the Map (keeping Promises, closures, sessions, and response buffers alive indefinitely), and stream stats accumulated response bodies in an unbounded string[]. The Langfuse fire-and-forget path also held strong references to full ProxySession objects and large response bodies across a dynamic import boundary.
- AsyncTaskManager: cleanup() is now private and identity-checked so old task finalizers cannot delete newer tasks registered under the same ID; cleanupCompletedTasks() now calls cleanup() after abort(), breaking the strong-reference chain; a new touch() / lastActivityAt mechanism prevents active streaming tasks from being falsely evicted.
- BoundedStreamTextAccumulator: replaces the unbounded string[] in all streaming paths with a byte-level head (1 MiB) + circular-tail (9 MiB) window; truncated snapshots are no longer written to Redis as if they were complete response bodies.
- dynamic import, so the async closure no longer holds a reference to the full session or response body.
Confidence Score: 4/5
Safe to merge — the core reference-leak fixes are correct and well-tested, with the non-streaming body accumulation path remaining as a known architectural limitation.
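The head-plus-circular-tail window that the summary attributes to BoundedStreamTextAccumulator can be illustrated with a small sketch. This is not the PR's implementation: the class name `HeadTailBytes`, its method names, and the tiny limits are hypothetical, and positive limits are assumed.

```typescript
// Hypothetical sketch: keep the first `headLimit` bytes and the last
// `tailLimit` bytes of a stream, dropping whatever falls in between.
class HeadTailBytes {
  private readonly head: Uint8Array;
  private headLen = 0;
  private readonly tail: Uint8Array; // ring buffer for the most recent bytes
  private tailWritten = 0; // total bytes ever routed to the tail

  constructor(headLimit: number, tailLimit: number) {
    this.head = new Uint8Array(headLimit);
    this.tail = new Uint8Array(tailLimit);
  }

  push(chunk: Uint8Array): void {
    // Fill the head first; set() copies, so the chunk itself is not retained.
    const toHead = Math.min(chunk.byteLength, this.head.byteLength - this.headLen);
    this.head.set(chunk.subarray(0, toHead), this.headLen);
    this.headLen += toHead;
    // Remaining bytes overwrite the oldest tail bytes (simple byte-wise loop).
    for (let i = toHead; i < chunk.byteLength; i++) {
      this.tail[this.tailWritten % this.tail.byteLength] = chunk[i];
      this.tailWritten++;
    }
  }

  finish(): { head: Uint8Array; tail: Uint8Array; droppedBytes: number } {
    const cap = this.tail.byteLength;
    const kept = Math.min(this.tailWritten, cap);
    // Once the ring has wrapped, the oldest surviving byte sits at tailWritten % cap.
    const start = this.tailWritten > cap ? this.tailWritten % cap : 0;
    const tail = new Uint8Array(kept);
    for (let i = 0; i < kept; i++) tail[i] = this.tail[(start + i) % cap];
    return { head: this.head.slice(0, this.headLen), tail, droppedBytes: this.tailWritten - kept };
  }
}

const acc = new HeadTailBytes(3, 4);
acc.push(Uint8Array.of(1, 2, 3, 4, 5));
acc.push(Uint8Array.of(6, 7, 8, 9, 10));
const snapshot = acc.finish();
console.log(Array.from(snapshot.head), Array.from(snapshot.tail), snapshot.droppedBytes);
// [ 1, 2, 3 ] [ 7, 8, 9, 10 ] 3
```

Memory stays at headLimit + tailLimit regardless of stream length. A non-zero `droppedBytes` marks the snapshot as truncated, which is the signal a caller would need to avoid persisting it as a complete body. Decoding the tail as UTF-8 needs extra care because it may begin in the middle of a multi-byte character.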
The resolveStreamTaskStaleTimeoutMs function silently returns Infinity for providers that configure only streamingIdleTimeoutMs without firstByteTimeoutStreamingMs, leaving tasks stuck before the first byte with no cleanup safety net. This is a narrow configuration gap rather than a regression, but it means the protection added by this PR is incomplete for those deployments.
src/app/v1/_lib/proxy/response-handler.ts, specifically resolveStreamTaskStaleTimeoutMs and readResponseTextWithTaskActivity.
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant RH as ResponseHandler
    participant ATM as AsyncTaskManager
    participant ACC as BoundedStreamTextAccumulator
    participant RC as session.responseController
    RH->>ATM: "register(taskId, promise, {abortController, staleTimeoutMs})"
    Note over ATM: stores {abortController, lastActivityAt, staleTimeoutMs}
    RH->>RC: bindTaskAbortToUpstreamResponse()
    loop Per upstream chunk
        RH->>ACC: pushBytes(chunk)
        RH->>ATM: touch(taskId) updates lastActivityAt
    end
    alt Normal completion
        ACC->>RH: finish() BoundedStreamTextSnapshot
        ATM->>ATM: promise.finally cleanup(taskId, taskInfo)
        Note over ATM: removes entry from Map
    else Stale cleanup fires
        ATM->>ATM: cancel(taskId) abortController.abort()
        ATM->>ATM: cleanup(taskId, taskInfo) removes from Map
        ATM-->>RC: abort propagated via bindTaskAbortToUpstreamResponse
        RC-->>RH: pending reader.read() throws AbortError
    end
Reviews (7): Last reviewed commit: "fix: 对齐响应任务 stale 超时语义"
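The task lifecycle in the sequence diagram (register, touch, identity-checked cleanup, stale eviction) can be sketched roughly as follows. `TaskRegistry` and its method signatures are hypothetical and deliberately simplified; they illustrate the pattern rather than reproduce AsyncTaskManager.

```typescript
interface TaskInfo {
  abortController: AbortController;
  lastActivityAt: number;
  staleTimeoutMs: number;
}

// Hypothetical, simplified registry illustrating the pattern only.
class TaskRegistry {
  private readonly tasks = new Map<string, TaskInfo>();

  register(taskId: string, promise: Promise<unknown>, abortController: AbortController,
           staleTimeoutMs: number, now: number = Date.now()): void {
    const info: TaskInfo = { abortController, lastActivityAt: now, staleTimeoutMs };
    this.tasks.set(taskId, info);
    // The finalizer passes its own `info`, so it can only remove its own entry.
    promise.finally(() => this.cleanup(taskId, info)).catch(() => {});
  }

  // Called on every upstream chunk so an active stream is not considered stale.
  touch(taskId: string, now: number = Date.now()): void {
    const info = this.tasks.get(taskId);
    if (info) info.lastActivityAt = now;
  }

  // Abort and drop every task that has been silent longer than its timeout.
  sweep(now: number = Date.now()): string[] {
    const evicted: string[] = [];
    for (const [taskId, info] of this.tasks) {
      if (now - info.lastActivityAt > info.staleTimeoutMs) {
        info.abortController.abort(); // wakes any read bound to this signal
        this.cleanup(taskId, info); // removes the strong reference from the Map
        evicted.push(taskId);
      }
    }
    return evicted;
  }

  has(taskId: string): boolean {
    return this.tasks.has(taskId);
  }

  // Identity check: delete only if the Map still holds this exact TaskInfo.
  private cleanup(taskId: string, info: TaskInfo): void {
    if (this.tasks.get(taskId) === info) this.tasks.delete(taskId);
  }
}

const registry = new TaskRegistry();
const controller = new AbortController();
registry.register("t1", new Promise<void>(() => {}), controller, 1000, 0);
registry.touch("t1", 500);
console.log(registry.sweep(1200)); // []: only 700 ms since the last activity
console.log(registry.sweep(2000)); // [ 't1' ]: 1500 ms of silence exceeds 1000 ms
console.log(controller.signal.aborted, registry.has("t1")); // true false
```

Comparing the stored object rather than just the key is what keeps a late finalizer from an old task from deleting a newer task that reused the same ID.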