
Fix the root causes of the issue 1283 memory leak #1288

Open
tesgth032 wants to merge 7 commits into ding113:dev from tesgth032:fix/issue-1283-memory-leak


@tesgth032 tesgth032 commented Jun 22, 2026


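Background

Fixes the rapid memory growth of the proxy process reported in #1283. Rather than applying the fallback mitigations the issue bot suggested, this change traces the strong-reference chain back through the request lifecycle: background task registration, upstream Response reads, stream statistics buffering, Session debug persistence, and Langfuse asynchronous observation.

Root causes

  1. AsyncTaskManager's stale cleanup only called abort() and never deleted the entry from the tasks Map, so the Promise, closures, session, and response buffer stayed strongly referenced. In addition, an old task's finally could mistakenly delete a new task registered under the same taskId.
  2. The AbortController of the background task registered by ResponseHandler was not wired to the real upstream responseController, so stale cleanup could not interrupt a pending response.text() / reader.read().
  3. Regular stream statistics accumulated the entire SSE body in a string[]; STORE_SESSION_RESPONSE_BODY=false only affects the Redis write, not this in-memory accumulation.
  4. The Langfuse fire-and-forget path closed over the full ProxySession and the large response body for the duration of the dynamic import and SDK calls.
  5. Further digging showed that Gemini stream passthrough still had a legacy string-window path: an oversized single chunk caused a large string to be retained and could drop the trailing usage.

Fixes

  • AsyncTaskManager.register() accepts a real AbortController; stale cleanup now cancels the task and removes it from the Map, breaking the strong reference. cleanup gains a task identity check so that an old finally cannot clean up a new task.
  • ResponseHandler binds the task abort to the upstream response controller, so timeout cleanup actually wakes up and ends the upstream read.
  • Introduces a bounded stream statistics accumulator that keeps the first 1 MiB and the last 9 MiB; the tail is used for the usage/final event, and a truncated snapshot is no longer written to Redis or the debug snapshot as if it were the complete body.
  • Before sending to Langfuse, a lightweight session snapshot is built synchronously and large response/request logs are truncated, so the observation path no longer strongly references the full request object.
  • Gemini stream passthrough switches to the same bounded byte window, covering the case where an oversized single chunk must still keep the trailing usage.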
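
The identity check on cleanup can be sketched as follows. This is a minimal illustration rather than the PR's code: `TaskRegistry`, `TaskEntry`, and the method bodies are hypothetical stand-ins for AsyncTaskManager, written only to show why a settled old task cannot remove a newer task that reuses its taskId.

```typescript
// Hypothetical stand-in for AsyncTaskManager; every name here is an assumption.
interface TaskEntry {
  promise: Promise<unknown>;
  abortController: AbortController;
}

class TaskRegistry {
  private readonly tasks = new Map<string, TaskEntry>();

  register(
    taskId: string,
    promise: Promise<unknown>,
    abortController: AbortController = new AbortController(),
  ): TaskEntry {
    const previous = this.tasks.get(taskId);
    if (previous) {
      // Cancel and detach the old task before the new one takes the slot.
      previous.abortController.abort();
      this.cleanup(taskId, previous);
    }
    const entry: TaskEntry = { promise, abortController };
    this.tasks.set(taskId, entry);
    // The settle handlers pass their own entry, so they can only remove themselves.
    promise.then(
      () => this.cleanup(taskId, entry),
      () => this.cleanup(taskId, entry),
    );
    return entry;
  }

  // Deletes the Map entry only if it is still the same task instance.
  private cleanup(taskId: string, expected: TaskEntry): void {
    if (this.tasks.get(taskId) === expected) this.tasks.delete(taskId);
  }

  has(taskId: string): boolean {
    return this.tasks.has(taskId);
  }
}
```

Comparing entries by reference keeps the check cheap and avoids needing a separate generation counter per taskId.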

Verification

  • bunx vitest run tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts tests/unit/proxy/response-handler-lease-decrement.test.ts tests/unit/proxy/response-handler-non200.test.ts
    • 6 files passed, 53 tests passed
  • bun run typecheck
  • bunx biome check on changed files
  • bun run lint
    • Exit code 0; only the repository's pre-existing warnings remain
  • bun run build
    • Passed; only the pre-existing Edge Runtime warnings remain
  • bun run test
    • Full run: 1 test failed because of a WSL cold-start timeout; the rest were 726 passed / 2 skipped. Re-running tests/unit/k8s-cch-update-flow.test.ts on its own passed, confirming the failure is unrelated to this PR.

Greptile Summary

This PR fixes the root causes of a memory leak in the agent proxy: stale AsyncTaskManager tasks were never removed from the Map (keeping Promises, closures, sessions, and response buffers alive indefinitely), and stream stats accumulated response bodies in an unbounded string[]. The Langfuse fire-and-forget path also held strong references to full ProxySession objects and large response bodies across a dynamic import boundary.

  • AsyncTaskManager: cleanup() is now private and identity-checked so old task finalizers cannot delete newer tasks registered under the same ID; cleanupCompletedTasks() now calls cleanup() after abort(), breaking the strong-reference chain; a new touch() / lastActivityAt mechanism prevents active streaming tasks from being falsely evicted.
  • BoundedStreamTextAccumulator: replaces the unbounded string[] in all streaming paths with a byte-level head (1 MiB) + circular-tail (9 MiB) window; truncated snapshots are no longer written to Redis as if they were complete response bodies.
  • Langfuse: session snapshot and response truncation are built synchronously before the dynamic import, so the async closure no longer holds a reference to the full session or response body.
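
A head-plus-tail byte window of the kind summarized above can be sketched like this. It is a simplified illustration under stated assumptions, not the PR's BoundedStreamTextAccumulator: the class name `HeadTailByteWindow`, the constructor limits, and the plain array used for the tail (instead of a circular buffer) are all hypothetical.

```typescript
// Concatenates byte chunks into one contiguous array.
function concatBytes(parts: Uint8Array[]): Uint8Array {
  const total = parts.reduce((sum, part) => sum + part.byteLength, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  for (const part of parts) {
    out.set(part, offset);
    offset += part.byteLength;
  }
  return out;
}

// Hypothetical simplified window: keeps the first headLimit bytes and the last tailLimit bytes.
class HeadTailByteWindow {
  private readonly head: Uint8Array[] = [];
  private tail: Uint8Array[] = [];
  private headBytes = 0;
  private tailBytes = 0;
  private truncated = false;

  constructor(
    private readonly headLimit: number,
    private readonly tailLimit: number,
  ) {}

  push(chunk: Uint8Array): void {
    let rest = chunk;
    if (this.headBytes < this.headLimit) {
      const take = Math.min(this.headLimit - this.headBytes, rest.byteLength);
      // slice() copies, so the window does not pin a large source ArrayBuffer.
      this.head.push(rest.slice(0, take));
      this.headBytes += take;
      rest = rest.subarray(take);
    }
    if (rest.byteLength === 0) return;
    if (rest.byteLength > this.tailLimit) {
      // Oversized single chunk: only its last tailLimit bytes can survive.
      rest = rest.subarray(rest.byteLength - this.tailLimit);
      this.tail = [];
      this.tailBytes = 0;
      this.truncated = true;
    }
    this.tail.push(rest.slice());
    this.tailBytes += rest.byteLength;
    // Evict or trim the oldest tail chunks until the tail fits its budget.
    while (this.tailBytes > this.tailLimit) {
      const overflow = this.tailBytes - this.tailLimit;
      const oldest = this.tail[0];
      if (!oldest) break;
      if (oldest.byteLength <= overflow) {
        this.tail.shift();
        this.tailBytes -= oldest.byteLength;
      } else {
        this.tail[0] = oldest.slice(overflow);
        this.tailBytes -= overflow;
      }
      this.truncated = true;
    }
  }

  // A truncated snapshot may contain U+FFFD where a cut split a multi-byte character.
  finish(): { text: string; truncated: boolean } {
    const decoder = new TextDecoder();
    if (!this.truncated) {
      return { text: decoder.decode(concatBytes([...this.head, ...this.tail])), truncated: false };
    }
    const text = decoder.decode(concatBytes(this.head)) + decoder.decode(concatBytes(this.tail));
    return { text, truncated: true };
  }
}
```

Callers can use the `truncated` flag to decide whether the snapshot may be persisted as a complete body, which is the distinction the PR draws for Redis writes.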

Confidence Score: 4/5

Safe to merge — the core reference-leak fixes are correct and well-tested, with the non-streaming body accumulation path remaining as a known architectural limitation.

The resolveStreamTaskStaleTimeoutMs function silently returns Infinity for providers that configure only streamingIdleTimeoutMs without firstByteTimeoutStreamingMs, leaving tasks stuck before the first byte with no cleanup safety net. This is a narrow configuration gap rather than a regression, but it means the protection added by this PR is incomplete for those deployments.
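
One possible way to close this gap is sketched below. It is an assumption-laden illustration, not the behavior of resolveStreamTaskStaleTimeoutMs: the function `resolveStaleTimeoutMs`, the `StreamTimeoutConfig` shape, the rule that a non-positive value means "disabled", and the 10-minute fallback are all hypothetical choices.

```typescript
// Hypothetical config shape; the field names mirror the ones mentioned above.
interface StreamTimeoutConfig {
  firstByteTimeoutStreamingMs?: number;
  streamingIdleTimeoutMs?: number;
}

// Assumed fallback, chosen to match the 10-minute default discussed in this PR.
const FALLBACK_STALE_TIMEOUT_MS = 10 * 60 * 1000;

// Assumption: a missing, non-finite, or non-positive value means the timeout is disabled.
function enabled(value: number | undefined): number | undefined {
  return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined;
}

function resolveStaleTimeoutMs(cfg: StreamTimeoutConfig): number {
  const firstByte = enabled(cfg.firstByteTimeoutStreamingMs);
  const idle = enabled(cfg.streamingIdleTimeoutMs);
  // Both disabled: the operator opted out of timeouts, so never treat the task as stale.
  if (firstByte === undefined && idle === undefined) return Infinity;
  // Only one configured: the fallback covers the phase the other would have guarded.
  return Math.max(firstByte ?? FALLBACK_STALE_TIMEOUT_MS, idle ?? FALLBACK_STALE_TIMEOUT_MS);
}
```

With this rule, a provider that sets only streamingIdleTimeoutMs still gets a finite stale timeout before the first byte arrives.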

src/app/v1/_lib/proxy/response-handler.ts — specifically resolveStreamTaskStaleTimeoutMs and readResponseTextWithTaskActivity.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/app/v1/_lib/proxy/response-handler.ts | Core of the PR: introduces BoundedStreamTextAccumulator (head+tail byte-level window), readResponseTextWithTaskActivity (touch-based keepalive for non-stream reads), bindTaskAbortToUpstreamResponse (stale-cleanup-to-upstream-abort chain), and removes direct AsyncTaskManager.cleanup() calls from finally blocks in favour of the new identity-checked cleanup inside register(). Two gaps remain: resolveStreamTaskStaleTimeoutMs returns Infinity for streamingIdleTimeoutMs-only configs, and non-streaming path body accumulation is still unbounded. |
| src/lib/async-task-manager.ts | Adds lastActivityAt tracking and touch(), makes cleanup() private with identity-check guard to prevent old task finalizers from deleting newer tasks with the same ID, and ensures cleanupCompletedTasks() removes the task from the Map (breaking the strong-reference chain) in addition to calling abort(). Logic is sound and well-tested. |
| src/lib/langfuse/emit-proxy-trace.ts | buildLangfuseSessionSnapshot and truncateResponseTextForLangfuse are extracted before the dynamic import, breaking the closure over the full ProxySession and large response body. The lightweight message preview (buildRequestMessagePreview) is consistent with the updated buildRequestBodySummary in trace-proxy-request.ts. |
| src/lib/langfuse/trace-proxy-request.ts | buildRequestBodySummary now reads hasSystemPrompt and toolsCount from the preview scalar fields first, falling back to the raw array fields. Fixes the previously-flagged corruption where the preview object fields were ignored. tryParseJsonSafe gains a 1 MiB guard to avoid parsing huge strings. |
| tests/unit/lib/async-task-manager-edge-runtime.test.ts | New tests cover: old-task finalizer not deleting the replacement task, touch() preventing false stale cleanup, stale cleanup aborting and detaching a provided controller, and cleanupAll removing tasks from the Map. |
| tests/unit/proxy/response-handler-client-abort-drain.test.ts | Comprehensive new coverage: BoundedStreamTextAccumulator copy-semantics on Buffer-backed data, staleTimeoutMs=Infinity propagation for disabled timeouts, oversized stream bounded accounting, UTF-8 boundary correctness across head/tail split, and tail-chunk eviction during split completion event. |
| tests/unit/langfuse/langfuse-trace.test.ts | Adds a test verifying that requestSummary fields (hasSystemPrompt, toolsCount) are correctly preserved when the session message is a lightweight Langfuse preview object. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant RH as ResponseHandler
    participant ATM as AsyncTaskManager
    participant ACC as BoundedStreamTextAccumulator
    participant RC as session.responseController

    RH->>ATM: "register(taskId, promise, {abortController, staleTimeoutMs})"
    Note over ATM: stores {abortController, lastActivityAt, staleTimeoutMs}
    RH->>RC: bindTaskAbortToUpstreamResponse()

    loop Per upstream chunk
        RH->>ACC: pushBytes(chunk)
        RH->>ATM: touch(taskId) updates lastActivityAt
    end

    alt Normal completion
        ACC->>RH: finish() BoundedStreamTextSnapshot
        ATM->>ATM: promise.finally cleanup(taskId, taskInfo)
        Note over ATM: removes entry from Map
    else Stale cleanup fires
        ATM->>ATM: cancel(taskId) abortController.abort()
        ATM->>ATM: cleanup(taskId, taskInfo) removes from Map
        ATM-->>RC: abort propagated via bindTaskAbortToUpstreamResponse
        RC-->>RH: pending reader.read() throws AbortError
    end

Reviews (7): Last reviewed commit: "fix: 对齐响应任务 stale 超时语义" (align the stale-timeout semantics of response tasks)

@coderabbitai

coderabbitai Bot commented Jun 22, 2026


Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.


Walkthrough

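ProxyResponseHandler introduces BoundedStreamTextAccumulator, which replaces the unbounded string buffering in the Gemini stream passthrough and generic SSE streaming paths with bounded head-and-tail accumulation. A new bindTaskAbortToUpstreamResponse links background task aborts to the upstream responseController. AsyncTaskManager gains per-task timeouts, instance identity checks, and a touch() method to prevent wrongful deletion. The Langfuse trace path adds response text truncation and session snapshot freezing as safeguards.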
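
The abort linkage described in this walkthrough can be sketched with the standard AbortController API. This is a minimal illustration, not the PR's bindTaskAbortToUpstreamResponse: the helper name `bindAbort` and its signature are hypothetical, and the only behavior assumed from the PR is that binding returns a cleanup function.

```typescript
// Hypothetical helper: forwards an abort from the task signal to the upstream controller.
// Returns a function that removes the listener once the task has finished.
function bindAbort(taskSignal: AbortSignal, upstream: AbortController): () => void {
  if (taskSignal.aborted) {
    // The task was already cancelled, so cancel upstream right away.
    upstream.abort();
    return () => {};
  }
  const onAbort = (): void => upstream.abort();
  taskSignal.addEventListener("abort", onAbort, { once: true });
  return () => taskSignal.removeEventListener("abort", onAbort);
}

// Usage: bind when the background task starts, clean up in finally.
const taskController = new AbortController();
const upstreamController = new AbortController();
const unbind = bindAbort(taskController.signal, upstreamController);
try {
  // ... read from the upstream response here ...
} finally {
  unbind();
}
```

Removing the listener in finally matters for the leak being fixed: a listener left on a long-lived signal would keep its closure, and whatever that closure references, reachable.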

Changes

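Bounded stream statistics, task lifecycle safety, and Langfuse payload safeguards

| Layer | File(s) | Summary |
| --- | --- | --- |
| AsyncTaskManager task model and instance-consistent cleanup | src/lib/async-task-manager.ts, tests/unit/lib/async-task-manager-edge-runtime.test.ts | TaskInfo gains lastActivityAt and staleTimeoutMs fields; the register parameter is widened to a taskTypeOrOptions union type, and on a taskId conflict the old task is cancelled before instance-level cleanup runs; a new touch(taskId) updates the activity time; cleanupCompletedTasks now decides staleness from idleAge = now - lastActivityAt and the per-task timeout; cleanupAll iterates over a snapshot of entries to perform consistent cleanup. Tests add same-taskId race isolation, touch preventing idle cleanup, abort binding for an external AbortController, and active task count assertions. |
| Response handling foundations: bounded accumulator and abort binding framework | src/app/v1/_lib/proxy/response-handler.ts | Adds BoundedStreamTextAccumulator and BoundedStreamTextSnapshot, which keep head and tail windows under a fixed byte limit and mark truncated on overflow; adds bindTaskAbortToUpstreamResponse(), which links background task aborts to the upstream responseController and returns a cleanup function; adds readResponseTextWithTaskActivity(), which reads and decodes in chunks and periodically calls AsyncTaskManager.touch(). |
| Abort binding for the Gemini non-streaming and generic non-streaming background paths | src/app/v1/_lib/proxy/response-handler.ts | The Gemini non-streaming stats task and the generic non-streaming background task add a statsAbortController and call bindTaskAbortToUpstreamResponse() to bind to upstream; they use readResponseTextWithTaskActivity() to read in chunks and refresh the activity time; persistNonStreamAfterSnapshot() now receives the already-read body from the caller to avoid an extra clone; a new finalResponseBodyForSnapshot caches and reuses the converted JSON; finally calls cleanupTaskAbortBinding() to remove the abort listener; AsyncTaskManager.register switches to object-style arguments {taskType, abortController, staleTimeoutMs}. |
| Gemini stream passthrough: bounded accumulation and truncation policy | src/app/v1/_lib/proxy/response-handler.ts, tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts | Introduces BoundedStreamTextAccumulator to replace the original decoder + head/tail unbounded buffering; the read loop now calls pushBytes() and triggers AsyncTaskManager.touch(taskId) on every iteration; idle timeout and trace/log use the new getCollectedChunkCount() metric; flush now calls accumulator.finish() to produce a snapshot; when truncated=true, nothing is written to Redis and a size warning is logged; finally adds cleanupTaskAbortBinding() and object-style task registration. Tests add disabled idle timeout and oversized single-chunk billing scenarios. |
| Generic SSE stream processing: accumulator replacement and unified metrics | src/app/v1/_lib/proxy/response-handler.ts, tests/unit/proxy/response-handler-client-abort-drain.test.ts | Introduces BoundedStreamTextAccumulator and getCollectedChunkCount() to replace the mixed sseEventCount/chunksCollected counting; the tee branch adds cleanupTaskAbortBinding(); flushAndJoin() now calls accumulator.finish() to produce a snapshot; finalizeStream uses streamSnapshot?.truncated to decide whether to write to Redis and logs a "body too large" warning on truncation; log statistics in all exception/timeout/abort branches switch to the new metric; finally adds abort binding cleanup and object-style task registration. Tests add oversized, UTF-8 head/tail split, and terminal event split-tail boundary scenarios. |
| Langfuse large-text preview and session snapshot freezing | src/lib/langfuse/trace-proxy-request.ts, src/lib/langfuse/emit-proxy-trace.ts | trace-proxy-request.ts adds the LANGFUSE_JSON_PARSE_MAX_CHARS and LANGFUSE_TEXT_PREVIEW_EDGE_CHARS constants and buildLargeTextPreview(), which generates a structured preview for over-limit text, and changes tryParseJsonSafe() to return the preview instead of running JSON.parse when the limit is exceeded; emit-proxy-trace.ts adds three internal functions, truncateResponseTextForLangfuse(), buildRequestMessagePreview(), and buildLangfuseSessionSnapshot(); text is truncated and the snapshot frozen before the dynamic import to avoid strongly referencing large objects. |
| Test additions: concurrency isolation and boundary scenarios | tests/unit/proxy/response-handler-*.test.ts, tests/unit/langfuse/langfuse-trace.test.ts | The response-handler tests add the createOversizedResponsesSse(), createUtf8SplitHeadTailResponsesSse(), and createSplitTailBoundaryResponsesSse() SSE builders plus three cases verifying the truncation flag and billing for oversized streams, UTF-8 continuity, and tail splitting; the chunked non-streaming read test verifies touch() refresh frequency and snapshot persistence; several test files add the missing AsyncTaskManager.touch() mock; the Langfuse test verifies that request summary fields are preserved with the lightweight preview. |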

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

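  • ding113/claude-code-hub#759: also modifies the buffering and timeout/abort/finalization logic of the Gemini stream passthrough statistics pipeline in response-handler.ts, sharing code paths directly with this PR.
  • ding113/claude-code-hub#1113: also refactors the abort/cleanup lifecycle of ProxyResponseHandler streaming and background tasks and their binding to the upstream response controller, overlapping directly with this PR's abort handling.
  • ding113/claude-code-hub#1277: also modifies response-handler.ts to link client abort/timeout behavior to upstream cancellation (this PR adds the abort-controller binding; that PR limits the drain window), so the two are directly related in the response flow.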

Suggested reviewers

  • ding113
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

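| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 12.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description check | ✅ Passed | The PR description details five memory leak root causes (AsyncTaskManager cleanup, ResponseHandler background tasks, stream statistics buffering, Langfuse asynchronous observation, Gemini stream passthrough) and the corresponding fixes, and it matches the extensive code changes. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Title check | ✅ Passed | The title accurately reflects the PR's main goal, fixing the root causes of the memory leak, and is specific and closely tied to the detailed changes. |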


@github-actions github-actions Bot added the bug and area:provider labels Jun 22, 2026
@coderabbitai coderabbitai Bot requested a review from ding113 June 22, 2026 13:56

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces memory-bounding mechanisms for stream processing and Langfuse tracing to prevent OOM and DoS risks. Key changes include the implementation of a BoundedStreamTextAccumulator to retain only head and tail chunks of large streams, binding task aborts to upstream responses, and truncating large payloads before sending them to Langfuse. Feedback on the changes highlights a bug in buildRequestMessagePreview where omitting the system and tools fields causes downstream trace summaries to incorrectly report missing system prompts and zero tool counts, which can be resolved using the provided code suggestion.
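
The "truncate before sending" idea can be sketched as follows. This is an illustrative example only: `truncateForTrace`, `buildPayload`, `emitTrace`, the `TracePayload` shape, and the character limits are hypothetical, and the `await Promise.resolve()` merely stands in for the dynamic import of the tracing SDK.

```typescript
// Hypothetical limits; the PR's actual constants are not reproduced here.
const MAX_TRACE_CHARS = 1000;
const EDGE_CHARS = 100;

interface TracePayload {
  sessionId: string;
  responsePreview: string;
}

// Keeps the first and last edgeChars characters and notes how many were omitted.
function truncateForTrace(text: string, maxChars: number, edgeChars: number): string {
  if (text.length <= maxChars) return text;
  const omitted = text.length - edgeChars * 2;
  const head = text.slice(0, edgeChars);
  const tail = text.slice(text.length - edgeChars);
  return `${head}\n...[${omitted} chars omitted]...\n${tail}`;
}

// Synchronous step: copy only the small fields the trace needs.
function buildPayload(session: { id: string; responseText: string }): TracePayload {
  return {
    sessionId: session.id,
    responsePreview: truncateForTrace(session.responseText, MAX_TRACE_CHARS, EDGE_CHARS),
  };
}

// Asynchronous step: receives the payload only, never the session object.
async function sendLater(
  payload: TracePayload,
  send: (p: TracePayload) => Promise<void>,
): Promise<void> {
  await Promise.resolve(); // stand-in for the dynamic import of the SDK
  await send(payload);
}

function emitTrace(
  session: { id: string; responseText: string },
  send: (p: TracePayload) => Promise<void>,
): Promise<void> {
  return sendLater(buildPayload(session), send);
}
```

Splitting the synchronous snapshot from the asynchronous send keeps the session object out of the async function's parameters, which is the separation the PR aims for; how much memory is actually released still depends on the engine.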


Comment on lines +33 to +47
function buildRequestMessagePreview(message: Record<string, unknown>): Record<string, unknown> {
  return {
    truncatedForLangfuse: true,
    model: typeof message.model === "string" ? message.model : undefined,
    stream: typeof message.stream === "boolean" ? message.stream : undefined,
    max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined,
    temperature: typeof message.temperature === "number" ? message.temperature : undefined,
    messageCount: Array.isArray(message.messages) ? message.messages.length : undefined,
    contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined,
    toolsCount: Array.isArray(message.tools) ? message.tools.length : undefined,
    hasSystemPrompt:
      (Array.isArray(message.system) && message.system.length > 0) ||
      (typeof message.system === "string" && message.system.length > 0),
  };
}


high

The newly introduced buildRequestMessagePreview function truncates the request message to avoid holding large objects in memory. However, it completely omits the system and tools fields, only providing hasSystemPrompt and toolsCount.

This causes a silent bug in buildRequestBodySummary (inside trace-proxy-request.ts), which directly accesses msg.system and msg.tools to compute hasSystemPrompt and toolsCount. Because these fields are missing on the preview object, hasSystemPrompt will always evaluate to false and toolsCount will always evaluate to 0 in Langfuse traces for all snapshotted sessions.

To fix this without modifying trace-proxy-request.ts (which is outside the active diff hunk), we can return dummy arrays for system and tools in buildRequestMessagePreview that match the expected checks (Array.isArray and .length).

function buildRequestMessagePreview(message: Record<string, unknown>): Record<string, unknown> {
  if (!message) {
    return { truncatedForLangfuse: true };
  }

  const hasSystemPrompt =
    (Array.isArray(message.system) && message.system.length > 0) ||
    (typeof message.system === "string" && message.system.length > 0);
  const toolsCount = Array.isArray(message.tools) ? message.tools.length : 0;

  return {
    truncatedForLangfuse: true,
    model: typeof message.model === "string" ? message.model : undefined,
    stream: typeof message.stream === "boolean" ? message.stream : undefined,
    max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined,
    temperature: typeof message.temperature === "number" ? message.temperature : undefined,
    messageCount: Array.isArray(message.messages) ? message.messages.length : undefined,
    contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined,
    system: hasSystemPrompt ? [null] : [],
    tools: new Array(toolsCount).fill(null),
  };
}

@github-actions github-actions Bot added the size/L Large PR (< 1000 lines) label Jun 22, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e065e2a5e9


Comment on lines +3214 to +3217
AsyncTaskManager.register(taskId, processingPromise, {
  taskType: "stream-processing",
  abortController,
});


P1: Avoid timing out active streams after ten minutes

Registering the stream task with the live abortController but no staleTimeoutMs leaves it on AsyncTaskManager's default 10-minute wall-clock timeout; the periodic cleanup will call cancel(), and the new abort binding then aborts the upstream response. A legitimate long-running SSE generation with continuous chunks (not idle) will therefore be cut off and finalized as a timeout once it crosses roughly 10 minutes. Use an infinite/stream-specific stale timeout or base staleness on lack of activity rather than task age.
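
Basing staleness on inactivity rather than task age can be sketched as below. The `TrackedTask` shape and the helper names are hypothetical; only the idea of comparing `now - lastActivityAt` against a per-task timeout comes from this discussion.

```typescript
// Hypothetical task record; field names are illustrative.
interface TrackedTask {
  createdAt: number;
  lastActivityAt: number;
  staleTimeoutMs: number; // Infinity disables stale cleanup for this task
}

function createTask(now: number, staleTimeoutMs: number): TrackedTask {
  return { createdAt: now, lastActivityAt: now, staleTimeoutMs };
}

// Called whenever the task makes progress, for example on every upstream chunk.
function touch(task: TrackedTask, now: number): void {
  task.lastActivityAt = now;
}

// A task is stale only when it has been idle for longer than its timeout.
function isStale(task: TrackedTask, now: number): boolean {
  const idleAge = now - task.lastActivityAt;
  return idleAge > task.staleTimeoutMs;
}
```

Under this rule a stream that keeps producing chunks is never evicted, while a task that stops making progress is still collected once its idle time exceeds the timeout.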


@github-actions github-actions Bot left a comment


Code Review Summary

This PR addresses memory leak issues in the proxy response handling pipeline. The changes introduce bounded stream accumulation, proper task cleanup with strong reference detachment, and abort controller chaining to upstream responses. All changes are intentional and well-tested.

PR Size: L

  • Lines changed: 835 (+665, -170)
  • Files changed: 7

Issues Found

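| Category | Critical | High | Medium | Low |
| --- | --- | --- | --- | --- |
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Simplification | 0 | 0 | 0 | 0 |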

Review Coverage

  • Logic and correctness - Clean
  • Security (OWASP Top 10) - Clean
  • Error handling - All errors logged, no silent failures
  • Type safety - Type assertions are intentional with proper null safety
  • Documentation accuracy - Comments accurately describe the design
  • Test coverage - Good coverage of new functionality including edge cases
  • Code clarity - Well-structured, complex algorithms properly explained

Key Changes Verified

  1. AsyncTaskManager: Fixed stale cleanup to properly detach from Map and avoid old task finalizers removing new tasks with same taskId
  2. BoundedStreamTextAccumulator: Well-designed bounded buffer (1MB head + 9MB tail) prevents unbounded memory growth
  3. AbortController chaining: Properly propagates abort signals to upstream response readers
  4. Langfuse truncation: Response/request body truncation happens before async import to prevent closure-captured references
  5. Tests: Comprehensive new tests validate task cleanup behavior and large response handling

Minor Notes (No Action Required)

  • Type assertion as unknown as ProxySession in buildLangfuseSessionSnapshot is intentional - all methods used by traceProxyRequest are properly stubbed
  • Null safety checks in place for session.forwardedRequestBody and session.request.log

Automated review by Claude AI


Comment on lines +66 to +95
return {
  startTime: session.startTime,
  method: session.method,
  headers: new Headers(session.headers),
  request: {
    message: requestMessage,
    log: truncateResponseTextForLangfuse(session.request.log ?? ""),
    note: session.request.note,
    model: session.request.model,
    imageRequestMetadata: null,
  },
  userAgent: session.userAgent,
  provider: session.provider,
  messageContext: session.messageContext,
  ttfbMs: session.ttfbMs,
  forwardStartTime: session.forwardStartTime,
  forwardedRequestBody,
  sessionId: session.sessionId,
  originalFormat: session.originalFormat,
  getMessagesLength: () => messagesLength,
  getEndpoint: () => endpoint,
  getCurrentModel: () => currentModel,
  getProviderChain: () => providerChain,
  getRequestSequence: () => requestSequence,
  getOriginalModel: () => originalModel,
  isModelRedirected: () => modelRedirected,
  getSpecialSettings: () => specialSettings,
  getCacheTtlResolved: () => cacheTtlResolved,
  getContext1mApplied: () => context1mApplied,
} as unknown as ProxySession;


P1 buildRequestBodySummary reads wrong keys from the snapshot object

buildLangfuseSessionSnapshot replaces session.request.message with the output of buildRequestMessagePreview, which stores hasSystemPrompt: bool and toolsCount: number as flat scalar fields. But buildRequestBodySummary in trace-proxy-request.ts reads msg.system (an array) to compute hasSystemPrompt and msg.tools (an array) to compute toolsCount. Because the preview object never sets system or tools, every Langfuse trace will show hasSystemPrompt: false and toolsCount: 0 regardless of what the original request contained, silently corrupting the requestSummary metadata for all requests after this change ships.
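
A summary function that tolerates both shapes could look like the sketch below. It is not the code in trace-proxy-request.ts: `summarizeRequest` and its return type are hypothetical, and it only illustrates reading the preview's scalar fields first and falling back to the raw `system` and `tools` fields.

```typescript
// Hypothetical summary helper; accepts either a raw request message or a lightweight preview.
function summarizeRequest(msg: Record<string, unknown>): {
  hasSystemPrompt: boolean;
  toolsCount: number;
} {
  // Prefer the scalar fields that a preview object carries.
  const hasSystemPrompt =
    typeof msg.hasSystemPrompt === "boolean"
      ? msg.hasSystemPrompt
      : (Array.isArray(msg.system) && msg.system.length > 0) ||
        (typeof msg.system === "string" && msg.system.length > 0);

  // Fall back to the raw tools array when no scalar count is present.
  const toolsCount =
    typeof msg.toolsCount === "number"
      ? msg.toolsCount
      : Array.isArray(msg.tools)
        ? msg.tools.length
        : 0;

  return { hasSystemPrompt, toolsCount };
}
```

Handling both shapes in the reader avoids placing dummy arrays in the preview object just to satisfy the old checks.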


Comment on lines +71 to +84
  chunkCount: number;
};

// Stream statistics only need the head metadata and the tail usage/final event.
// Store the windows by bytes to avoid unbounded string[] growth and to avoid a
// subarray pinning a very large original ArrayBuffer.
class BoundedStreamTextAccumulator {
  private readonly headChunks: Uint8Array[] = [];
  private readonly tailChunks: Uint8Array[] = [];
  private readonly tailChunkBytes: number[] = [];
  private headBufferedBytes = 0;
  private tailBufferedBytes = 0;
  private tailHead = 0;
  private tailMode = false;
  private truncated = false;


P2 Unreachable else branch in pushBytes

Inside the guard if (!this.tailMode && this.headBufferedBytes < STREAM_STATS_HEAD_BYTES), remainingHeadBytes is computed as STREAM_STATS_HEAD_BYTES - this.headBufferedBytes. Because the outer condition already guarantees headBufferedBytes < STREAM_STATS_HEAD_BYTES, remainingHeadBytes is always strictly greater than 0, so the else { this.tailMode = true; this.pushTailBytes(value); } branch can never be reached. The code is harmless, but dead code in a performance-sensitive accumulator is misleading.


@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/lib/async-task-manager.ts (1)

225-236: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

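The already-defined constant can be reused to remove the duplication.

staleThreshold on line 220 has the same value as DEFAULT_STALE_TASK_TIMEOUT_MS on line 33, so the constant can be reused directly.

♻️ Suggested change
   private cleanupCompletedTasks(): void {
     const now = Date.now();
-    const staleThreshold = 10 * 60 * 1000; // 10 minutes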

     for (const [taskId, taskInfo] of this.tasks.entries()) {
       const age = now - taskInfo.createdAt;

-      const staleTimeoutMs = taskInfo.staleTimeoutMs || staleThreshold;
+      const staleTimeoutMs = taskInfo.staleTimeoutMs || DEFAULT_STALE_TASK_TIMEOUT_MS;

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fab7ff45-d099-4368-aaa2-243cb53ce1ed

📥 Commits

Reviewing files that changed from the base of the PR and between f96d00f and e065e2a.

📒 Files selected for processing (7)
  • src/app/v1/_lib/proxy/response-handler.ts
  • src/lib/async-task-manager.ts
  • src/lib/langfuse/emit-proxy-trace.ts
  • src/lib/langfuse/trace-proxy-request.ts
  • tests/unit/lib/async-task-manager-edge-runtime.test.ts
  • tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts

@tesgth032

Contributor Author

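This round makes incremental fixes based on further digging and the automated review feedback:

  • AsyncTaskManager's stale check now uses lastActivityAt instead of creation time, and both real streaming read loops call touch(taskId) whenever an upstream chunk arrives. Hung background tasks that make no progress for a long time are still cleaned up, but legitimate long SSE streams that keep producing output are no longer killed by mistake.
  • Fixed summary reading for the lightweight Langfuse session snapshot: traceProxyRequest now recognizes hasSystemPrompt/toolsCount in the preview, so metadata is not lost for the sake of lower memory use.
  • Removed an unreachable branch in the bounded streaming accumulator and updated the related test mocks accordingly.

Local verification:

  • bunx vitest run tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/langfuse/langfuse-trace.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts tests/unit/proxy/response-handler-lease-decrement.test.ts tests/unit/proxy/response-handler-non200.test.ts: 91 passed
  • bun run typecheck: passed
  • bunx biome check ...changed files...: passed
  • bun run test: 727 files passed, 2 skipped; 6523 tests passed, 13 skipped
  • bun run build: passed (only the repository's existing Edge Runtime warning remains)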
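
The idle-based stale check described in this comment can be sketched roughly as follows. This is a simplified illustration, not the project's AsyncTaskManager: the class name `IdleTaskRegistry`, the `sweep` method, and the explicit `now` parameters are assumptions made so the example is self-contained and deterministic.

```typescript
// Hypothetical, simplified registry; not the real AsyncTaskManager.
interface IdleTaskInfo {
  abortController: AbortController;
  lastActivityAt: number;
  staleTimeoutMs: number;
}

class IdleTaskRegistry {
  private readonly tasks = new Map<string, IdleTaskInfo>();

  register(taskId: string, staleTimeoutMs: number, now: number): AbortController {
    const abortController = new AbortController();
    this.tasks.set(taskId, { abortController, lastActivityAt: now, staleTimeoutMs });
    return abortController;
  }

  // Called whenever an upstream chunk arrives, so active streams stay fresh.
  touch(taskId: string, now: number): void {
    const task = this.tasks.get(taskId);
    if (task) task.lastActivityAt = now;
  }

  // Abort and drop every task that has been idle longer than its own timeout.
  sweep(now: number): string[] {
    const removed: string[] = [];
    for (const [taskId, task] of this.tasks) {
      if (now - task.lastActivityAt > task.staleTimeoutMs) {
        task.abortController.abort();
        this.tasks.delete(taskId); // drop the strong reference held by the Map
        removed.push(taskId);
      }
    }
    return removed;
  }

  has(taskId: string): boolean {
    return this.tasks.has(taskId);
  }
}
```

With this shape, a stream that keeps calling `touch()` survives a sweep even if it was registered long ago, while a task with no recent activity is aborted and removed.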

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 92bb359b2b

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

statusCode: data.statusCode,
isStreaming: data.isStreaming,
responseText: data.responseText,
responseText,

P2 Badge Stop capturing large Langfuse trace inputs

When Langfuse is enabled for a large response, this passes the truncated responseText, but the async callback still closes over the original data object via the adjacent data.* reads; that object still contains the full data.responseText, so the full response stays strongly reachable until the dynamic import and trace submission finish. Destructure the non-large fields before the import() and avoid capturing data so the truncation actually releases the original payload.
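
A minimal sketch of the suggested pattern, assuming a hypothetical `TraceData` shape, a made-up character limit, and a `send` callback standing in for the dynamically imported SDK call. It only illustrates the closure shape: the snapshot is built synchronously, and the deferred callback references the snapshot rather than `data`.

```typescript
// Hypothetical shapes for illustration; not the project's real types.
interface TraceData {
  statusCode: number;
  isStreaming: boolean;
  responseText: string;
}

const MAX_TRACE_TEXT_CHARS = 1024; // assumed limit for this sketch
const TRACE_TRUNCATION_MARKER = "[truncated]"; // assumed marker

function truncateForTrace(text: string, maxChars: number): string {
  return text.length <= maxChars ? text : text.slice(0, maxChars) + TRACE_TRUNCATION_MARKER;
}

// Build a small, self-contained snapshot before any asynchronous work starts.
function buildTraceSnapshot(data: TraceData): TraceData {
  const { statusCode, isStreaming } = data;
  const responseText = truncateForTrace(data.responseText, MAX_TRACE_TEXT_CHARS);
  return { statusCode, isStreaming, responseText };
}

function emitTrace(data: TraceData, send: (snapshot: TraceData) => void): Promise<void> {
  const snapshot = buildTraceSnapshot(data);
  // The deferred callback references only `snapshot`, never `data`.
  return Promise.resolve().then(() => send(snapshot));
}
```

Whether a truncated substring shares storage with its parent string is an engine implementation detail that this sketch does not address.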

Comment on lines +179 to +184
while (
this.tailBufferedBytes > STREAM_STATS_TAIL_BYTES &&
this.tailHead < this.tailChunkBytes.length
) {
this.tailBufferedBytes -= this.tailChunkBytes[this.tailHead] ?? 0;
this.tailChunks[this.tailHead] = new Uint8Array();

P2 Badge Preserve the full byte tail when evicting chunks

For long streams delivered in uneven large chunks, this evicts the entire oldest chunk whenever the tail is just over the byte limit, so the retained text can shrink by several MB even though most of that chunk is still within the intended 9 MB tail window. If the terminal SSE usage/completion event is split across that boundary, parseUsageFromResponseText can miss the usage and final accounting/debug finalization loses tokens; trim only the overflow bytes from the oldest chunk instead of dropping it wholesale.
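
One way to trim only the overflow bytes is sketched below. The `TailByteWindow` class is hypothetical and much simpler than the accumulator in this PR; it keeps at most `maxBytes` of the most recent data and cuts the oldest chunk partially when only part of it has fallen out of the window.

```typescript
// Hypothetical bounded tail window; names are illustrative only.
class TailByteWindow {
  private chunks: Uint8Array[] = [];
  private bufferedBytes = 0;

  constructor(private readonly maxBytes: number) {}

  push(chunk: Uint8Array): void {
    // Copy so the window never pins a larger backing buffer.
    this.chunks.push(new Uint8Array(chunk));
    this.bufferedBytes += chunk.byteLength;

    let overflow = this.bufferedBytes - this.maxBytes;
    while (overflow > 0 && this.chunks.length > 0) {
      const oldest = this.chunks[0];
      if (oldest.byteLength <= overflow) {
        // The whole chunk lies outside the window: drop it.
        this.chunks.shift();
        this.bufferedBytes -= oldest.byteLength;
        overflow -= oldest.byteLength;
      } else {
        // Only part of it overflows: keep the bytes still inside the window.
        this.chunks[0] = new Uint8Array(oldest.subarray(overflow));
        this.bufferedBytes -= overflow;
        overflow = 0;
      }
    }
  }

  toBytes(): Uint8Array {
    const out = new Uint8Array(this.bufferedBytes);
    let offset = 0;
    for (const chunk of this.chunks) {
      out.set(chunk, offset);
      offset += chunk.byteLength;
    }
    return out;
  }
}
```

After any sequence of pushes the window holds exactly the last `maxBytes` bytes of the stream (or everything, if less has arrived), regardless of how unevenly the chunks were sized.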

@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/lib/async-task-manager.ts (1)

210-213: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

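The optional instance parameter of cleanup leaves a race window in which a new task can be deleted by mistake

expectedTask is currently optional, so cleanup(taskId) can still delete the current Map entry unconditionally. Combined with the AsyncTaskManager.cleanup(taskId) call that still exists in src/app/v1/_lib/proxy/response-handler.ts (lines 1934-1946), an old task's finally block can still delete a new task with the same taskId, leaving the new task untracked (later cancel and timeout cleanup stop working).

Consider making the instance check mandatory (expectedTask required), or splitting out an internal-only forced cleanup entry point and migrating the callers, so the instance-consistency guard cannot be bypassed.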
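
The mandatory identity check could look roughly like this. `TaskMap` and `TaskEntry` are hypothetical names for a minimal sketch; the point is only that `cleanup` compares the stored instance against the one the caller registered before deleting.

```typescript
// Hypothetical minimal task map; not the project's AsyncTaskManager.
interface TaskEntry {
  promise: Promise<void>;
}

class TaskMap {
  private readonly tasks = new Map<string, TaskEntry>();

  register(taskId: string, promise: Promise<void>): TaskEntry {
    const entry: TaskEntry = { promise };
    this.tasks.set(taskId, entry); // a later registration replaces the old entry
    return entry;
  }

  // Remove the entry only if the Map still holds this exact instance.
  cleanup(taskId: string, expected: TaskEntry): boolean {
    if (this.tasks.get(taskId) !== expected) return false;
    return this.tasks.delete(taskId);
  }

  has(taskId: string): boolean {
    return this.tasks.has(taskId);
  }
}
```

Because `expected` is required, an old task's `finally` block that still holds the old entry becomes a no-op once a new task has been registered under the same id.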

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/lib/async-task-manager.ts` around lines 210 - 213, The cleanup method in
async-task-manager.ts has a race condition because expectedTask is optional,
allowing cleanup(taskId) to unconditionally delete any task with that ID, which
can cause a new task with the same taskId to be deleted by an old task's finally
block. Make the expectedTask parameter mandatory (remove the question mark
making it required) in the cleanup method signature to force all callers to
provide the task instance for verification. Also update the calling code in
response-handler.ts (around lines 1934-1946) where cleanup is called to pass the
actual task instance as the second argument instead of calling it with only
taskId.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a70fbad5-a33d-42ff-8542-6749140be6d4

📥 Commits

Reviewing files that changed from the base of the PR and between e065e2a and 92bb359.

📒 Files selected for processing (13)
  • src/app/v1/_lib/proxy/response-handler.ts
  • src/lib/async-task-manager.ts
  • src/lib/langfuse/trace-proxy-request.ts
  • tests/unit/langfuse/langfuse-trace.test.ts
  • tests/unit/lib/async-task-manager-edge-runtime.test.ts
  • tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts
  • tests/unit/proxy/response-handler-bill-non-success.test.ts
  • tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • tests/unit/proxy/response-handler-endpoint-circuit-isolation.test.ts
  • tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts
  • tests/unit/proxy/response-handler-hedge-loser-priority.test.ts
  • tests/unit/proxy/response-handler-lease-decrement.test.ts
  • tests/unit/proxy/response-handler-non200.test.ts
✅ Files skipped from review due to trivial changes (1)
  • tests/unit/proxy/response-handler-non200.test.ts
🚧 Files skipped from review as they are similar to previous changes (4)
  • tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts
  • src/lib/langfuse/trace-proxy-request.ts
  • tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • src/app/v1/_lib/proxy/response-handler.ts

@tesgth032

Contributor Author

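After digging further into the memory-leak chain this round, I addressed three deeper risks:

  1. AsyncTaskManager.cleanup is now private and requires the TaskInfo instance from registration, which rules out an old task's finally block or an external caller deleting a new task with the same taskId.
  2. The Langfuse fire-and-forget path keeps only the truncated response and lightweight fields before the dynamic import(); the async callback no longer closes over the original data object, so a large response is not strongly referenced while the SDK loads and sends.
  3. The streaming tail window now trims exactly the overflow bytes from the oldest chunk instead of dropping the whole chunk; a new test covers a terminal usage event split across the tail eviction boundary and confirms it is still parsed.

Verification:

  • bunx vitest run tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/langfuse/langfuse-trace.test.ts
    • 4 files passed, 72 tests passed
  • bun run typecheck
  • bunx biome check src/lib/async-task-manager.ts src/app/v1/_lib/proxy/response-handler.ts src/lib/langfuse/emit-proxy-trace.ts src/lib/langfuse/trace-proxy-request.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • git diff --check
  • bun run test
    • 727 files passed, 2 skipped; 6524 tests passed, 13 skipped
  • bun run build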

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
src/lib/async-task-manager.ts (1)

225-234: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

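The doc comment does not match the actual logic

The comment describes the old cleanup logic (a fixed 10-minute timeout based on age), but the code now:

  1. Uses idleAge (time since last activity) rather than total lifetime
  2. Takes the timeout threshold from taskInfo.staleTimeoutMs, which is configurable per task

Consider updating the comment to reflect the current behavior:

Suggested doc update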
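  /**
   * Check for and clean up timed-out tasks
   *
-  * Iterate over all active tasks; for tasks still unfinished after more than 10 minutes:
+  * Iterate over all active tasks; for tasks idle longer than staleTimeoutMs:
   * 1. Log a warning
   * 2. Trigger the AbortController to cancel the task
   * 3. Remove the task from the task Map
   *
-  * ⚠️ Note: this does not clean up "completed" tasks; it cleans up tasks that "timed out without completing"
+  * ⚠️ Note: this does not clean up "completed" tasks; it cleans up "idle timed-out" tasks.
+  * Active streams should call touch() periodically to update lastActivityAt so they are not mistaken for hung tasks.
   */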
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/lib/async-task-manager.ts` around lines 225 - 234, Update the JSDoc
comment for the task cleanup function to accurately reflect the current
implementation. Replace the reference to "超过 10 分钟还未完成的任务" (tasks not completed
in over 10 minutes) with a description that specifies it uses idleAge (time
since last activity) instead of total task age, and that the timeout threshold
is determined by the configurable taskInfo.staleTimeoutMs property per task
rather than a fixed 10-minute duration. Ensure the updated comment explains that
tasks are cleaned up when they have been idle (inactive) beyond their configured
staleTimeoutMs value.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b61f46ed-e80a-4b6a-b4b4-11f4ac71d9d5

📥 Commits

Reviewing files that changed from the base of the PR and between 92bb359 and 0edd582.

📒 Files selected for processing (4)
  • src/app/v1/_lib/proxy/response-handler.ts
  • src/lib/async-task-manager.ts
  • src/lib/langfuse/emit-proxy-trace.ts
  • tests/unit/proxy/response-handler-client-abort-drain.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/lib/langfuse/emit-proxy-trace.ts
  • src/app/v1/_lib/proxy/response-handler.ts

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0edd5828ca

Comment on lines +138 to +139
const headText = this.decodeChunks(this.headChunks, 0, this.headBufferedBytes);
const tailText = this.decodeChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes);

P2 Badge Decode contiguous UTF-8 windows as one buffer

For any stream that crosses the 1 MiB head/tail split without being truncated, a multibyte UTF-8 character can land across this boundary. Decoding headChunks and tailChunks with separate TextDecoder instances turns that one character into replacement characters even though the snapshot is marked complete, so the response persisted to session debug storage and sent to Langfuse no longer matches the upstream body. Decode the combined head+tail bytes when truncated is false, or carry decoder state across the split.
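
The difference between decoding the two windows separately and decoding them as one contiguous buffer can be shown with a small example. The helper names are hypothetical; `TextEncoder` and `TextDecoder` are the standard Web APIs.

```typescript
// Concatenate byte windows into one contiguous buffer.
function concatBytes(parts: Uint8Array[]): Uint8Array {
  let total = 0;
  for (const part of parts) total += part.byteLength;
  const out = new Uint8Array(total);
  let offset = 0;
  for (const part of parts) {
    out.set(part, offset);
    offset += part.byteLength;
  }
  return out;
}

// Problematic: each window is decoded with its own decoder state.
function decodeSeparately(headBytes: Uint8Array, tailBytes: Uint8Array): string {
  return new TextDecoder().decode(headBytes) + new TextDecoder().decode(tailBytes);
}

// Safe for contiguous windows: decode the joined bytes in one pass.
function decodeContiguous(headBytes: Uint8Array, tailBytes: Uint8Array): string {
  return new TextDecoder().decode(concatBytes([headBytes, tailBytes]));
}

// "\u00e9" (é) is two bytes in UTF-8 (0xC3 0xA9); split it across the boundary.
const sampleBytes = new TextEncoder().encode("caf\u00e9!");
const sampleHead = sampleBytes.subarray(0, 4); // "caf" plus the first byte of é
const sampleTail = sampleBytes.subarray(4); // second byte of é plus "!"
```

An alternative that avoids the extra concatenation is to reuse a single `TextDecoder` and pass `{ stream: true }` for every window except the last, which carries the partial character across the split.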

@tesgth032

Contributor Author

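Continuing with the valid issues found by the latest automated review:

  1. When the stream is not truncated but has already crossed the head/tail boundary, BoundedStreamTextAccumulator.finish() now decodes the retained contiguous byte window in one pass, so a multibyte UTF-8 character that straddles the 1 MiB boundary is no longer decoded separately into replacement characters.
  2. The truncated path keeps the head + [cch_truncated] + tail semantics and does not pass off a non-contiguous byte window as the complete body.
  3. Updated the AsyncTaskManager cleanup comment to state that the stale check is based on lastActivityAt/staleTimeoutMs, not a fixed wall-clock age.
  4. Added a regression test: it builds a single-chunk SSE in which a character's UTF-8 bytes straddle the head/tail boundary, then asserts that the trace responseText keeps the original character, contains no U+FFFD, and is not marked as truncated.

Local verification:

  • bunx vitest run tests/unit/proxy/response-handler-client-abort-drain.test.ts
    • 1 file passed, 12 tests passed
  • bunx vitest run tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/langfuse/langfuse-trace.test.ts
    • 4 files passed, 73 tests passed
  • bun run typecheck
  • bunx biome check src/lib/async-task-manager.ts src/app/v1/_lib/proxy/response-handler.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • git diff --check
  • bun run test
    • 727 files passed, 2 skipped; 6525 tests passed, 13 skipped
  • bun run build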

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bd0e3a27ac

Comment on lines +1980 to +1982
AsyncTaskManager.register(taskId, processingPromise, {
taskType: "non-stream-processing",
abortController,

P2 Badge Avoid aborting active non-stream downloads as stale

When a non-stream response body is still being downloaded for more than 10 minutes (for example with the default requestTimeoutNonStreamingMs = 0 or a very large slow body), this task sits inside responseForLog.text() and never calls AsyncTaskManager.touch(). Registering the live abortController with the manager's default stale timeout lets cleanup abort the shared upstream responseController via bindTaskAbortToUpstreamResponse, cutting off the client even though bytes may still be flowing; the Gemini non-stream passthrough stats registration above has the same pattern. Touch while reading or set a stale timeout that matches the non-stream response timeout instead of the manager default.

@tesgth032

Contributor Author

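After another round of digging, I fixed two non-streaming paths on the same root-cause chain and pushed the result as commit aa343361.

Fixes in this round:

  • Non-streaming background reads no longer call response.text() directly. They now read in a reader loop and call AsyncTaskManager.touch(taskId) whenever a real chunk arrives, so a slow but still active non-streaming response is not mistaken for a hung task and aborted by the 10-minute idle stale cleanup.
  • The non-streaming "after" snapshot no longer starts an extra fire-and-forget clone().text() read that is not tracked by AsyncTaskManager. Ordinary non-streaming snapshots reuse the responseText already read by the main background task; Gemini non-passthrough conversion reuses the final response body produced during conversion. This removes a hidden read path that strongly referenced the response body and session and had no cancellation lifecycle.
  • Regression tests cover chunked non-streaming responses: they verify that task activity is refreshed during the read, that the "after" snapshot is still persisted, and that the original response is cloned only once, which guards against a later change turning the "after" snapshot back into an independent read.

Re-verified:

  • bunx vitest run tests/unit/proxy/response-handler-lease-decrement.test.ts passed, 8 tests.
  • bunx vitest run tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/proxy/response-handler-lease-decrement.test.ts tests/unit/langfuse/langfuse-trace.test.ts passed, 81 tests.
  • bun run typecheck passed.
  • bunx biome check src/app/v1/_lib/proxy/response-handler.ts tests/unit/proxy/response-handler-lease-decrement.test.ts passed.
  • git diff --check passed.
  • bun run test passed, 727 files / 6526 tests.
  • bun run build passed; the repository's existing Next workspace root and Edge Runtime static warnings remain, and this change introduces no new build failures.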

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: aa34336193

return;
}

this.headChunks.push(value.slice(0, remainingHeadBytes));

P2 Badge Copy Buffer chunks before retaining accumulator windows

When the upstream body comes through nodeStreamToWebStreamSafe, chunks can be Node Buffers because Buffer satisfies instanceof Uint8Array; on those chunks, slice() returns a view rather than copying. This stored “1 MiB” head slice (and the tail slices below) can therefore keep the entire large upstream chunk backing store alive until finalization, defeating the bounded 10 MiB accumulator for large single-chunk streams. Store an actual copy, e.g. new Uint8Array(value.subarray(...)), before retaining these windows.
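
The view-versus-copy distinction can be demonstrated with plain typed arrays, since `subarray()` on a `Uint8Array` shares the backing store in the same way the comment describes for `slice()` on a Node Buffer. The helper name `copyRange` is an assumption for this sketch.

```typescript
// Returns a window that owns its bytes, independent of the source buffer.
function copyRange(source: Uint8Array, start: number, end: number): Uint8Array {
  // new Uint8Array(typedArray) allocates a fresh buffer and copies the elements.
  return new Uint8Array(source.subarray(start, end));
}

const upstreamChunk = new Uint8Array(1024).fill(7);
const retainedView = upstreamChunk.subarray(0, 4); // shares the 1024-byte buffer
const retainedCopy = copyRange(upstreamChunk, 0, 4); // owns a 4-byte buffer

// Overwriting the source changes the view but not the copy.
upstreamChunk.fill(9);
```

Here `retainedView.buffer.byteLength` is still 1024, so holding the view keeps the whole chunk alive, while `retainedCopy.buffer.byteLength` is 4.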

@tesgth032

Contributor Author

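Another round of deeper fixes and verification for the memory retention chain:

  • Fixed the streaming stats accumulator retaining views when it uses slice() on Node Buffer input. nodeStreamToWebStreamSafe passes Buffers through unchanged as Uint8Array, and Buffer.slice() returns a view rather than a copy, so the former 1 MiB head / 9 MiB tail windows could keep the entire large upstream backing store alive, breaking the real memory bound that the "bounded stats window" is meant to provide.
  • Added copyUint8Range(); every head/tail window that outlives its chunk is now a real byte copy made with new Uint8Array(value.subarray(...)), and the invariant still holds after the tail is partially trimmed.
  • Added a regression test: push a Node Buffer larger than the window limit into BoundedStreamTextAccumulator, overwrite the original Buffer immediately, and assert that the snapshot still holds the original head/tail markers. This test directly catches the "view retained instead of copy" problem.

Additional checks:

  • It is reasonable for nodeStreamToWebStreamSafe to keep passing Buffers through unchanged; the conversion layer should not pay a full copy for all traffic. What actually needs fixing is the accumulator, which holds windows across the request lifecycle.
  • ResponseFixer's ChunkBuffer has a maxFixSize cap and only holds data briefly until a processable boundary or flush; beyond the cap it passes data through, so it is not a long-lived backing-store retention chain of the same kind.

Verification:

  • bunx vitest run tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • bunx vitest run tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/proxy/response-handler-lease-decrement.test.ts tests/unit/lib/async-task-manager-edge-runtime.test.ts tests/unit/langfuse/langfuse-trace.test.ts
  • bunx vitest run src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts src/app/v1/_lib/proxy/node-stream-to-web.test.ts
  • bunx biome check src/app/v1/_lib/proxy/response-handler.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts
  • bun run typecheck
  • git diff --check
  • bun run test: 727 files passed, 6527 tests passed, 13 skipped
  • bun run build: passed; only the project's existing workspace root / Edge Runtime Node API static warnings remain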

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0cbef88260

Comment on lines +3295 to +3298
AsyncTaskManager.register(taskId, processingPromise, {
taskType: "stream-processing",
abortController,
});

P2 Badge Respect disabled stream idle timeouts

When provider.streamingIdleTimeoutMs <= 0, this stream path treats the idle watchdog as disabled (Infinity), but the task is still registered with the live abort controller and no staleTimeoutMs, so AsyncTaskManager applies its default 10-minute idle cleanup. Fresh evidence after the activity-based change is that touch() only runs when chunks arrive, so a legitimate SSE stream with a >10 minute silent gap after some output will be aborted by the generic cleanup despite the provider idle timeout being disabled; pass a stream-specific stale timeout or disable stale cleanup for this case.

@tesgth032

Contributor Author

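This round continues digging and fixes a real problem raised by the latest automated review: when the streaming idle timeout is explicitly disabled, AsyncTaskManager's default stale cleanup could still abort a legitimate long-lived connection after 10 idle minutes.

Changes:

  • response-handler now passes a task-level staleTimeoutMs explicitly, based on provider configuration, so the AsyncTaskManager default does not silently become a hidden business timeout.
  • Streaming tasks:
    • When streamingIdleTimeoutMs <= 0, stale cleanup is set to Infinity, so long silent streams with the idle timeout disabled are not aborted.
    • When both the idle timeout and the first-byte timeout are enabled, the stale limit is the larger of the two, so task cleanup does not fire before the business timeouts.
    • When the idle timeout is enabled but the first-byte timeout is disabled, stale cleanup is not used to fake a first-byte timeout, preserving the existing "no first-byte limit" semantics.
  • Non-streaming tasks:
    • When requestTimeoutNonStreamingMs <= 0, it is likewise set to Infinity, so the default stale cleanup does not override the "no total timeout" configuration.
  • Added regression tests covering the ordinary streaming, Gemini stream passthrough, and non-streaming paths to make sure a disabled timeout does not fall back to the default stale cleanup.

Verified:

  • bunx biome check src/app/v1/_lib/proxy/response-handler.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts
  • bunx vitest run tests/unit/proxy/response-handler-client-abort-drain.test.ts tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts tests/unit/proxy/response-handler-lease-decrement.test.ts tests/unit/lib/async-task-manager-edge-runtime.test.ts
  • bun run typecheck
  • git diff --check
  • bun run test: 727 files passed / 2 skipped, 6530 tests passed / 13 skipped
  • bun run build: passed; the repository's existing Next workspace root and Edge Runtime Node API static warnings remain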
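
The timeout rules listed in this comment can be sketched as two small pure functions. Several details are assumptions made for illustration and are not confirmed by the thread: the field name `firstByteTimeoutStreamingMs`, the use of `Infinity` when the first-byte timeout is disabled, and the use of the configured value itself when the non-streaming timeout is positive.

```typescript
// Hypothetical config shape; firstByteTimeoutStreamingMs is an assumed name.
interface ProviderTimeouts {
  streamingIdleTimeoutMs: number;
  firstByteTimeoutStreamingMs: number;
  requestTimeoutNonStreamingMs: number;
}

function resolveStreamStaleTimeoutMs(p: ProviderTimeouts): number {
  // Idle timeout disabled: never let stale cleanup abort a silent stream.
  if (p.streamingIdleTimeoutMs <= 0) return Infinity;
  // Assumption: with no first-byte limit, stale cleanup must not invent one.
  if (p.firstByteTimeoutStreamingMs <= 0) return Infinity;
  // Both enabled: stale cleanup must not fire before either business timeout.
  return Math.max(p.streamingIdleTimeoutMs, p.firstByteTimeoutStreamingMs);
}

function resolveNonStreamStaleTimeoutMs(p: ProviderTimeouts): number {
  // Total timeout disabled: keep the "no limit" semantics.
  if (p.requestTimeoutNonStreamingMs <= 0) return Infinity;
  // Assumption: otherwise align stale cleanup with the configured timeout.
  return p.requestTimeoutNonStreamingMs;
}
```

The resolved value would then be passed as the task-level `staleTimeoutMs` at registration, so the manager's default only applies to tasks that do not specify one.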

@tesgth032

Contributor Author

@coderabbitai review
@codex review

@coderabbitai

coderabbitai Bot commented Jun 22, 2026

@tesgth032 I'll kick off a full review of the PR now.

✅ Action performed

Review finished.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@tesgth032 tesgth032 changed the title from "Fix the root causes of the issue 1283 memory leak" to "[Incomplete] Fix the root causes of the issue 1283 memory leak" on Jun 22, 2026
@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Hooray!

Reviewed commit: a0ee9d8d88

Labels

area:provider bug Something isn't working size/L Large PR (< 1000 lines)

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

1 participant