// Provenance: commit e065e2a5e978715bd5404c92868a83b2566f4d18, [PATCH 1/7]
// "fix: root cause of the streaming-response memory leak"
// (original subject: "fix: 修复流式响应内存泄漏根因"), first hunk of
// src/app/v1/_lib/proxy/response-handler.ts.

const CLIENT_ABORT_DRAIN_MAX_MS = 60_000;

/** Upper bound on the bytes kept for stream statistics (head window + tail window). */
const STREAM_STATS_MAX_BUFFER_BYTES = 10 * 1024 * 1024;
/** Bytes kept from the very beginning of the stream. */
const STREAM_STATS_HEAD_BYTES = 1024 * 1024;
/** Bytes kept from the end of the stream (sliding window). */
const STREAM_STATS_TAIL_BYTES = STREAM_STATS_MAX_BUFFER_BYTES - STREAM_STATS_HEAD_BYTES;
/** Maximum number of separate tail chunks before they are merged into one buffer. */
const STREAM_STATS_TAIL_CHUNKS = 8192;
/**
 * Inserted between head and tail when the middle of the stream was dropped.
 * It is written as an SSE comment line surrounded by blank lines so the head
 * and tail are never glued into a single event.
 */
const STREAM_STATS_TRUNCATED_MARKER = "\n\n: [cch_truncated]\n\n";

/** Result of {@link BoundedStreamTextAccumulator.finish}. */
type BoundedStreamTextSnapshot = {
  /** Decoded text: the full body, or `head + marker + tail` when truncated. */
  text: string;
  /** True when some bytes between head and tail were discarded. */
  truncated: boolean;
  /** Total number of bytes pushed, including discarded ones. */
  totalBytes: number;
  /** Number of bytes still buffered (head + tail). */
  bufferedBytes: number;
  /** Number of non-empty chunks pushed. */
  chunkCount: number;
};

/**
 * Collects a streamed response as raw bytes while keeping memory bounded.
 *
 * Stream statistics only need the metadata at the head and the usage/final
 * event at the tail, so the accumulator keeps a fixed head window plus a
 * sliding tail window. Chunks are stored as byte copies (`slice`), which
 * avoids an unbounded `string[]` and avoids `subarray` views pinning a large
 * source `ArrayBuffer`.
 */
class BoundedStreamTextAccumulator {
  private readonly headChunks: Uint8Array[] = [];
  private readonly tailChunks: Uint8Array[] = [];
  private readonly tailChunkBytes: number[] = [];
  private headBufferedBytes = 0;
  private tailBufferedBytes = 0;
  /** Index of the first live entry in `tailChunks` (earlier entries were evicted). */
  private tailHead = 0;
  /** True once the head window is closed and bytes go to the tail window. */
  private tailMode = false;
  private truncated = false;
  private totalBytes = 0;
  private chunksSeen = 0;
  private finishedSnapshot: BoundedStreamTextSnapshot | null = null;

  /** Number of non-empty chunks pushed so far. */
  get chunkCount(): number {
    return this.chunksSeen;
  }

  /** Total bytes pushed so far, including bytes that were later discarded. */
  get totalByteCount(): number {
    return this.totalBytes;
  }

  /** Bytes currently held in the head and tail windows. */
  get bufferedByteCount(): number {
    return this.headBufferedBytes + this.tailBufferedBytes;
  }

  /** True when at least one byte has been dropped from the tail window. */
  get isTruncated(): boolean {
    return this.truncated;
  }

  /**
   * Appends one chunk. Empty chunks are ignored and not counted.
   * The chunk that crosses the head limit is split by byte offset: the first
   * part fills the head window, the rest opens the tail window.
   *
   * @param value Raw bytes read from the stream; the data is copied.
   */
  pushBytes(value: Uint8Array): void {
    if (!value || value.byteLength === 0) {
      return;
    }

    // Any new data invalidates a previously cached snapshot.
    this.finishedSnapshot = null;
    this.chunksSeen += 1;
    this.totalBytes += value.byteLength;

    if (!this.tailMode) {
      const remainingHeadBytes = STREAM_STATS_HEAD_BYTES - this.headBufferedBytes;
      if (value.byteLength <= remainingHeadBytes) {
        this.headChunks.push(value.slice());
        this.headBufferedBytes += value.byteLength;
        return;
      }

      this.tailMode = true;
      if (remainingHeadBytes > 0) {
        this.headChunks.push(value.slice(0, remainingHeadBytes));
        this.headBufferedBytes += remainingHeadBytes;
        // pushTailBytes copies what it keeps, so a view is enough here.
        this.pushTailBytes(value.subarray(remainingHeadBytes));
        return;
      }
    }

    this.pushTailBytes(value);
  }

  /**
   * Decodes the buffered bytes and returns a snapshot. The snapshot is cached
   * until the next `pushBytes` call.
   *
   * When nothing was dropped, head and tail are decoded as ONE contiguous
   * buffer. The head/tail split happens at an arbitrary byte offset, so
   * decoding the two windows separately would turn a multi-byte UTF-8
   * character that straddles the boundary into U+FFFD replacement characters.
   *
   * When truncated, the windows are decoded separately and joined with
   * `STREAM_STATS_TRUNCATED_MARKER`; a character cut at the edge of a window
   * may then decode to U+FFFD.
   */
  finish(): BoundedStreamTextSnapshot {
    if (this.finishedSnapshot) {
      return this.finishedSnapshot;
    }

    let text: string;
    if (!this.tailMode) {
      text = this.decodeChunks(this.headChunks, 0, this.headBufferedBytes);
    } else if (this.truncated) {
      const headText = this.decodeChunks(this.headChunks, 0, this.headBufferedBytes);
      const tailText = this.decodeChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes);
      text = `${headText}${STREAM_STATS_TRUNCATED_MARKER}${tailText}`;
    } else {
      const contiguous = new Uint8Array(this.headBufferedBytes + this.tailBufferedBytes);
      let offset = this.copyChunksInto(contiguous, 0, this.headChunks, 0);
      offset = this.copyChunksInto(contiguous, offset, this.tailChunks, this.tailHead);
      text = new TextDecoder().decode(
        offset === contiguous.byteLength ? contiguous : contiguous.subarray(0, offset)
      );
    }

    this.finishedSnapshot = {
      text,
      truncated: this.truncated,
      totalBytes: this.totalBytes,
      bufferedBytes: this.headBufferedBytes + this.tailBufferedBytes,
      chunkCount: this.chunksSeen,
    };

    return this.finishedSnapshot;
  }

  /** Adds bytes to the sliding tail window, evicting the oldest chunks on overflow. */
  private pushTailBytes(value: Uint8Array): void {
    if (!value || value.byteLength === 0) {
      return;
    }

    // A single chunk larger than the whole window replaces the window with its own tail end.
    if (value.byteLength > STREAM_STATS_TAIL_BYTES) {
      this.tailChunks.length = 0;
      this.tailChunkBytes.length = 0;
      this.tailHead = 0;
      const tail = value.slice(value.byteLength - STREAM_STATS_TAIL_BYTES);
      this.tailChunks.push(tail);
      this.tailChunkBytes.push(tail.byteLength);
      this.tailBufferedBytes = tail.byteLength;
      this.truncated = true;
      return;
    }

    const copy = value.slice();
    this.tailChunks.push(copy);
    this.tailChunkBytes.push(copy.byteLength);
    this.tailBufferedBytes += copy.byteLength;

    // Evict whole chunks from the front until the window fits again.
    while (
      this.tailBufferedBytes > STREAM_STATS_TAIL_BYTES &&
      this.tailHead < this.tailChunkBytes.length
    ) {
      this.tailBufferedBytes -= this.tailChunkBytes[this.tailHead] ?? 0;
      this.tailChunks[this.tailHead] = new Uint8Array();
      this.tailChunkBytes[this.tailHead] = 0;
      this.tailHead += 1;
      this.truncated = true;
    }

    // Compact the arrays once enough evicted slots have piled up at the front.
    if (this.tailHead > 4096) {
      this.tailChunks.splice(0, this.tailHead);
      this.tailChunkBytes.splice(0, this.tailHead);
      this.tailHead = 0;
    }

    // Many tiny chunks: merge them so the arrays stay small even though bytes are bounded.
    const keptCount = this.tailChunks.length - this.tailHead;
    if (keptCount > STREAM_STATS_TAIL_CHUNKS) {
      const joined = this.concatChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes);
      this.tailChunks.length = 0;
      this.tailChunkBytes.length = 0;
      this.tailHead = 0;
      this.tailChunks.push(joined);
      this.tailChunkBytes.push(joined.byteLength);
      this.tailBufferedBytes = joined.byteLength;
    }
  }

  /** Concatenates `chunks[startIndex..]` and decodes the result as UTF-8. */
  private decodeChunks(chunks: Uint8Array[], startIndex: number, totalBytes: number): string {
    if (totalBytes <= 0) {
      return "";
    }
    return new TextDecoder().decode(this.concatChunks(chunks, startIndex, totalBytes));
  }

  /** Returns a new buffer holding `chunks[startIndex..]` back to back. */
  private concatChunks(chunks: Uint8Array[], startIndex: number, totalBytes: number): Uint8Array {
    if (totalBytes <= 0) {
      return new Uint8Array();
    }

    const out = new Uint8Array(totalBytes);
    const offset = this.copyChunksInto(out, 0, chunks, startIndex);
    return offset === totalBytes ? out : out.slice(0, offset);
  }

  /**
   * Copies `chunks[startIndex..]` into `out` starting at `offset`, skipping
   * empty (evicted) slots, and returns the offset after the last byte written.
   */
  private copyChunksInto(
    out: Uint8Array,
    offset: number,
    chunks: Uint8Array[],
    startIndex: number
  ): number {
    let cursor = offset;
    for (let i = startIndex; i < chunks.length; i++) {
      const chunk = chunks[i];
      if (!chunk || chunk.byteLength === 0) {
        continue;
      }
      out.set(chunk, cursor);
      cursor += chunk.byteLength;
    }
    return cursor;
  }
}
@@ -74,6 +252,44 @@ function releaseSessionAgent(session: ProxySession): void { } } +function bindTaskAbortToUpstreamResponse( + session: ProxySession, + abortController: AbortController, + taskId: string +): () => void { + const abortUpstream = () => { + const sessionWithController = session as typeof session & { + responseController?: AbortController; + }; + const upstreamController = sessionWithController.responseController; + if (!upstreamController || upstreamController.signal.aborted) { + return; + } + + const reason = + abortController.signal.reason instanceof Error + ? abortController.signal.reason + : new Error("async_task_aborted"); + try { + upstreamController.abort(reason); + } catch (error) { + logger.warn("[ResponseHandler] Failed to abort upstream response for async task", { + taskId, + error, + }); + } + }; + + abortController.signal.addEventListener("abort", abortUpstream, { once: true }); + if (abortController.signal.aborted) { + abortUpstream(); + } + + return () => { + abortController.signal.removeEventListener("abort", abortUpstream); + }; +} + function takeBeforeResponseBodySnapshotSource(session: ProxySession): Response | null { const snapshotSession = session as ProxySession & { detailSnapshotResponseBeforeSource?: Response | null; @@ -1096,6 +1312,12 @@ export class ProxyResponseHandler { const statusCode = response.status; const taskId = `non-stream-passthrough-${messageContext?.id || `unknown-${Date.now()}`}`; + const statsAbortController = new AbortController(); + const cleanupTaskAbortBinding = bindTaskAbortToUpstreamResponse( + session, + statsAbortController, + taskId + ); const statsPromise = (async () => { try { const responseText = await responseForStats.text(); @@ -1203,12 +1425,16 @@ export class ProxyResponseHandler { ); } } finally { + cleanupTaskAbortBinding(); releaseSessionAgent(session); AsyncTaskManager.cleanup(taskId); } })(); - AsyncTaskManager.register(taskId, statsPromise, "non-stream-passthrough-stats"); + 
AsyncTaskManager.register(taskId, statsPromise, { + taskType: "non-stream-passthrough-stats", + abortController: statsAbortController, + }); statsPromise.catch((error) => { if (session.sessionId && session.shouldPersistSessionDebugArtifacts()) { void discardBeforeResponseBodySnapshot(session); @@ -1272,6 +1498,11 @@ export class ProxyResponseHandler { // 使用 AsyncTaskManager 管理后台处理任务 const taskId = `non-stream-${messageContext?.id || `unknown-${Date.now()}`}`; const abortController = new AbortController(); + const cleanupTaskAbortBinding = bindTaskAbortToUpstreamResponse( + session, + abortController, + taskId + ); const cleanupClientAbortListener = bindClientAbortListener(session.clientAbortSignal, () => { AsyncTaskManager.cancel(taskId); abortController.abort(); @@ -1706,6 +1937,7 @@ export class ProxyResponseHandler { }); } } finally { + cleanupTaskAbortBinding(); cleanupClientAbortListener(); releaseSessionAgent(session); AsyncTaskManager.cleanup(taskId); @@ -1713,7 +1945,10 @@ export class ProxyResponseHandler { })(); // 注册任务并添加全局错误捕获 - AsyncTaskManager.register(taskId, processingPromise, "non-stream-processing"); + AsyncTaskManager.register(taskId, processingPromise, { + taskType: "non-stream-processing", + abortController, + }); processingPromise.catch(async (error) => { logger.error("ResponseHandler: Uncaught error in non-stream processing", { taskId, @@ -1778,6 +2013,12 @@ export class ProxyResponseHandler { const statusCode = response.status; const taskId = `stream-passthrough-${messageContext.id}`; + const statsAbortController = new AbortController(); + const cleanupTaskAbortBinding = bindTaskAbortToUpstreamResponse( + session, + statsAbortController, + taskId + ); const statsPromise = (async () => { const sessionWithCleanup = session as typeof session & { clearResponseTimeout?: () => void; @@ -1787,106 +2028,10 @@ export class ProxyResponseHandler { }; let reader: ReadableStreamDefaultReader | null = null; - // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) 
- // 说明:用于统计/结算的内容采用“头部 + 尾部窗口”: - // - 头部保留前 MAX_STATS_HEAD_BYTES(便于解析可能前置的 metadata) - // - 尾部保留最近 MAX_STATS_TAIL_BYTES(便于解析结尾 usage/假 200 等) - // - 中间部分会被丢弃(wasTruncated=true),统计将退化为 best-effort - const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB - const MAX_STATS_HEAD_BYTES = 1024 * 1024; // 1MB - const MAX_STATS_TAIL_BYTES = MAX_STATS_BUFFER_BYTES - MAX_STATS_HEAD_BYTES; - const MAX_STATS_TAIL_CHUNKS = 8192; - - const headChunks: string[] = []; - let headBufferedBytes = 0; - - const tailChunks: string[] = []; - const tailChunkBytes: number[] = []; - let tailHead = 0; - let tailBufferedBytes = 0; - let wasTruncated = false; - let inTailMode = false; - - const joinTailChunks = (): string => { - if (tailHead <= 0) return tailChunks.join(""); - return tailChunks.slice(tailHead).join(""); - }; - - const joinChunks = (): string => { - const headText = headChunks.join(""); - if (!inTailMode) { - return headText; - } - - const tailText = joinTailChunks(); - - // 用 SSE comment 标记被截断的中间段;parseSSEData 会忽略 ":" 开头的行 - if (wasTruncated) { - // 插入空行强制 flush event,避免“头+尾”拼接后跨 event 误拼接数据行 - return `${headText}\n\n: [cch_truncated]\n\n${tailText}`; - } - - return `${headText}${tailText}`; - }; - - const pushChunk = (text: string, bytes: number) => { - if (!text) return; - - const pushToTail = (tailText: string, tailBytes: number) => { - if (!tailText) return; - - tailChunks.push(tailText); - tailChunkBytes.push(tailBytes); - tailBufferedBytes += tailBytes; - - // 仅保留尾部窗口,避免内存无界增长 - while (tailBufferedBytes > MAX_STATS_TAIL_BYTES && tailHead < tailChunkBytes.length) { - tailBufferedBytes -= tailChunkBytes[tailHead] ?? 
0; - tailChunks[tailHead] = ""; - tailChunkBytes[tailHead] = 0; - tailHead += 1; - wasTruncated = true; - } - - // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 - if (tailHead > 4096) { - tailChunks.splice(0, tailHead); - tailChunkBytes.splice(0, tailHead); - tailHead = 0; - } - - // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) - const keptCount = tailChunks.length - tailHead; - if (keptCount > MAX_STATS_TAIL_CHUNKS) { - const joined = joinTailChunks(); - tailChunks.length = 0; - tailChunkBytes.length = 0; - tailHead = 0; - tailChunks.push(joined); - tailChunkBytes.push(tailBufferedBytes); - } - }; - - // 优先填充 head;超过 head 上限后切到 tail(但不代表一定发生截断,只有 tail 溢出才算截断) - if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) { - const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes; - if (remainingHeadBytes > 0 && bytes > remainingHeadBytes) { - const headPart = text.substring(0, remainingHeadBytes); - const tailPart = text.substring(remainingHeadBytes); - - pushChunk(headPart, remainingHeadBytes); - - inTailMode = true; - pushToTail(tailPart, bytes - remainingHeadBytes); - } else { - headChunks.push(text); - headBufferedBytes += bytes; - } - } else { - pushToTail(text, bytes); - } - }; - const decoder = new TextDecoder(); + const streamTextAccumulator = new BoundedStreamTextAccumulator(); + let lastStreamTextSnapshot: BoundedStreamTextSnapshot | null = null; + const getCollectedChunkCount = () => + lastStreamTextSnapshot?.chunkCount ?? 
streamTextAccumulator.chunkCount; let isFirstChunk = true; let streamEndedNormally = false; let responseTimeoutCleared = false; @@ -1912,11 +2057,10 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, idleTimeoutMs, - chunksCollected: headChunks.length + Math.max(0, tailChunks.length - tailHead), - headBufferedBytes, - tailBufferedBytes, - bufferedBytes: headBufferedBytes + tailBufferedBytes, - wasTruncated, + chunksCollected: getCollectedChunkCount(), + totalBytes: streamTextAccumulator.totalByteCount, + bufferedBytes: streamTextAccumulator.bufferedByteCount, + wasTruncated: streamTextAccumulator.isTruncated, }); // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 try { @@ -1945,10 +2089,14 @@ export class ProxyResponseHandler { } }; + const flushAndSnapshot = (): BoundedStreamTextSnapshot => { + const snapshot = streamTextAccumulator.finish(); + lastStreamTextSnapshot = snapshot; + return snapshot; + }; + const flushAndJoin = (): string => { - const flushed = decoder.decode(); - if (flushed) pushChunk(flushed, 0); - return joinChunks(); + return flushAndSnapshot().text; }; try { @@ -1989,25 +2137,7 @@ export class ProxyResponseHandler { clearResponseTimeoutOnce(chunkSize); } - // 尽量填满 head:边界 chunk 可能跨过 head 上限,按 byte 切分以避免 head 少于 1MB - if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) { - const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes; - if (remainingHeadBytes > 0 && chunkSize > remainingHeadBytes) { - const headPart = value.subarray(0, remainingHeadBytes); - const tailPart = value.subarray(remainingHeadBytes); - - const headText = decoder.decode(headPart, { stream: true }); - pushChunk(headText, remainingHeadBytes); - - inTailMode = true; - const tailText = decoder.decode(tailPart, { stream: true }); - pushChunk(tailText, chunkSize - remainingHeadBytes); - } else { - pushChunk(decoder.decode(value, { stream: true }), chunkSize); - } - } else { - pushChunk(decoder.decode(value, { stream: true }), 
chunkSize); - } + streamTextAccumulator.pushBytes(value); } // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) @@ -2017,13 +2147,14 @@ export class ProxyResponseHandler { } clearIdleTimer(); - const allContent = flushAndJoin(); + const streamSnapshot = flushAndSnapshot(); + const allContent = streamSnapshot.text; const clientAborted = session.clientAbortSignal?.aborted ?? false; // 存储响应体到 Redis(5分钟过期) if ( session.sessionId && - !wasTruncated && + !streamSnapshot?.truncated && session.shouldPersistSessionDebugArtifacts() ) { void SessionManager.storeSessionResponse( @@ -2053,12 +2184,14 @@ export class ProxyResponseHandler { responseAfterSnapshotTask?.catch((err) => { logger.error("[ResponseHandler] Failed to store response after snapshot:", err); }); - } else if (session.sessionId && wasTruncated) { + } else if (session.sessionId && streamSnapshot?.truncated) { logger.warn("[ResponseHandler] Skip storing passthrough response: body too large", { taskId, providerId: provider.id, providerName: provider.name, - maxBytes: MAX_STATS_BUFFER_BYTES, + maxBytes: STREAM_STATS_MAX_BUFFER_BYTES, + totalBytes: streamSnapshot.totalBytes, + bufferedBytes: streamSnapshot.bufferedBytes, }); } @@ -2157,6 +2290,7 @@ export class ProxyResponseHandler { }); } } finally { + cleanupTaskAbortBinding(); clearIdleTimer(); // 兜底:在流结束/中断后清理首字节超时,避免定时器泄漏 // 注意:不应在流仍可能继续时清理(否则会让首字节超时失效) @@ -2223,7 +2357,10 @@ export class ProxyResponseHandler { } })(); - AsyncTaskManager.register(taskId, statsPromise, "stream-passthrough-stats"); + AsyncTaskManager.register(taskId, statsPromise, { + taskType: "stream-passthrough-stats", + abortController: statsAbortController, + }); statsPromise.catch((error) => { if (session.sessionId && session.shouldPersistSessionDebugArtifacts()) { void discardBeforeResponseBodySnapshot(session); @@ -2322,6 +2459,11 @@ export class ProxyResponseHandler { // 使用 AsyncTaskManager 管理后台处理任务 const taskId = `stream-${messageContext?.id || `unknown-${Date.now()}`}`; const abortController = new 
AbortController(); + const cleanupTaskAbortBinding = bindTaskAbortToUpstreamResponse( + session, + abortController, + taskId + ); const idleTimeoutMs = provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; const clientAbortDrainTimeoutMs = CLIENT_ABORT_DRAIN_MAX_MS; @@ -2329,7 +2471,10 @@ export class ProxyResponseHandler { // 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 let idleTimeoutId: NodeJS.Timeout | null = null; let clientAbortDrainTimeoutId: NodeJS.Timeout | null = null; - const chunks: string[] = []; + const streamTextAccumulator = new BoundedStreamTextAccumulator(); + let lastStreamTextSnapshot: BoundedStreamTextSnapshot | null = null; + const getCollectedChunkCount = () => + lastStreamTextSnapshot?.chunkCount ?? streamTextAccumulator.chunkCount; const clearClientAbortDrainTimer = () => { if (clientAbortDrainTimeoutId) { clearTimeout(clientAbortDrainTimeoutId); @@ -2350,7 +2495,7 @@ export class ProxyResponseHandler { taskId, providerId: provider.id, idleTimeoutMs, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), }); // 1. 
关闭客户端流(让客户端收到连接关闭通知,避免悬挂) @@ -2435,10 +2580,7 @@ export class ProxyResponseHandler { const processingPromise = (async () => { const reader = internalStream.getReader(); - const decoder = new TextDecoder(); - // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容: - // - 用于解析 usage/cost 与内部结算(例如“假 200”检测) - // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。 + // 统计/结算只保留有界的“头 + 尾”文本快照,避免长流式响应把进程堆撑满。 let usageForCost: UsageMetrics | null = null; let isFirstChunk = true; // 标记是否为第一块数据 @@ -2448,11 +2590,9 @@ export class ProxyResponseHandler { // 静默一直等到 60s drain 总上限。 const flushAndJoin = (): string => { - const flushed = decoder.decode(); - if (flushed) { - chunks.push(flushed); - } - return chunks.join(""); + const snapshot = streamTextAccumulator.finish(); + lastStreamTextSnapshot = snapshot; + return snapshot.text; }; const finalizeStream = async ( @@ -2473,8 +2613,14 @@ export class ProxyResponseHandler { const streamErrorMessage = finalized.errorMessage; const providerIdForPersistence = finalized.providerIdForPersistence; - // 存储响应体到 Redis(5分钟过期) - if (session.sessionId && session.shouldPersistSessionDebugArtifacts()) { + const streamSnapshot = lastStreamTextSnapshot; + + // 存储响应体到 Redis(5分钟过期)。截断后的统计快照不是完整正文,不能伪装成完整调试正文落盘。 + if ( + session.sessionId && + session.shouldPersistSessionDebugArtifacts() && + !streamSnapshot?.truncated + ) { const beforeBody = (await consumeBeforeResponseBodySnapshot(session)) ?? 
allContent; void SessionManager.storeSessionResponse( session.sessionId, @@ -2503,6 +2649,16 @@ export class ProxyResponseHandler { responseBeforeSnapshotTask?.catch((err) => { logger.error("[ResponseHandler] Failed to store response before snapshot:", err); }); + } else if (session.sessionId && streamSnapshot?.truncated) { + discardBeforeResponseBodySnapshot(session); + logger.warn("[ResponseHandler] Skip storing stream response: body too large", { + taskId, + providerId: provider.id, + providerName: provider.name, + maxBytes: STREAM_STATS_MAX_BUFFER_BYTES, + totalBytes: streamSnapshot.totalBytes, + bufferedBytes: streamSnapshot.bufferedBytes, + }); } const duration = Date.now() - session.startTime; @@ -2746,7 +2902,7 @@ export class ProxyResponseHandler { statusCode: effectiveStatusCode, durationMs: duration, isStreaming: true, - sseEventCount: chunks.length, + sseEventCount: getCollectedChunkCount(), errorMessage: streamErrorMessage ?? undefined, }); }; @@ -2760,7 +2916,7 @@ export class ProxyResponseHandler { taskId, providerId: provider.id, providerName: provider.name, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), }); break; // 提前终止 } @@ -2772,14 +2928,14 @@ export class ProxyResponseHandler { } if (value) { const chunkSize = value.length; - chunks.push(decoder.decode(value, { stream: true })); + streamTextAccumulator.pushBytes(value); // 每次收到数据后重置静默期计时器(首次收到数据时启动) startIdleTimer(); logger.trace("ResponseHandler: Idle timer reset (data received)", { taskId, providerId: provider.id, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), lastChunkSize: chunkSize, idleTimeoutMs: idleTimeoutMs === Infinity ? 
"disabled" : idleTimeoutMs, }); @@ -2852,7 +3008,7 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, messageId: messageContext.id, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), errorName: err.name, }); @@ -2887,7 +3043,7 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, messageId: messageContext.id, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), }); // 注意:无法重试,因为客户端已收到 HTTP 200 @@ -2926,7 +3082,7 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, messageId: messageContext.id, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), errorName: err.name, errorMessage: err.message || "(empty message)", }); @@ -2959,7 +3115,7 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, messageId: messageContext.id, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), errorName: err.name, reason: err.name === "ResponseAborted" @@ -2985,7 +3141,7 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, messageId: messageContext.id, - chunksCollected: chunks.length, + chunksCollected: getCollectedChunkCount(), errorName: err.name, errorMessage: err.message || "(empty message)", errorCode: (err as NodeJS.ErrnoException).code, @@ -3037,6 +3193,7 @@ export class ProxyResponseHandler { } } finally { // 确保资源释放 + cleanupTaskAbortBinding(); cleanupClientAbortListener(); clearClientAbortDrainTimer(); clearIdleTimer(); // 清除静默期计时器(防止泄漏) @@ -3054,7 +3211,10 @@ export class ProxyResponseHandler { })(); // 注册任务并添加全局错误捕获 - AsyncTaskManager.register(taskId, processingPromise, "stream-processing"); + AsyncTaskManager.register(taskId, processingPromise, { + taskType: "stream-processing", + abortController, + }); processingPromise.catch(async (error) => { logger.error("ResponseHandler: 
Uncaught error in stream processing", { taskId, diff --git a/src/lib/async-task-manager.ts b/src/lib/async-task-manager.ts index 7fa7184d2..a4ecb8b57 100644 --- a/src/lib/async-task-manager.ts +++ b/src/lib/async-task-manager.ts @@ -21,8 +21,17 @@ interface TaskInfo { abortController: AbortController; createdAt: number; taskType: string; + staleTimeoutMs: number; } +interface RegisterTaskOptions { + taskType?: string; + abortController?: AbortController; + staleTimeoutMs?: number; +} + +const DEFAULT_STALE_TASK_TIMEOUT_MS = 10 * 60 * 1000; + class AsyncTaskManagerClass { private tasks: Map = new Map(); private cleanupInterval: NodeJS.Timeout | null = null; @@ -74,25 +83,40 @@ class AsyncTaskManagerClass { * @param taskType 任务类型(用于日志) * @returns AbortController(可用于取消任务) */ - register(taskId: string, promise: Promise, taskType = "unknown"): AbortController { + register( + taskId: string, + promise: Promise, + taskTypeOrOptions: string | RegisterTaskOptions = "unknown" + ): AbortController { this.initializeIfNeeded(); + const options = + typeof taskTypeOrOptions === "string" ? { taskType: taskTypeOrOptions } : taskTypeOrOptions; + const taskType = options.taskType ?? "unknown"; + // 如果任务已存在,先取消旧任务 - if (this.tasks.has(taskId)) { + const oldTaskInfo = this.tasks.get(taskId); + if (oldTaskInfo) { logger.warn("[AsyncTaskManager] Task already exists, cancelling old task", { taskId, taskType, }); this.cancel(taskId); + this.cleanup(taskId, oldTaskInfo); } - const abortController = new AbortController(); + const abortController = options.abortController ?? new AbortController(); + const staleTimeoutMs = + options.staleTimeoutMs === undefined || options.staleTimeoutMs <= 0 + ? 
DEFAULT_STALE_TASK_TIMEOUT_MS + : options.staleTimeoutMs; const taskInfo: TaskInfo = { promise, abortController, createdAt: Date.now(), taskType, + staleTimeoutMs, }; this.tasks.set(taskId, taskInfo); @@ -126,7 +150,7 @@ class AsyncTaskManagerClass { } }) .finally(() => { - this.cleanup(taskId); + this.cleanup(taskId, taskInfo); }); logger.debug("[AsyncTaskManager] Task registered", { @@ -150,7 +174,9 @@ class AsyncTaskManagerClass { return; } - taskInfo.abortController.abort(); + if (!taskInfo.abortController.signal.aborted) { + taskInfo.abortController.abort(); + } logger.info("[AsyncTaskManager] Task cancelled", { taskId, @@ -164,7 +190,11 @@ class AsyncTaskManagerClass { * * @param taskId 任务唯一标识 */ - cleanup(taskId: string): void { + cleanup(taskId: string, expectedTask?: TaskInfo): boolean { + if (expectedTask && this.tasks.get(taskId) !== expectedTask) { + return false; + } + const deleted = this.tasks.delete(taskId); if (deleted) { logger.debug("[AsyncTaskManager] Task cleaned up", { @@ -172,6 +202,7 @@ class AsyncTaskManagerClass { remainingTasks: this.tasks.size, }); } + return deleted; } /** @@ -191,14 +222,18 @@ class AsyncTaskManagerClass { for (const [taskId, taskInfo] of this.tasks.entries()) { const age = now - taskInfo.createdAt; - // 如果任务超过 10 分钟还没完成,记录警告并取消 - if (age > staleThreshold) { - logger.warn("[AsyncTaskManager] Task timeout, cancelling", { + const staleTimeoutMs = taskInfo.staleTimeoutMs || staleThreshold; + + // 如果任务超过阈值还没完成,记录警告、取消并从 Map 断开强引用。 + if (age > staleTimeoutMs) { + logger.warn("[AsyncTaskManager] Task timeout, cancelling and detaching", { taskId, taskType: taskInfo.taskType, age, + staleTimeoutMs, }); this.cancel(taskId); + this.cleanup(taskId, taskInfo); } } } @@ -211,8 +246,9 @@ class AsyncTaskManagerClass { count: this.tasks.size, }); - for (const taskId of this.tasks.keys()) { + for (const [taskId, taskInfo] of Array.from(this.tasks.entries())) { this.cancel(taskId); + this.cleanup(taskId, taskInfo); } if 
(this.cleanupInterval) { diff --git a/src/lib/langfuse/emit-proxy-trace.ts b/src/lib/langfuse/emit-proxy-trace.ts index 64e4ffe78..68ee66ab2 100644 --- a/src/lib/langfuse/emit-proxy-trace.ts +++ b/src/lib/langfuse/emit-proxy-trace.ts @@ -3,6 +3,10 @@ import type { ProxySession } from "@/app/v1/_lib/proxy/session"; import { logger } from "@/lib/logger"; import type { CostBreakdown } from "@/lib/utils/cost-calculation"; +const LANGFUSE_RESPONSE_TEXT_MAX_CHARS = 1024 * 1024; +const LANGFUSE_RESPONSE_TEXT_EDGE_CHARS = 128 * 1024; +const LANGFUSE_TRUNCATED_MARKER = "\n\n[langfuse_response_truncated]\n\n"; + export interface EmitProxyLangfuseTraceData { responseHeaders: Headers; responseText: string; @@ -16,6 +20,81 @@ export interface EmitProxyLangfuseTraceData { errorMessage?: string; } +function truncateResponseTextForLangfuse(text: string): string { + if (text.length <= LANGFUSE_RESPONSE_TEXT_MAX_CHARS) { + return text; + } + + return `${text.slice(0, LANGFUSE_RESPONSE_TEXT_EDGE_CHARS)}${LANGFUSE_TRUNCATED_MARKER}${text.slice( + -LANGFUSE_RESPONSE_TEXT_EDGE_CHARS + )}`; +} + +function buildRequestMessagePreview(message: Record): Record { + return { + truncatedForLangfuse: true, + model: typeof message.model === "string" ? message.model : undefined, + stream: typeof message.stream === "boolean" ? message.stream : undefined, + max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined, + temperature: typeof message.temperature === "number" ? message.temperature : undefined, + messageCount: Array.isArray(message.messages) ? message.messages.length : undefined, + contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined, + toolsCount: Array.isArray(message.tools) ? 
message.tools.length : undefined, + hasSystemPrompt: + (Array.isArray(message.system) && message.system.length > 0) || + (typeof message.system === "string" && message.system.length > 0), + }; +} + +function buildLangfuseSessionSnapshot(session: ProxySession): ProxySession { + const providerChain = session.getProviderChain().map((item) => ({ ...item })); + const specialSettings = session.getSpecialSettings(); + const cacheTtlResolved = session.getCacheTtlResolved(); + const context1mApplied = session.getContext1mApplied(); + const currentModel = session.getCurrentModel(); + const originalModel = session.getOriginalModel(); + const modelRedirected = session.isModelRedirected(); + const endpoint = session.getEndpoint(); + const requestSequence = session.getRequestSequence(); + const messagesLength = session.getMessagesLength(); + const forwardedRequestBody = + typeof session.forwardedRequestBody === "string" + ? truncateResponseTextForLangfuse(session.forwardedRequestBody) + : null; + const requestMessage = buildRequestMessagePreview(session.request.message); + + return { + startTime: session.startTime, + method: session.method, + headers: new Headers(session.headers), + request: { + message: requestMessage, + log: truncateResponseTextForLangfuse(session.request.log ?? 
""), + note: session.request.note, + model: session.request.model, + imageRequestMetadata: null, + }, + userAgent: session.userAgent, + provider: session.provider, + messageContext: session.messageContext, + ttfbMs: session.ttfbMs, + forwardStartTime: session.forwardStartTime, + forwardedRequestBody, + sessionId: session.sessionId, + originalFormat: session.originalFormat, + getMessagesLength: () => messagesLength, + getEndpoint: () => endpoint, + getCurrentModel: () => currentModel, + getProviderChain: () => providerChain, + getRequestSequence: () => requestSequence, + getOriginalModel: () => originalModel, + isModelRedirected: () => modelRedirected, + getSpecialSettings: () => specialSettings, + getCacheTtlResolved: () => cacheTtlResolved, + getContext1mApplied: () => context1mApplied, + } as unknown as ProxySession; +} + /** * 异步发送代理请求的 Langfuse trace。 * @@ -27,15 +106,19 @@ export function emitProxyLangfuseTrace( ): void { if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return; + // 必须在异步 import 之前截断,避免动态加载/SDK 发送期间闭包继续强引用完整大响应。 + const responseText = truncateResponseTextForLangfuse(data.responseText); + const sessionSnapshot = buildLangfuseSessionSnapshot(session); + void import("@/lib/langfuse/trace-proxy-request") .then(({ traceProxyRequest }) => { void traceProxyRequest({ - session, + session: sessionSnapshot, responseHeaders: data.responseHeaders, durationMs: data.durationMs, statusCode: data.statusCode, isStreaming: data.isStreaming, - responseText: data.responseText, + responseText, usageMetrics: data.usageMetrics, costUsd: data.costUsd, costBreakdown: data.costBreakdown, diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index 09a23f7fd..55d118ec8 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -4,6 +4,9 @@ import { isLangfuseEnabled } from "@/lib/langfuse/index"; import { logger } from "@/lib/logger"; import type { CostBreakdown } 
from "@/lib/utils/cost-calculation"; +const LANGFUSE_JSON_PARSE_MAX_CHARS = 1024 * 1024; +const LANGFUSE_TEXT_PREVIEW_EDGE_CHARS = 128 * 1024; + function buildRequestBodySummary(session: ProxySession): Record { const msg = session.request.message as Record; return { @@ -126,6 +129,15 @@ function buildResponseOutput(ctx: TraceContext): unknown { return output; } +function buildLargeTextPreview(text: string): Record { + return { + truncated: true, + totalChars: text.length, + head: text.slice(0, LANGFUSE_TEXT_PREVIEW_EDGE_CHARS), + tail: text.slice(-LANGFUSE_TEXT_PREVIEW_EDGE_CHARS), + }; +} + /** * Send a trace to Langfuse for a completed proxy request. * Fully async and non-blocking. Errors are caught and logged. @@ -422,6 +434,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { } function tryParseJsonSafe(text: string): unknown { + if (text.length > LANGFUSE_JSON_PARSE_MAX_CHARS) { + return buildLargeTextPreview(text); + } + try { return JSON.parse(text); } catch { diff --git a/tests/unit/lib/async-task-manager-edge-runtime.test.ts b/tests/unit/lib/async-task-manager-edge-runtime.test.ts index 4ee32cf1e..369e1aca2 100644 --- a/tests/unit/lib/async-task-manager-edge-runtime.test.ts +++ b/tests/unit/lib/async-task-manager-edge-runtime.test.ts @@ -215,6 +215,37 @@ describe.sequential("AsyncTaskManager edge runtime", () => { await Promise.all([firstPromise, secondPromise]); }); + it("does not let an old task finalizer remove a newer task with the same taskId", async () => { + process.env.CI = "true"; + process.env.NEXT_RUNTIME = "nodejs"; + + const { AsyncTaskManager } = await import("@/lib/async-task-manager"); + + let resolveFirst: () => void; + const firstPromise = new Promise((resolve) => { + resolveFirst = resolve; + }); + AsyncTaskManager.register("t1", firstPromise); + + let resolveSecond: () => void; + const secondPromise = new Promise((resolve) => { + resolveSecond = resolve; + }); + AsyncTaskManager.register("t1", secondPromise); + + 
resolveFirst!(); + await firstPromise; + await new Promise((resolve) => queueMicrotask(() => resolve())); + + expect(AsyncTaskManager.getActiveTaskCount()).toBe(1); + + resolveSecond!(); + await secondPromise; + await new Promise((resolve) => queueMicrotask(() => resolve())); + + expect(AsyncTaskManager.getActiveTaskCount()).toBe(0); + }); + it("logs task cancelled when isClientAbortError returns true", async () => { process.env.CI = "true"; process.env.NEXT_RUNTIME = "nodejs"; @@ -283,6 +314,7 @@ describe.sequential("AsyncTaskManager edge runtime", () => { expect(controller.signal.aborted).toBe(true); expect(freshController.signal.aborted).toBe(false); + expect(AsyncTaskManager.getActiveTaskCount()).toBe(1); expect(vi.mocked(logger.warn)).toHaveBeenCalled(); resolveTask!(); @@ -290,6 +322,41 @@ describe.sequential("AsyncTaskManager edge runtime", () => { await Promise.all([taskPromise, freshPromise]); }); + it("cleanupCompletedTasks aborts a provided controller and detaches stale tasks", async () => { + process.env.CI = "true"; + process.env.NEXT_RUNTIME = "nodejs"; + + const { AsyncTaskManager } = await import("@/lib/async-task-manager"); + + let resolveTask: () => void; + const taskPromise = new Promise((resolve) => { + resolveTask = resolve; + }); + const controller = new AbortController(); + + const returnedController = AsyncTaskManager.register("stale-task", taskPromise, { + taskType: "stream-processing", + abortController: controller, + }); + expect(returnedController).toBe(controller); + + const managerAny = AsyncTaskManager as unknown as { + tasks: Map; + cleanupCompletedTasks: () => void; + }; + const info = managerAny.tasks.get("stale-task"); + expect(info).toBeDefined(); + info!.createdAt = Date.now() - 11 * 60 * 1000; + + managerAny.cleanupCompletedTasks(); + + expect(controller.signal.aborted).toBe(true); + expect(AsyncTaskManager.getActiveTaskCount()).toBe(0); + + resolveTask!(); + await taskPromise; + }); + it("cleanupAll cancels tasks and clears 
interval", async () => { process.env.CI = "true"; process.env.NEXT_RUNTIME = "nodejs"; @@ -313,6 +380,7 @@ describe.sequential("AsyncTaskManager edge runtime", () => { managerAny.cleanupAll(); expect(controller.signal.aborted).toBe(true); + expect(AsyncTaskManager.getActiveTaskCount()).toBe(0); expect(clearIntervalSpy).toHaveBeenCalledWith(intervalId); expect(managerAny.cleanupInterval).toBeNull(); diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index 7ce1d7f77..8a12a308a 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -4,6 +4,8 @@ import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; import { setDeferredStreamingFinalization } from "@/app/v1/_lib/proxy/stream-finalization"; import { AsyncTaskManager } from "@/lib/async-task-manager"; +import { emitProxyLangfuseTrace } from "@/lib/langfuse/emit-proxy-trace"; +import { SessionManager } from "@/lib/session-manager"; import { updateMessageRequestDetails, updateMessageRequestDuration } from "@/repository/message"; import type { Provider } from "@/types/provider"; @@ -82,6 +84,7 @@ vi.mock("@/lib/session-manager", () => ({ storeSessionUpstreamResponseMeta: vi.fn(), updateSessionProvider: vi.fn(), updateSessionUsage: vi.fn(), + updateSessionBindingSmart: vi.fn(async () => ({ updated: false, reason: "test" })), updateSessionWithCodexCacheKey: vi.fn(), }, })); @@ -273,6 +276,33 @@ function createResponsesSse(): Response { }); } +function createOversizedResponsesSse(): Response { + const oversizedDelta = "x".repeat(11 * 1024 * 1024); + const body = [ + `event: response.output_text.delta\ndata: ${JSON.stringify({ + type: "response.output_text.delta", + delta: oversizedDelta, + })}`, + `event: response.completed\ndata: ${JSON.stringify({ + type: 
"response.completed", + response: { + id: "resp_large", + model: "gpt-5.4-mini-2026-03-17", + usage: { + input_tokens: 463, + output_tokens: 11, + }, + }, + })}`, + "", + ].join("\n\n"); + + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + function createErroredResponsesSse(): Response { const encoder = new TextEncoder(); const stream = new ReadableStream({ @@ -559,6 +589,47 @@ describe("ProxyResponseHandler stream client abort finalization", () => { ); }); + it("keeps stream accounting bounded for oversized successful streams", async () => { + const controller = new AbortController(); + const session = createSession(controller.signal); + session.sessionId = "session_large"; + Object.assign(session, { + shouldPersistSessionDebugArtifacts: () => true, + }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createOversizedResponsesSse()); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 463, + outputTokens: 11, + }) + ); + expect(SessionManager.storeSessionResponse).not.toHaveBeenCalled(); + + const traceCall = vi.mocked(emitProxyLangfuseTrace).mock.calls.at(-1); + expect(traceCall).toBeDefined(); + const traceData = traceCall?.[1]; + const responseText = traceData?.responseText ?? 
""; + expect(responseText).toContain("[cch_truncated]"); + expect(responseText.length).toBeLessThan(10 * 1024 * 1024 + 1024); + }); + it("reclassifies a client-aborted stream as success when final usage was already received", async () => { const controller = new AbortController(); controller.abort(); diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index 8142f190f..6822adbeb 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -6,6 +6,7 @@ import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy"; import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; import { SessionManager } from "@/lib/session-manager"; +import { updateMessageRequestDetails } from "@/repository/message"; import type { Provider } from "@/types/provider"; const asyncTasks: Promise[] = []; @@ -75,7 +76,7 @@ vi.mock("@/repository/model-price", () => ({ vi.mock("@/lib/session-manager", () => ({ SessionManager: { storeSessionResponse: vi.fn(), - updateSessionUsage: vi.fn(), + updateSessionUsage: vi.fn(async () => undefined), clearSessionProvider: vi.fn(), storeSessionRequestPhaseSnapshot: vi.fn(async () => undefined), storeSessionResponsePhaseSnapshot: vi.fn(async () => undefined), @@ -662,4 +663,64 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { await Promise.allSettled(asyncTasks); } }); + + test("Gemini 流式透传超大单 chunk 应保留尾部 usage 且不把截断快照作为完整正文存储", async () => { + asyncTasks.length = 0; + vi.mocked(SessionManager.storeSessionResponse).mockClear(); + vi.mocked(updateMessageRequestDetails).mockClear(); + + const clientAbortController = new AbortController(); + const provider = createProvider({ + firstByteTimeoutStreamingMs: 1000, + 
streamingIdleTimeoutMs: 0, + }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 77, + userId: 1, + }); + session.setProvider(provider); + session.setSessionId("gemini-large-single-chunk"); + ( + session as ProxySession & { + shouldPersistSessionDebugArtifacts?: () => boolean; + } + ).shouldPersistSessionDebugArtifacts = () => true; + + const hugeText = "x".repeat(11 * 1024 * 1024); + const bodyText = `data: {"text":"${hugeText}"}\n\ndata: {"usageMetadata":{"promptTokenCount":463,"candidatesTokenCount":11}}\n\n`; + const bodyBytes = new TextEncoder().encode(bodyText); + + const upstreamResponse = new Response( + new ReadableStream({ + start(controller) { + controller.enqueue(bodyBytes); + controller.close(); + }, + }), + { + status: 200, + headers: { "content-type": "text/event-stream" }, + } + ); + + const returned = await ( + ProxyResponseHandler as unknown as { + handleStream: (session: ProxySession, response: Response) => Promise; + } + ).handleStream(session, upstreamResponse); + + await returned.text(); + await Promise.allSettled(asyncTasks); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 77, + expect.objectContaining({ + statusCode: 200, + inputTokens: 463, + outputTokens: 11, + }) + ); + expect(SessionManager.storeSessionResponse).not.toHaveBeenCalled(); + }); }); From 92bb359b2bde4543a25609b7b6f7bff1c0d76987 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Mon, 22 Jun 2026 22:20:33 +0800 Subject: [PATCH 2/7] =?UTF-8?q?fix:=20=E4=BF=AE=E6=AD=A3=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=B4=BB=E5=8A=A8=E8=B6=85=E6=97=B6=E5=88=A4?= =?UTF-8?q?=E5=AE=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 15 +++--- src/lib/async-task-manager.ts | 28 +++++++++-- src/lib/langfuse/trace-proxy-request.ts | 15 +++++- tests/unit/langfuse/langfuse-trace.test.ts | 42 +++++++++++++++++ 
.../async-task-manager-edge-runtime.test.ts | 46 +++++++++++++++++-- ...nse-handler-abort-listener-cleanup.test.ts | 1 + .../response-handler-bill-non-success.test.ts | 1 + ...esponse-handler-client-abort-drain.test.ts | 1 + ...handler-endpoint-circuit-isolation.test.ts | 1 + ...gemini-stream-passthrough-timeouts.test.ts | 1 + ...ponse-handler-hedge-loser-priority.test.ts | 1 + .../response-handler-lease-decrement.test.ts | 1 + .../proxy/response-handler-non200.test.ts | 1 + 13 files changed, 134 insertions(+), 20 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 547fab67b..974b64baf 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -119,15 +119,10 @@ class BoundedStreamTextAccumulator { return; } - if (remainingHeadBytes > 0) { - this.headChunks.push(value.slice(0, remainingHeadBytes)); - this.headBufferedBytes += remainingHeadBytes; - this.tailMode = true; - this.pushTailBytes(value.subarray(remainingHeadBytes)); - } else { - this.tailMode = true; - this.pushTailBytes(value); - } + this.headChunks.push(value.slice(0, remainingHeadBytes)); + this.headBufferedBytes += remainingHeadBytes; + this.tailMode = true; + this.pushTailBytes(value.subarray(remainingHeadBytes)); return; } @@ -2138,6 +2133,7 @@ export class ProxyResponseHandler { } streamTextAccumulator.pushBytes(value); + AsyncTaskManager.touch(taskId); } // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) @@ -2929,6 +2925,7 @@ export class ProxyResponseHandler { if (value) { const chunkSize = value.length; streamTextAccumulator.pushBytes(value); + AsyncTaskManager.touch(taskId); // 每次收到数据后重置静默期计时器(首次收到数据时启动) startIdleTimer(); diff --git a/src/lib/async-task-manager.ts b/src/lib/async-task-manager.ts index a4ecb8b57..20f06ce6c 100644 --- a/src/lib/async-task-manager.ts +++ b/src/lib/async-task-manager.ts @@ -20,6 +20,7 @@ interface TaskInfo { promise: Promise; abortController: AbortController; 
createdAt: number; + lastActivityAt: number; taskType: string; staleTimeoutMs: number; } @@ -110,11 +111,13 @@ class AsyncTaskManagerClass { options.staleTimeoutMs === undefined || options.staleTimeoutMs <= 0 ? DEFAULT_STALE_TASK_TIMEOUT_MS : options.staleTimeoutMs; + const now = Date.now(); const taskInfo: TaskInfo = { promise, abortController, - createdAt: Date.now(), + createdAt: now, + lastActivityAt: now, taskType, staleTimeoutMs, }; @@ -162,6 +165,20 @@ class AsyncTaskManagerClass { return abortController; } + /** + * 标记任务仍在推进。流式任务每次读到 chunk 都应 touch,避免长时间活跃流被 + * wall-clock stale cleanup 误判为挂死任务。 + */ + touch(taskId: string): boolean { + const taskInfo = this.tasks.get(taskId); + if (!taskInfo) { + return false; + } + + taskInfo.lastActivityAt = Date.now(); + return true; + } + /** * 取消一个任务 * @@ -217,19 +234,20 @@ class AsyncTaskManagerClass { */ private cleanupCompletedTasks(): void { const now = Date.now(); - const staleThreshold = 10 * 60 * 1000; // 10 分钟 for (const [taskId, taskInfo] of this.tasks.entries()) { const age = now - taskInfo.createdAt; + const idleAge = now - taskInfo.lastActivityAt; - const staleTimeoutMs = taskInfo.staleTimeoutMs || staleThreshold; + const staleTimeoutMs = taskInfo.staleTimeoutMs || DEFAULT_STALE_TASK_TIMEOUT_MS; - // 如果任务超过阈值还没完成,记录警告、取消并从 Map 断开强引用。 - if (age > staleTimeoutMs) { + // 如果任务超过阈值没有任何进展,记录警告、取消并从 Map 断开强引用。 + if (idleAge > staleTimeoutMs) { logger.warn("[AsyncTaskManager] Task timeout, cancelling and detaching", { taskId, taskType: taskInfo.taskType, age, + idleAge, staleTimeoutMs, }); this.cancel(taskId); diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index 55d118ec8..abbe359ea 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -9,11 +9,22 @@ const LANGFUSE_TEXT_PREVIEW_EDGE_CHARS = 128 * 1024; function buildRequestBodySummary(session: ProxySession): Record { const msg = session.request.message as Record; + 
const hasSystemPrompt = + typeof msg.hasSystemPrompt === "boolean" + ? msg.hasSystemPrompt + : Array.isArray(msg.system) && msg.system.length > 0; + const toolsCount = + typeof msg.toolsCount === "number" + ? msg.toolsCount + : Array.isArray(msg.tools) + ? msg.tools.length + : 0; + return { model: session.request.model, messageCount: session.getMessagesLength(), - hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0, - toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0, + hasSystemPrompt, + toolsCount, stream: msg.stream === true, maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined, temperature: typeof msg.temperature === "number" ? msg.temperature : undefined, diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 3c3c105f8..91bd63899 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -350,6 +350,48 @@ describe("traceProxyRequest", () => { ); }); + test("should preserve request summary fields from lightweight Langfuse previews", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ + request: { + message: { + truncatedForLangfuse: true, + model: "claude-sonnet-4-20250514", + stream: true, + max_tokens: 1024, + temperature: 0.7, + messageCount: 3, + toolsCount: 2, + hasSystemPrompt: true, + }, + model: "claude-sonnet-4-20250514", + }, + getMessagesLength: () => 3, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: true, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].metadata.requestSummary).toEqual( + expect.objectContaining({ + model: "claude-sonnet-4-20250514", + messageCount: 3, + hasSystemPrompt: true, + toolsCount: 2, + stream: true, + maxTokens: 1024, + temperature: 0.7, + 
}) + ); + }); + test("should handle model redirect metadata", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); diff --git a/tests/unit/lib/async-task-manager-edge-runtime.test.ts b/tests/unit/lib/async-task-manager-edge-runtime.test.ts index 369e1aca2..dbc21f36f 100644 --- a/tests/unit/lib/async-task-manager-edge-runtime.test.ts +++ b/tests/unit/lib/async-task-manager-edge-runtime.test.ts @@ -297,12 +297,14 @@ describe.sequential("AsyncTaskManager edge runtime", () => { const controller = AsyncTaskManager.register("stale-task", taskPromise, "custom_type"); const managerAny = AsyncTaskManager as unknown as { - tasks: Map; + tasks: Map; cleanupCompletedTasks: () => void; }; const info = managerAny.tasks.get("stale-task"); expect(info).toBeDefined(); - info!.createdAt = Date.now() - 11 * 60 * 1000; + const oldTimestamp = Date.now() - 11 * 60 * 1000; + info!.createdAt = oldTimestamp; + info!.lastActivityAt = oldTimestamp; let resolveFresh: () => void; const freshPromise = new Promise((resolve) => { @@ -322,6 +324,40 @@ describe.sequential("AsyncTaskManager edge runtime", () => { await Promise.all([taskPromise, freshPromise]); }); + it("does not cancel a long-running task that was recently touched", async () => { + process.env.CI = "true"; + process.env.NEXT_RUNTIME = "nodejs"; + + const { AsyncTaskManager } = await import("@/lib/async-task-manager"); + + let resolveTask: () => void; + const taskPromise = new Promise((resolve) => { + resolveTask = resolve; + }); + + const controller = AsyncTaskManager.register("active-stream", taskPromise, "stream-processing"); + + const managerAny = AsyncTaskManager as unknown as { + tasks: Map; + cleanupCompletedTasks: () => void; + }; + const info = managerAny.tasks.get("active-stream"); + expect(info).toBeDefined(); + const oldTimestamp = Date.now() - 11 * 60 * 1000; + info!.createdAt = oldTimestamp; + info!.lastActivityAt = oldTimestamp; + + 
expect(AsyncTaskManager.touch("active-stream")).toBe(true); + managerAny.cleanupCompletedTasks(); + + expect(controller.signal.aborted).toBe(false); + expect(AsyncTaskManager.getActiveTaskCount()).toBe(1); + expect(AsyncTaskManager.touch("missing-task")).toBe(false); + + resolveTask!(); + await taskPromise; + }); + it("cleanupCompletedTasks aborts a provided controller and detaches stale tasks", async () => { process.env.CI = "true"; process.env.NEXT_RUNTIME = "nodejs"; @@ -341,12 +377,14 @@ describe.sequential("AsyncTaskManager edge runtime", () => { expect(returnedController).toBe(controller); const managerAny = AsyncTaskManager as unknown as { - tasks: Map; + tasks: Map; cleanupCompletedTasks: () => void; }; const info = managerAny.tasks.get("stale-task"); expect(info).toBeDefined(); - info!.createdAt = Date.now() - 11 * 60 * 1000; + const oldTimestamp = Date.now() - 11 * 60 * 1000; + info!.createdAt = oldTimestamp; + info!.lastActivityAt = oldTimestamp; managerAny.cleanupCompletedTasks(); diff --git a/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts b/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts index 350c33d06..73f6933b9 100644 --- a/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts +++ b/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts @@ -22,6 +22,7 @@ vi.mock("@/lib/async-task-manager", () => ({ testState.asyncTasks.push(promise); return new AbortController(); }, + touch: () => true, cleanup: testState.cleanupTask, cancel: testState.cancelTask, }, diff --git a/tests/unit/proxy/response-handler-bill-non-success.test.ts b/tests/unit/proxy/response-handler-bill-non-success.test.ts index 86e0e441a..f4d04466e 100644 --- a/tests/unit/proxy/response-handler-bill-non-success.test.ts +++ b/tests/unit/proxy/response-handler-bill-non-success.test.ts @@ -17,6 +17,7 @@ vi.mock("@/lib/logger", () => ({ vi.mock("@/lib/async-task-manager", () => ({ AsyncTaskManager: { register: () => new AbortController(), 
+ touch: () => true, cleanup: () => {}, cancel: () => {}, }, diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index 8a12a308a..805de1f5f 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -23,6 +23,7 @@ vi.mock("@/lib/async-task-manager", () => ({ asyncTasks.push(promise); return new AbortController(); }), + touch: vi.fn(() => true), cleanup: vi.fn(), cancel: vi.fn(), }, diff --git a/tests/unit/proxy/response-handler-endpoint-circuit-isolation.test.ts b/tests/unit/proxy/response-handler-endpoint-circuit-isolation.test.ts index 05b6bfa7a..5b6f88a28 100644 --- a/tests/unit/proxy/response-handler-endpoint-circuit-isolation.test.ts +++ b/tests/unit/proxy/response-handler-endpoint-circuit-isolation.test.ts @@ -21,6 +21,7 @@ vi.mock("@/lib/async-task-manager", () => ({ asyncTasks.push(promise); return new AbortController(); }, + touch: () => true, cleanup: () => {}, cancel: () => {}, }, diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index 6822adbeb..abb28737c 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -42,6 +42,7 @@ vi.mock("@/lib/async-task-manager", () => ({ asyncTasks.push(promise); return new AbortController(); }, + touch: () => true, cleanup: () => {}, cancel: () => {}, }, diff --git a/tests/unit/proxy/response-handler-hedge-loser-priority.test.ts b/tests/unit/proxy/response-handler-hedge-loser-priority.test.ts index d22d19f36..9b0c1f7b7 100644 --- a/tests/unit/proxy/response-handler-hedge-loser-priority.test.ts +++ b/tests/unit/proxy/response-handler-hedge-loser-priority.test.ts @@ -31,6 +31,7 @@ vi.mock("@/lib/logger", () => ({ 
vi.mock("@/lib/async-task-manager", () => ({ AsyncTaskManager: { register: () => new AbortController(), + touch: vi.fn(() => true), cleanup: vi.fn(), cancel: vi.fn(), }, diff --git a/tests/unit/proxy/response-handler-lease-decrement.test.ts b/tests/unit/proxy/response-handler-lease-decrement.test.ts index 12a6fe282..54364245e 100644 --- a/tests/unit/proxy/response-handler-lease-decrement.test.ts +++ b/tests/unit/proxy/response-handler-lease-decrement.test.ts @@ -21,6 +21,7 @@ vi.mock("@/lib/async-task-manager", () => ({ asyncTasks.push(promise); return new AbortController(); }, + touch: () => true, cleanup: () => {}, cancel: () => {}, }, diff --git a/tests/unit/proxy/response-handler-non200.test.ts b/tests/unit/proxy/response-handler-non200.test.ts index 74e0bc909..ef62b25a7 100644 --- a/tests/unit/proxy/response-handler-non200.test.ts +++ b/tests/unit/proxy/response-handler-non200.test.ts @@ -23,6 +23,7 @@ vi.mock("@/lib/async-task-manager", () => ({ asyncTasks.push(promise); return new AbortController(); }, + touch: () => true, cleanup: () => {}, cancel: () => {}, }, From 0edd5828ca2e6e98d7bce534e725026ccbddda15 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Mon, 22 Jun 2026 22:47:13 +0800 Subject: [PATCH 3/7] =?UTF-8?q?fix:=20=E6=94=B6=E7=B4=A7=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=B8=85=E7=90=86=E4=B8=8E=E5=B0=BE=E9=83=A8=E8=A3=81=E5=89=AA?= =?UTF-8?q?=E8=AF=AD=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 29 +++++-- src/lib/async-task-manager.ts | 6 +- src/lib/langfuse/emit-proxy-trace.ts | 29 ++++--- ...esponse-handler-client-abort-drain.test.ts | 79 +++++++++++++++++++ 4 files changed, 123 insertions(+), 20 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 974b64baf..123ab7d4d 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -180,10 
+180,27 @@ class BoundedStreamTextAccumulator { this.tailBufferedBytes > STREAM_STATS_TAIL_BYTES && this.tailHead < this.tailChunkBytes.length ) { - this.tailBufferedBytes -= this.tailChunkBytes[this.tailHead] ?? 0; - this.tailChunks[this.tailHead] = new Uint8Array(); - this.tailChunkBytes[this.tailHead] = 0; - this.tailHead += 1; + const overflowBytes = this.tailBufferedBytes - STREAM_STATS_TAIL_BYTES; + const oldestChunkBytes = this.tailChunkBytes[this.tailHead] ?? 0; + + if (oldestChunkBytes <= 0) { + this.tailHead += 1; + continue; + } + + if (overflowBytes >= oldestChunkBytes) { + this.tailBufferedBytes -= oldestChunkBytes; + this.tailChunks[this.tailHead] = new Uint8Array(); + this.tailChunkBytes[this.tailHead] = 0; + this.tailHead += 1; + this.truncated = true; + continue; + } + + const oldestChunk = this.tailChunks[this.tailHead]!; + this.tailChunks[this.tailHead] = oldestChunk.slice(overflowBytes); + this.tailChunkBytes[this.tailHead] = oldestChunkBytes - overflowBytes; + this.tailBufferedBytes -= overflowBytes; this.truncated = true; } @@ -1422,7 +1439,6 @@ export class ProxyResponseHandler { } finally { cleanupTaskAbortBinding(); releaseSessionAgent(session); - AsyncTaskManager.cleanup(taskId); } })(); @@ -1935,7 +1951,6 @@ export class ProxyResponseHandler { cleanupTaskAbortBinding(); cleanupClientAbortListener(); releaseSessionAgent(session); - AsyncTaskManager.cleanup(taskId); } })(); @@ -2349,7 +2364,6 @@ export class ProxyResponseHandler { }); } releaseSessionAgent(session); - AsyncTaskManager.cleanup(taskId); } })(); @@ -3203,7 +3217,6 @@ export class ProxyResponseHandler { }); } releaseSessionAgent(session); - AsyncTaskManager.cleanup(taskId); } })(); diff --git a/src/lib/async-task-manager.ts b/src/lib/async-task-manager.ts index 20f06ce6c..dbac41322 100644 --- a/src/lib/async-task-manager.ts +++ b/src/lib/async-task-manager.ts @@ -203,12 +203,12 @@ class AsyncTaskManagerClass { } /** - * 清理单个任务 + * 清理单个任务。必须带上注册时的任务实例,避免旧任务 finally 误删同 taskId 
的新任务。 * * @param taskId 任务唯一标识 */ - cleanup(taskId: string, expectedTask?: TaskInfo): boolean { - if (expectedTask && this.tasks.get(taskId) !== expectedTask) { + private cleanup(taskId: string, expectedTask: TaskInfo): boolean { + if (this.tasks.get(taskId) !== expectedTask) { return false; } diff --git a/src/lib/langfuse/emit-proxy-trace.ts b/src/lib/langfuse/emit-proxy-trace.ts index 68ee66ab2..1d143c117 100644 --- a/src/lib/langfuse/emit-proxy-trace.ts +++ b/src/lib/langfuse/emit-proxy-trace.ts @@ -109,21 +109,32 @@ export function emitProxyLangfuseTrace( // 必须在异步 import 之前截断,避免动态加载/SDK 发送期间闭包继续强引用完整大响应。 const responseText = truncateResponseTextForLangfuse(data.responseText); const sessionSnapshot = buildLangfuseSessionSnapshot(session); + const { + responseHeaders, + durationMs, + statusCode, + isStreaming, + usageMetrics, + costUsd, + costBreakdown, + sseEventCount, + errorMessage, + } = data; void import("@/lib/langfuse/trace-proxy-request") .then(({ traceProxyRequest }) => { void traceProxyRequest({ session: sessionSnapshot, - responseHeaders: data.responseHeaders, - durationMs: data.durationMs, - statusCode: data.statusCode, - isStreaming: data.isStreaming, + responseHeaders, + durationMs, + statusCode, + isStreaming, responseText, - usageMetrics: data.usageMetrics, - costUsd: data.costUsd, - costBreakdown: data.costBreakdown, - sseEventCount: data.sseEventCount, - errorMessage: data.errorMessage, + usageMetrics, + costUsd, + costBreakdown, + sseEventCount, + errorMessage, }); }) .catch((err) => { diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index 805de1f5f..e43e3f554 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -304,6 +304,51 @@ function createOversizedResponsesSse(): Response { }); } +function createSplitTailBoundaryResponsesSse(): Response { + const encoder = new 
TextEncoder(); + const completedEvent = `event: response.completed\ndata: ${JSON.stringify({ + type: "response.completed", + response: { + id: "resp_split_tail", + model: "gpt-5.4-mini-2026-03-17", + usage: { + input_tokens: 463, + output_tokens: 11, + }, + }, + })}\n\n`; + const splitAt = Math.floor(completedEvent.length / 2); + const firstChunk = encoder.encode( + `event: response.output_text.delta\ndata: ${JSON.stringify({ + type: "response.output_text.delta", + delta: "x".repeat(9 * 1024 * 1024), + })}\n\n${completedEvent.slice(0, splitAt)}` + ); + const secondChunk = encoder.encode( + `${completedEvent.slice(splitAt)}event: response.output_text.delta\ndata: ${JSON.stringify({ + type: "response.output_text.delta", + delta: "y".repeat(2 * 1024 * 1024), + })}\n\n` + ); + const chunks = [firstChunk, secondChunk]; + let index = 0; + + const stream = new ReadableStream({ + pull(controller) { + if (index < chunks.length) { + controller.enqueue(chunks[index++]); + return; + } + controller.close(); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + function createErroredResponsesSse(): Response { const encoder = new TextEncoder(); const stream = new ReadableStream({ @@ -631,6 +676,40 @@ describe("ProxyResponseHandler stream client abort finalization", () => { expect(responseText.length).toBeLessThan(10 * 1024 * 1024 + 1024); }); + it("keeps usage when a terminal responses event is split across tail chunk eviction", async () => { + const controller = new AbortController(); + const session = createSession(controller.signal); + session.sessionId = "session_split_tail"; + Object.assign(session, { + shouldPersistSessionDebugArtifacts: () => true, + }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + 
endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createSplitTailBoundaryResponsesSse()); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 463, + outputTokens: 11, + }) + ); + expect(SessionManager.storeSessionResponse).not.toHaveBeenCalled(); + }); + it("reclassifies a client-aborted stream as success when final usage was already received", async () => { const controller = new AbortController(); controller.abort(); From bd0e3a27acd507d4db4b95f8bd63ffe1b62182b1 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Mon, 22 Jun 2026 23:10:14 +0800 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20=E4=BF=9D=E6=8C=81=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E5=BF=AB=E7=85=A7=E8=BF=9E=E7=BB=AD=20UTF-8=20=E8=A7=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 36 +++++++-- src/lib/async-task-manager.ts | 7 +- ...esponse-handler-client-abort-drain.test.ts | 79 ++++++++++++++++++- 3 files changed, 111 insertions(+), 11 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 123ab7d4d..65552de16 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -135,13 +135,7 @@ class BoundedStreamTextAccumulator { return this.finishedSnapshot; } - const headText = this.decodeChunks(this.headChunks, 0, this.headBufferedBytes); - const tailText = this.decodeChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes); - const text = this.tailMode - ? this.truncated - ? 
`${headText}${STREAM_STATS_TRUNCATED_MARKER}${tailText}` - : `${headText}${tailText}` - : headText; + const text = this.createSnapshotText(); this.finishedSnapshot = { text, @@ -154,6 +148,20 @@ class BoundedStreamTextAccumulator { return this.finishedSnapshot; } + private createSnapshotText(): string { + if (!this.tailMode) { + return this.decodeChunks(this.headChunks, 0, this.headBufferedBytes); + } + + if (!this.truncated) { + return this.decodeContiguousBufferedBytes(); + } + + const headText = this.decodeChunks(this.headChunks, 0, this.headBufferedBytes); + const tailText = this.decodeChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes); + return `${headText}${STREAM_STATS_TRUNCATED_MARKER}${tailText}`; + } + private pushTailBytes(value: Uint8Array): void { if (!value || value.byteLength === 0) { return; @@ -229,6 +237,20 @@ class BoundedStreamTextAccumulator { return new TextDecoder().decode(this.concatChunks(chunks, startIndex, totalBytes)); } + private decodeContiguousBufferedBytes(): string { + const totalBytes = this.headBufferedBytes + this.tailBufferedBytes; + if (totalBytes <= 0) { + return ""; + } + + const headBytes = this.concatChunks(this.headChunks, 0, this.headBufferedBytes); + const tailBytes = this.concatChunks(this.tailChunks, this.tailHead, this.tailBufferedBytes); + const out = new Uint8Array(headBytes.byteLength + tailBytes.byteLength); + out.set(headBytes, 0); + out.set(tailBytes, headBytes.byteLength); + return new TextDecoder().decode(out); + } + private concatChunks(chunks: Uint8Array[], startIndex: number, totalBytes: number): Uint8Array { if (totalBytes <= 0) { return new Uint8Array(); diff --git a/src/lib/async-task-manager.ts b/src/lib/async-task-manager.ts index dbac41322..912cf4140 100644 --- a/src/lib/async-task-manager.ts +++ b/src/lib/async-task-manager.ts @@ -70,7 +70,7 @@ class AsyncTaskManagerClass { this.cleanupAll(); }); - // 每分钟检查并清理超时任务(>10 分钟未完成,防止内存泄漏) + // 每分钟检查并清理空闲超时任务,防止挂死后台任务长期强引用上下文。 
this.cleanupInterval = setInterval(() => { this.cleanupCompletedTasks(); }, 60000); @@ -225,12 +225,13 @@ class AsyncTaskManagerClass { /** * 检查并清理超时任务 * - * 遍历所有活跃任务,对于超过 10 分钟还未完成的任务: + * 遍历所有活跃任务,对于空闲时间超过任务级 staleTimeoutMs 的任务: * 1. 记录警告日志 * 2. 触发 AbortController 取消任务 * 3. 从任务 Map 中移除 * - * ⚠️ 注意:这不是清理"已完成"的任务,而是清理"超时未完成"的任务 + * 注意:这是清理"空闲超时"的任务。活跃流应在收到上游 chunk 时 + * 调用 touch() 更新 lastActivityAt,避免被误判为挂死任务。 */ private cleanupCompletedTasks(): void { const now = Date.now(); diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index e43e3f554..4f4ca4f99 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -10,6 +10,7 @@ import { updateMessageRequestDetails, updateMessageRequestDuration } from "@/rep import type { Provider } from "@/types/provider"; const asyncTasks: Promise[] = []; +const STREAM_STATS_HEAD_BYTES_FOR_TEST = 1024 * 1024; vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({ ResponseFixer: { @@ -75,7 +76,7 @@ vi.mock("@/lib/session-manager", () => ({ SessionManager: { clearSessionProvider: vi.fn(), extractCodexPromptCacheKey: vi.fn(), - storeSessionResponse: vi.fn(), + storeSessionResponse: vi.fn(async () => undefined), storeSessionRequestPhaseSnapshot: vi.fn(), storeSessionResponsePhaseSnapshot: vi.fn(), storeSessionRequestHeaders: vi.fn(), @@ -304,6 +305,42 @@ function createOversizedResponsesSse(): Response { }); } +function createUtf8SplitHeadTailResponsesSse(): Response { + const encoder = new TextEncoder(); + const eventPrefix = `event: response.output_text.delta\ndata: {"type":"response.output_text.delta","delta":"`; + const splitChar = "界"; + const prefixBytes = encoder.encode(eventPrefix).byteLength; + const fillBytes = STREAM_STATS_HEAD_BYTES_FOR_TEST - prefixBytes - 1; + if (fillBytes < 0) { + throw new Error("test event prefix is too large for the head window"); + } 
+ + const completedEvent = `event: response.completed\ndata: ${JSON.stringify({ + type: "response.completed", + response: { + id: "resp_utf8_boundary", + model: "gpt-5.4-mini-2026-03-17", + usage: { + input_tokens: 463, + output_tokens: 11, + }, + }, + })}\n\n`; + const body = `${eventPrefix}${"a".repeat(fillBytes)}${splitChar}"}\n\n${completedEvent}`; + const chunk = encoder.encode(body); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(chunk); + controller.close(); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + function createSplitTailBoundaryResponsesSse(): Response { const encoder = new TextEncoder(); const completedEvent = `event: response.completed\ndata: ${JSON.stringify({ @@ -676,6 +713,46 @@ describe("ProxyResponseHandler stream client abort finalization", () => { expect(responseText.length).toBeLessThan(10 * 1024 * 1024 + 1024); }); + it("decodes an untruncated stream as contiguous UTF-8 across the head/tail split", async () => { + const controller = new AbortController(); + const session = createSession(controller.signal); + session.sessionId = "session_utf8_boundary"; + Object.assign(session, { + shouldPersistSessionDebugArtifacts: () => true, + }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createUtf8SplitHeadTailResponsesSse()); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 463, + outputTokens: 11, + }) + ); + + const traceCall = vi.mocked(emitProxyLangfuseTrace).mock.calls.at(-1); + 
expect(traceCall).toBeDefined(); + const responseText = traceCall?.[1].responseText ?? ""; + expect(responseText).toContain("界"); + expect(responseText).not.toContain("\uFFFD"); + expect(responseText).not.toContain("[cch_truncated]"); + }); + it("keeps usage when a terminal responses event is split across tail chunk eviction", async () => { const controller = new AbortController(); const session = createSession(controller.signal); From aa343361932a1978931f178139f5f7112400afd5 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Mon, 22 Jun 2026 23:46:43 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20=E6=94=B6=E7=B4=A7=E9=9D=9E=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E5=93=8D=E5=BA=94=E4=BD=93=E8=AF=BB=E5=8F=96=E7=94=9F?= =?UTF-8?q?=E5=91=BD=E5=91=A8=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 63 ++++++++++++--- .../response-handler-lease-decrement.test.ts | 77 ++++++++++++++++++- 2 files changed, 130 insertions(+), 10 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 65552de16..dd2d538f5 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -324,6 +324,44 @@ function bindTaskAbortToUpstreamResponse( }; } +async function readResponseTextWithTaskActivity( + response: Response, + taskId: string +): Promise { + if (!response.body) { + AsyncTaskManager.touch(taskId); + return response.text(); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + const chunks: string[] = []; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + if (!value || value.byteLength === 0) { + continue; + } + + AsyncTaskManager.touch(taskId); + chunks.push(decoder.decode(value, { stream: true })); + } + + const finalText = decoder.decode(); + if (finalText) { + chunks.push(finalText); + } + 
AsyncTaskManager.touch(taskId); + return chunks.join(""); + } finally { + reader.releaseLock(); + } +} + function takeBeforeResponseBodySnapshotSource(session: ProxySession): Response | null { const snapshotSession = session as ProxySession & { detailSnapshotResponseBeforeSource?: Response | null; @@ -1299,17 +1337,17 @@ export class ProxyResponseHandler { const statusCode = response.status; let finalResponse = response; - const persistNonStreamAfterSnapshot = async (targetResponse: Response) => { + let finalResponseBodyForSnapshot: string | null = null; + const persistNonStreamAfterSnapshot = (targetResponse: Response, body: string) => { if (!session.sessionId || !session.shouldPersistSessionDebugArtifacts()) { return; } - const finalBody = await targetResponse.clone().text(); const responseAfterSnapshotTask = SessionManager.storeSessionResponsePhaseSnapshot?.( session.sessionId, "after", { - body: finalBody, + body, headers: targetResponse.headers, meta: { upstreamUrl: null, @@ -1354,7 +1392,7 @@ export class ProxyResponseHandler { ); const statsPromise = (async () => { try { - const responseText = await responseForStats.text(); + const responseText = await readResponseTextWithTaskActivity(responseForStats, taskId); const sessionWithCleanup = session as typeof session & { clearResponseTimeout?: () => void; @@ -1505,6 +1543,7 @@ export class ProxyResponseHandler { const responseData = JSON.parse(responseText) as GeminiResponse; const transformed = GeminiAdapter.transformResponse(responseData, false); + const transformedBody = JSON.stringify(transformed); logger.debug( "[ResponseHandler] Transformed Gemini non-stream response to client format", @@ -1516,7 +1555,8 @@ export class ProxyResponseHandler { ); // ⭐ 清理传输 headers(body 已从流转为 JSON 字符串) - finalResponse = new Response(JSON.stringify(transformed), { + finalResponseBodyForSnapshot = transformedBody; + finalResponse = new Response(transformedBody, { status: response.status, statusText: response.statusText, 
headers: cleanResponseHeaders(response.headers), @@ -1524,6 +1564,7 @@ export class ProxyResponseHandler { } catch (error) { logger.error("[ResponseHandler] Failed to transform Gemini non-stream response:", error); finalResponse = response; + finalResponseBodyForSnapshot = null; } } } @@ -1598,7 +1639,7 @@ export class ProxyResponseHandler { } // ⭐ 非流式:读取完整响应体(会等待所有数据下载完成) - const responseText = await responseForLog.text(); + const responseText = await readResponseTextWithTaskActivity(responseForLog, taskId); // ⭐ 响应体读取完成:清除响应超时定时器 const sessionWithCleanup = session as typeof session & { @@ -1684,6 +1725,13 @@ export class ProxyResponseHandler { responseBeforeSnapshotTask?.catch((err) => { logger.error("[ResponseHandler] Failed to store response before snapshot:", err); }); + + // after 快照复用本任务已经读取到的响应文本,避免再启动一个未受 + // AsyncTaskManager 管理的 clone().text() 读取分支。 + persistNonStreamAfterSnapshot( + finalResponse, + finalResponseBodyForSnapshot ?? responseText + ); } if (billableUsageMetrics && messageContext) { @@ -1998,9 +2046,6 @@ export class ProxyResponseHandler { }); }); - void persistNonStreamAfterSnapshot(finalResponse).catch((error) => { - logger.error("[ResponseHandler] Failed to persist non-stream after snapshot", { error }); - }); return finalResponse; } diff --git a/tests/unit/proxy/response-handler-lease-decrement.test.ts b/tests/unit/proxy/response-handler-lease-decrement.test.ts index 54364245e..28bc86d65 100644 --- a/tests/unit/proxy/response-handler-lease-decrement.test.ts +++ b/tests/unit/proxy/response-handler-lease-decrement.test.ts @@ -21,7 +21,7 @@ vi.mock("@/lib/async-task-manager", () => ({ asyncTasks.push(promise); return new AbortController(); }, - touch: () => true, + touch: vi.fn(() => true), cleanup: () => {}, cancel: () => {}, }, @@ -60,6 +60,7 @@ vi.mock("@/lib/session-manager", () => ({ SessionManager: { updateSessionUsage: vi.fn(async () => undefined), storeSessionResponse: vi.fn(), + storeSessionResponsePhaseSnapshot: vi.fn(async () => 
undefined), extractCodexPromptCacheKey: vi.fn(), updateSessionWithCodexCacheKey: vi.fn(), }, @@ -89,6 +90,7 @@ vi.mock("@/lib/proxy-status-tracker", () => ({ import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { AsyncTaskManager } from "@/lib/async-task-manager"; import { SessionManager } from "@/lib/session-manager"; import { RateLimitService } from "@/lib/rate-limit"; import { SessionTracker } from "@/lib/session-tracker"; @@ -266,6 +268,38 @@ function createNonStreamResponse(usage: { input_tokens: number; output_tokens: n ); } +function createChunkedNonStreamResponse(usage: { + input_tokens: number; + output_tokens: number; +}): Response { + const body = JSON.stringify({ + type: "message", + usage, + }); + const encoder = new TextEncoder(); + const chunks = [ + encoder.encode(body.slice(0, 8)), + encoder.encode(body.slice(8, 24)), + encoder.encode(body.slice(24)), + ]; + let index = 0; + + const stream = new ReadableStream({ + pull(controller) { + if (index < chunks.length) { + controller.enqueue(chunks[index++]); + return; + } + controller.close(); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "application/json" }, + }); +} + function createStreamResponse(usage: { input_tokens: number; output_tokens: number }): Response { const sseText = `event: message_delta\ndata: ${JSON.stringify({ usage })}\n\n`; const encoder = new TextEncoder(); @@ -304,6 +338,7 @@ describe("Lease Budget Decrement after trackCostToRedis", () => { vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined); vi.mocked(updateMessageRequestDuration).mockResolvedValue(undefined); vi.mocked(SessionManager.storeSessionResponse).mockResolvedValue(undefined); + vi.mocked(SessionManager.storeSessionResponsePhaseSnapshot).mockResolvedValue(undefined); vi.mocked(RateLimitService.trackCost).mockResolvedValue(undefined); 
vi.mocked(RateLimitService.trackUserDailyCost).mockResolvedValue(undefined); vi.mocked(RateLimitService.decrementLeaseBudget).mockResolvedValue({ @@ -357,6 +392,46 @@ describe("Lease Budget Decrement after trackCostToRedis", () => { } }); + it("should refresh task activity while reading chunked non-stream response bodies", async () => { + const messageId = 5010; + const session = createSession({ + originalModel, + redirectedModel: originalModel, + sessionId: "sess-non-stream-chunked-touch", + messageId, + }); + + const response = createChunkedNonStreamResponse(usage); + const cloneSpy = vi.spyOn(response, "clone"); + + await ProxyResponseHandler.dispatch(session, response); + await drainAsyncTasks(); + + const taskId = `non-stream-${messageId}`; + const touchCalls = vi + .mocked(AsyncTaskManager.touch) + .mock.calls.filter(([calledTaskId]) => calledTaskId === taskId); + expect(touchCalls.length).toBeGreaterThanOrEqual(2); + expect(cloneSpy).toHaveBeenCalledTimes(1); + expect(SessionManager.storeSessionResponsePhaseSnapshot).toHaveBeenCalledWith( + session.sessionId, + "after", + expect.objectContaining({ + body: expect.stringContaining('"type":"message"'), + meta: expect.objectContaining({ statusCode: 200 }), + }), + session.requestSequence + ); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + messageId, + expect.objectContaining({ + statusCode: 200, + inputTokens: usage.input_tokens, + outputTokens: usage.output_tokens, + }) + ); + }); + it("should call decrementLeaseBudget for all windows and entity types (stream)", async () => { const session = createSession({ originalModel, From 0cbef88260c22a09db584273e7b5738392925314 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Tue, 23 Jun 2026 00:11:14 +0800 Subject: [PATCH 6/7] =?UTF-8?q?fix:=20=E5=A4=8D=E5=88=B6=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E7=AA=97=E5=8F=A3=E5=AD=97=E8=8A=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 
src/app/v1/_lib/proxy/response-handler.ts | 16 +++++++----- ...esponse-handler-client-abort-drain.test.ts | 25 ++++++++++++++++++- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index dd2d538f5..da6e71210 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -71,9 +71,13 @@ type BoundedStreamTextSnapshot = { chunkCount: number; }; +function copyUint8Range(value: Uint8Array, start = 0, end = value.byteLength): Uint8Array { + return new Uint8Array(value.subarray(start, end)); +} + // 流式统计只需要头部元信息和尾部 usage/final event。按字节保存窗口,避免 // string[] 无界增长,也避免 subarray 持有超大原始 ArrayBuffer。 -class BoundedStreamTextAccumulator { +export class BoundedStreamTextAccumulator { private readonly headChunks: Uint8Array[] = []; private readonly tailChunks: Uint8Array[] = []; private readonly tailChunkBytes: number[] = []; @@ -114,12 +118,12 @@ class BoundedStreamTextAccumulator { if (!this.tailMode && this.headBufferedBytes < STREAM_STATS_HEAD_BYTES) { const remainingHeadBytes = STREAM_STATS_HEAD_BYTES - this.headBufferedBytes; if (value.byteLength <= remainingHeadBytes) { - this.headChunks.push(value.slice()); + this.headChunks.push(copyUint8Range(value)); this.headBufferedBytes += value.byteLength; return; } - this.headChunks.push(value.slice(0, remainingHeadBytes)); + this.headChunks.push(copyUint8Range(value, 0, remainingHeadBytes)); this.headBufferedBytes += remainingHeadBytes; this.tailMode = true; this.pushTailBytes(value.subarray(remainingHeadBytes)); @@ -171,7 +175,7 @@ class BoundedStreamTextAccumulator { this.tailChunks.length = 0; this.tailChunkBytes.length = 0; this.tailHead = 0; - const tail = value.slice(value.byteLength - STREAM_STATS_TAIL_BYTES); + const tail = copyUint8Range(value, value.byteLength - STREAM_STATS_TAIL_BYTES); this.tailChunks.push(tail); this.tailChunkBytes.push(tail.byteLength); this.tailBufferedBytes = 
tail.byteLength; @@ -179,7 +183,7 @@ class BoundedStreamTextAccumulator { return; } - const copy = value.slice(); + const copy = copyUint8Range(value); this.tailChunks.push(copy); this.tailChunkBytes.push(copy.byteLength); this.tailBufferedBytes += copy.byteLength; @@ -206,7 +210,7 @@ class BoundedStreamTextAccumulator { } const oldestChunk = this.tailChunks[this.tailHead]!; - this.tailChunks[this.tailHead] = oldestChunk.slice(overflowBytes); + this.tailChunks[this.tailHead] = copyUint8Range(oldestChunk, overflowBytes); this.tailChunkBytes[this.tailHead] = oldestChunkBytes - overflowBytes; this.tailBufferedBytes -= overflowBytes; this.truncated = true; diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index 4f4ca4f99..e629c7e3a 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -1,6 +1,9 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy"; -import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; +import { + BoundedStreamTextAccumulator, + ProxyResponseHandler, +} from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; import { setDeferredStreamingFinalization } from "@/app/v1/_lib/proxy/stream-finalization"; import { AsyncTaskManager } from "@/lib/async-task-manager"; @@ -640,6 +643,26 @@ describe("ProxyResponseHandler stream client abort finalization", () => { vi.clearAllMocks(); }); + it("copies Buffer-backed stream windows before retaining stats snapshots", () => { + const accumulator = new BoundedStreamTextAccumulator(); + const headMarker = "head-copy-marker"; + const tailMarker = "tail-copy-marker"; + const originalChunk = Buffer.from(`${headMarker}${"x".repeat(11 * 1024 * 1024)}${tailMarker}`); + const originalLength = 
originalChunk.byteLength; + + accumulator.pushBytes(originalChunk); + originalChunk.fill("z"); + + const snapshot = accumulator.finish(); + + expect(snapshot.truncated).toBe(true); + expect(snapshot.totalBytes).toBe(originalLength); + expect(snapshot.bufferedBytes).toBe(10 * 1024 * 1024); + expect(snapshot.text).toContain(headMarker); + expect(snapshot.text).toContain(tailMarker); + expect(snapshot.text).not.toContain("zzzzzzzzzzzzzzzz"); + }); + it("finalizes a complete upstream responses stream as success when the downstream client already closed", async () => { const controller = new AbortController(); controller.abort(); From a0ee9d8d8866058ef0e73d02e7f49ec49535d38b Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Tue, 23 Jun 2026 00:46:44 +0800 Subject: [PATCH 7/7] =?UTF-8?q?fix:=20=E5=AF=B9=E9=BD=90=E5=93=8D=E5=BA=94?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=20stale=20=E8=B6=85=E6=97=B6=E8=AF=AD?= =?UTF-8?q?=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 32 ++++++++- ...esponse-handler-client-abort-drain.test.ts | 71 +++++++++++++++++++ ...gemini-stream-passthrough-timeouts.test.ts | 47 +++++++++++- 3 files changed, 146 insertions(+), 4 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index da6e71210..b9e2edee7 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -75,6 +75,24 @@ function copyUint8Range(value: Uint8Array, start = 0, end = value.byteLength): U return new Uint8Array(value.subarray(start, end)); } +function resolveNonStreamTaskStaleTimeoutMs(provider: Provider): number { + return provider.requestTimeoutNonStreamingMs > 0 + ? 
provider.requestTimeoutNonStreamingMs + : Number.POSITIVE_INFINITY; +} + +function resolveStreamTaskStaleTimeoutMs(provider: Provider): number { + if (provider.streamingIdleTimeoutMs <= 0) { + return Number.POSITIVE_INFINITY; + } + + if (provider.firstByteTimeoutStreamingMs > 0) { + return Math.max(provider.firstByteTimeoutStreamingMs, provider.streamingIdleTimeoutMs); + } + + return Number.POSITIVE_INFINITY; +} + // 流式统计只需要头部元信息和尾部 usage/final event。按字节保存窗口,避免 // string[] 无界增长,也避免 subarray 持有超大原始 ArrayBuffer。 export class BoundedStreamTextAccumulator { @@ -1509,6 +1527,7 @@ export class ProxyResponseHandler { AsyncTaskManager.register(taskId, statsPromise, { taskType: "non-stream-passthrough-stats", abortController: statsAbortController, + staleTimeoutMs: resolveNonStreamTaskStaleTimeoutMs(provider), }); statsPromise.catch((error) => { if (session.sessionId && session.shouldPersistSessionDebugArtifacts()) { @@ -2032,6 +2051,7 @@ export class ProxyResponseHandler { AsyncTaskManager.register(taskId, processingPromise, { taskType: "non-stream-processing", abortController, + staleTimeoutMs: resolveNonStreamTaskStaleTimeoutMs(provider), }); processingPromise.catch(async (error) => { logger.error("ResponseHandler: Uncaught error in non-stream processing", { @@ -2094,6 +2114,7 @@ export class ProxyResponseHandler { const statusCode = response.status; const taskId = `stream-passthrough-${messageContext.id}`; + const streamTaskStaleTimeoutMs = resolveStreamTaskStaleTimeoutMs(provider); const statsAbortController = new AbortController(); const cleanupTaskAbortBinding = bindTaskAbortToUpstreamResponse( session, @@ -2120,7 +2141,9 @@ export class ProxyResponseHandler { // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) const idleTimeoutMs = - provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; + provider.streamingIdleTimeoutMs > 0 + ? 
provider.streamingIdleTimeoutMs + : Number.POSITIVE_INFINITY; let idleTimeoutId: NodeJS.Timeout | null = null; const clearIdleTimer = () => { if (idleTimeoutId) { @@ -2441,6 +2464,7 @@ export class ProxyResponseHandler { AsyncTaskManager.register(taskId, statsPromise, { taskType: "stream-passthrough-stats", abortController: statsAbortController, + staleTimeoutMs: streamTaskStaleTimeoutMs, }); statsPromise.catch((error) => { if (session.sessionId && session.shouldPersistSessionDebugArtifacts()) { @@ -2546,7 +2570,10 @@ export class ProxyResponseHandler { taskId ); const idleTimeoutMs = - provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; + provider.streamingIdleTimeoutMs > 0 + ? provider.streamingIdleTimeoutMs + : Number.POSITIVE_INFINITY; + const streamTaskStaleTimeoutMs = resolveStreamTaskStaleTimeoutMs(provider); const clientAbortDrainTimeoutMs = CLIENT_ABORT_DRAIN_MAX_MS; // 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 @@ -3295,6 +3322,7 @@ export class ProxyResponseHandler { AsyncTaskManager.register(taskId, processingPromise, { taskType: "stream-processing", abortController, + staleTimeoutMs: streamTaskStaleTimeoutMs, }); processingPromise.catch(async (error) => { logger.error("ResponseHandler: Uncaught error in stream processing", { diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index e629c7e3a..094a8614c 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -281,6 +281,23 @@ function createResponsesSse(): Response { }); } +function createResponsesJson(): Response { + return new Response( + JSON.stringify({ + id: "resp_non_stream", + model: "gpt-5.4-mini-2026-03-17", + usage: { + input_tokens: 463, + output_tokens: 11, + }, + }), + { + status: 200, + headers: { "content-type": "application/json" }, + } + ); +} + function createOversizedResponsesSse(): Response 
{ const oversizedDelta = "x".repeat(11 * 1024 * 1024); const body = [ @@ -663,6 +680,60 @@ describe("ProxyResponseHandler stream client abort finalization", () => { expect(snapshot.text).not.toContain("zzzzzzzzzzzzzzzz"); }); + it("does not apply the default stale cleanup when stream idle timeout is disabled", async () => { + const controller = new AbortController(); + const session = createSession(controller.signal); + session.provider.streamingIdleTimeoutMs = 0; + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createResponsesSse()); + await drainAsyncTasks(); + + const streamRegisterCall = vi.mocked(AsyncTaskManager.register).mock.calls.find((call) => { + const options = call[2] as { taskType?: string } | undefined; + return options?.taskType === "stream-processing"; + }); + + expect(streamRegisterCall).toBeDefined(); + expect(streamRegisterCall?.[2]).toEqual( + expect.objectContaining({ + staleTimeoutMs: Number.POSITIVE_INFINITY, + }) + ); + }); + + it("does not apply the default stale cleanup when non-stream request timeout is disabled", async () => { + const controller = new AbortController(); + const session = createSession(controller.signal); + session.provider.requestTimeoutNonStreamingMs = 0; + + await ProxyResponseHandler.dispatch(session, createResponsesJson()); + await drainAsyncTasks(); + + const nonStreamRegisterCall = vi.mocked(AsyncTaskManager.register).mock.calls.find((call) => { + const options = call[2] as { taskType?: string } | undefined; + return options?.taskType === "non-stream-processing"; + }); + + expect(nonStreamRegisterCall).toBeDefined(); + expect(nonStreamRegisterCall?.[2]).toEqual( + expect.objectContaining({ + 
staleTimeoutMs: Number.POSITIVE_INFINITY, + }) + ); + }); + it("finalizes a complete upstream responses stream as success when the downstream client already closed", async () => { const controller = new AbortController(); controller.abort(); diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index abb28737c..da67fcf81 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -5,6 +5,7 @@ import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder"; import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy"; import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { AsyncTaskManager } from "@/lib/async-task-manager"; import { SessionManager } from "@/lib/session-manager"; import { updateMessageRequestDetails } from "@/repository/message"; import type { Provider } from "@/types/provider"; @@ -38,10 +39,10 @@ vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({ vi.mock("@/lib/async-task-manager", () => ({ AsyncTaskManager: { - register: (_taskId: string, promise: Promise) => { + register: vi.fn((_taskId: string, promise: Promise) => { asyncTasks.push(promise); return new AbortController(); - }, + }), touch: () => true, cleanup: () => {}, cancel: () => {}, @@ -396,6 +397,48 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { await Promise.allSettled(asyncTasks); }); + test("Gemini 流式透传禁用 idle timeout 时不应回落到默认 stale cleanup", async () => { + asyncTasks.length = 0; + vi.mocked(AsyncTaskManager.register).mockClear(); + + const provider = createProvider({ + firstByteTimeoutStreamingMs: 1000, + streamingIdleTimeoutMs: 0, + }); + const session = createSession({ + clientAbortSignal: new 
AbortController().signal, + messageId: 12, + userId: 22, + }); + session.setProvider(provider); + + const upstreamResponse = new Response('data: {"usageMetadata":{"promptTokenCount":1}}\n\n', { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const returned = await ( + ProxyResponseHandler as unknown as { + handleStream: (session: ProxySession, response: Response) => Promise; + } + ).handleStream(session, upstreamResponse); + + await returned.text(); + await Promise.allSettled(asyncTasks); + + const statsRegisterCall = vi.mocked(AsyncTaskManager.register).mock.calls.find((call) => { + const options = call[2] as { taskType?: string } | undefined; + return options?.taskType === "stream-passthrough-stats"; + }); + + expect(statsRegisterCall).toBeDefined(); + expect(statsRegisterCall?.[2]).toEqual( + expect.objectContaining({ + staleTimeoutMs: Number.POSITIVE_INFINITY, + }) + ); + }); + test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => { asyncTasks.length = 0; const { baseUrl, close } = await startSseServer((_req, res) => {