diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index e4cb75244..cad152770 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -56,6 +56,8 @@ import { peekDeferredStreamingFinalization, } from "./stream-finalization"; +const CLIENT_ABORT_DRAIN_MAX_MS = 60_000; + /** * Idempotent helper to release the agent pool reference count attached to a session. * Prevents double-release by clearing the callback after first invocation. @@ -2300,7 +2302,7 @@ export class ProxyResponseHandler { } } - // ⭐ 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流 + // 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流 // 这解决了 tee() 后 internalStream abort 不影响 clientStream 的问题 let streamController: TransformStreamDefaultController | null = null; const controllableStream = processedStream.pipeThrough( @@ -2322,17 +2324,76 @@ export class ProxyResponseHandler { const abortController = new AbortController(); const idleTimeoutMs = provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; - const clientAbortDrainTimeoutMs = idleTimeoutMs === Infinity ? 60_000 : idleTimeoutMs; + const clientAbortDrainTimeoutMs = CLIENT_ABORT_DRAIN_MAX_MS; - // ⭐ 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 + // 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 let idleTimeoutId: NodeJS.Timeout | null = null; let clientAbortDrainTimeoutId: NodeJS.Timeout | null = null; + const chunks: string[] = []; const clearClientAbortDrainTimer = () => { if (clientAbortDrainTimeoutId) { clearTimeout(clientAbortDrainTimeoutId); clientAbortDrainTimeoutId = null; } }; + const clearIdleTimer = () => { + if (idleTimeoutId) { + clearTimeout(idleTimeoutId); + idleTimeoutId = null; + } + }; + const startIdleTimer = () => { + if (idleTimeoutMs === Infinity) return; // 禁用时跳过 + clearIdleTimer(); // 清除旧的 + idleTimeoutId = setTimeout(() => { + logger.warn("ResponseHandler: Streaming idle timeout triggered", { + taskId, + providerId: provider.id, + idleTimeoutMs, + chunksCollected: chunks.length, + }); + + // 1. 关闭客户端流(让客户端收到连接关闭通知,避免悬挂) + try { + if (streamController) { + streamController.error(new Error("Streaming idle timeout")); + logger.debug("ResponseHandler: Client stream closed due to idle timeout", { + taskId, + providerId: provider.id, + }); + } + } catch (e) { + logger.warn("ResponseHandler: Failed to close client stream", { + taskId, + providerId: provider.id, + error: e, + }); + } + + // 2. 终止上游连接(避免资源泄漏) + try { + const sessionWithController = session as typeof session & { + responseController?: AbortController; + }; + if (sessionWithController.responseController) { + sessionWithController.responseController.abort(new Error("streaming_idle")); + logger.debug("ResponseHandler: Upstream connection aborted due to idle timeout", { + taskId, + providerId: provider.id, + }); + } + } catch (e) { + logger.warn("ResponseHandler: Failed to abort upstream connection", { + taskId, + providerId: provider.id, + error: e, + }); + } + + // 3. 终止后台读取任务 + abortController.abort(new Error("streaming_idle")); + }, idleTimeoutMs); + }; const cleanupClientAbortListener = bindClientAbortListener(session.clientAbortSignal, () => { logger.debug("ResponseHandler: Client disconnected, cleaning up", { taskId, @@ -2344,6 +2405,9 @@ export class ProxyResponseHandler { // still drain buffered final usage and record the request as successful. // Idle/response timeout paths still abort via abortController. clearClientAbortDrainTimer(); + if (!idleTimeoutId) { + startIdleTimer(); + } clientAbortDrainTimeoutId = setTimeout(() => { logger.info("ResponseHandler: Client abort drain window exceeded", { taskId, @@ -2375,71 +2439,13 @@ export class ProxyResponseHandler { // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容: // - 用于解析 usage/cost 与内部结算(例如“假 200”检测) // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。 - const chunks: string[] = []; let usageForCost: UsageMetrics | null = null; - let isFirstChunk = true; // ⭐ 标记是否为第一块数据 + let isFirstChunk = true; // 标记是否为第一块数据 - const startIdleTimer = () => { - if (idleTimeoutMs === Infinity) return; // 禁用时跳过 - clearIdleTimer(); // 清除旧的 - idleTimeoutId = setTimeout(() => { - logger.warn("ResponseHandler: Streaming idle timeout triggered", { - taskId, - providerId: provider.id, - idleTimeoutMs, - chunksCollected: chunks.length, - }); - - // ⭐ 1. 关闭客户端流(让客户端收到连接关闭通知,避免悬挂) - try { - if (streamController) { - streamController.error(new Error("Streaming idle timeout")); - logger.debug("ResponseHandler: Client stream closed due to idle timeout", { - taskId, - providerId: provider.id, - }); - } - } catch (e) { - logger.warn("ResponseHandler: Failed to close client stream", { - taskId, - providerId: provider.id, - error: e, - }); - } - - // ⭐ 2. 终止上游连接(避免资源泄漏) - try { - const sessionWithController = session as typeof session & { - responseController?: AbortController; - }; - if (sessionWithController.responseController) { - sessionWithController.responseController.abort(new Error("streaming_idle")); - logger.debug("ResponseHandler: Upstream connection aborted due to idle timeout", { - taskId, - providerId: provider.id, - }); - } - } catch (e) { - logger.warn("ResponseHandler: Failed to abort upstream connection", { - taskId, - providerId: provider.id, - error: e, - }); - } - - // ⭐ 3. 终止后台读取任务 - abortController.abort(new Error("streaming_idle")); - }, idleTimeoutMs); - }; - const clearIdleTimer = () => { - if (idleTimeoutId) { - clearTimeout(idleTimeoutId); - idleTimeoutId = null; - } - }; - - // ⭐ 不在首次读取前启动 idle timer(避免与首字节超时职责重叠) - // idle timer 仅在首块数据到达后启动,用于检测流中途静默 + // 不在首次读取前启动 idle timer(避免与首字节超时职责重叠) + // idle timer 仅在首块数据到达后启动,用于检测流中途静默。 + // 客户端断开后例外:后台 drain 也会启动 idle timer,避免 pre-body + // 静默一直等到 60s drain 总上限。 const flushAndJoin = (): string => { const flushed = decoder.decode(); @@ -2768,7 +2774,7 @@ export class ProxyResponseHandler { const chunkSize = value.length; chunks.push(decoder.decode(value, { stream: true })); - // ⭐ 每次收到数据后重置静默期计时器(首次收到数据时启动) + // 每次收到数据后重置静默期计时器(首次收到数据时启动) startIdleTimer(); logger.trace("ResponseHandler: Idle timer reset (data received)", { taskId, @@ -2778,7 +2784,7 @@ export class ProxyResponseHandler { idleTimeoutMs: idleTimeoutMs === Infinity ? "disabled" : idleTimeoutMs, }); - // ⭐ 流式:读到第一块数据后立即清除响应超时定时器 + // 流式:读到第一块数据后立即清除响应超时定时器 if (isFirstChunk) { session.recordTtfb(); isFirstChunk = false; @@ -2797,7 +2803,7 @@ export class ProxyResponseHandler { } } - // ⭐ 流式读取完成:清除静默期计时器 + // 流式读取完成:清除静默期计时器 clearIdleTimer(); const allContent = flushAndJoin(); const clientAborted = session.clientAbortSignal?.aborted ?? false; @@ -2890,7 +2896,12 @@ export class ProxyResponseHandler { // 结算并消费 deferred meta,确保 provider chain/熔断归因完整 try { const allContent = flushAndJoin(); - await finalizeStream(allContent, false, false, "STREAM_IDLE_TIMEOUT"); + await finalizeStream( + allContent, + false, + clientAborted, + clientAborted ? "CLIENT_ABORTED" : "STREAM_IDLE_TIMEOUT" + ); } catch (finalizeError) { logger.error("ResponseHandler: Failed to finalize idle-timeout stream", { taskId, @@ -3028,7 +3039,7 @@ export class ProxyResponseHandler { // 确保资源释放 cleanupClientAbortListener(); clearClientAbortDrainTimer(); - clearIdleTimer(); // ⭐ 清除静默期计时器(防止泄漏) + clearIdleTimer(); // 清除静默期计时器(防止泄漏) try { reader.releaseLock(); } catch (releaseError) { diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts index 8f3efa29b..7ce1d7f77 100644 --- a/tests/unit/proxy/response-handler-client-abort-drain.test.ts +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -327,6 +327,65 @@ function createHangingResponsesSse(upstreamSignal: AbortSignal): Response { }); } +function createPreBodyHangingResponsesSse(upstreamSignal: AbortSignal): Response { + const stream = new ReadableStream({ + start(controller) { + upstreamSignal.addEventListener( + "abort", + () => { + const error = new Error("streaming_idle"); + error.name = "AbortError"; + controller.error(error); + }, + { once: true } + ); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function createActiveHangingResponsesSse(upstreamSignal: AbortSignal): Response { + const encoder = new TextEncoder(); + let index = 0; + let intervalId: ReturnType | null = null; + + const encodeChunk = (delta: string) => + encoder.encode( + `event: response.output_text.delta\ndata: ${JSON.stringify({ + type: "response.output_text.delta", + delta, + })}\n\n` + ); + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encodeChunk("短")); + intervalId = setInterval(() => { + controller.enqueue(encodeChunk(`持续-${++index}`)); + }, 4_000); + upstreamSignal.addEventListener( + "abort", + () => { + if (intervalId) clearInterval(intervalId); + const error = new Error("client_abort_drain_timeout"); + error.name = "AbortError"; + controller.error(error); + }, + { once: true } + ); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + function createCompletedThenErroredResponsesSse(): Response { const encoder = new TextEncoder(); const chunks = [ @@ -642,12 +701,157 @@ describe("ProxyResponseHandler stream client abort finalization", () => { ); }); - it("bounds client-abort drain when the upstream stream hangs", async () => { + it("keeps client-abort drain independent from a small idle timeout while chunks are active", async () => { + vi.useFakeTimers(); + try { + const clientController = new AbortController(); + const upstreamController = new AbortController(); + const session = createSession(clientController.signal); + session.provider.streamingIdleTimeoutMs = 5_000; + Object.assign(session, { responseController: upstreamController }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch( + session, + createActiveHangingResponsesSse(upstreamController.signal) + ); + clientController.abort(); + + await vi.advanceTimersByTimeAsync(59_000); + expect(upstreamController.signal.aborted).toBe(false); + + await vi.advanceTimersByTimeAsync(1_000); + const tasks = asyncTasks.splice(0, asyncTasks.length); + await Promise.allSettled(tasks); + + expect(upstreamController.signal.aborted).toBe(true); + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + } finally { + vi.useRealTimers(); + } + }); + + it("uses idle timeout for client-aborted streams that hang before the first chunk", async () => { + vi.useFakeTimers(); + try { + const clientController = new AbortController(); + const upstreamController = new AbortController(); + const session = createSession(clientController.signal); + session.provider.streamingIdleTimeoutMs = 5_000; + Object.assign(session, { responseController: upstreamController }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch( + session, + createPreBodyHangingResponsesSse(upstreamController.signal) + ); + clientController.abort(); + + await vi.advanceTimersByTimeAsync(4_999); + expect(upstreamController.signal.aborted).toBe(false); + + await vi.advanceTimersByTimeAsync(1); + const tasks = asyncTasks.splice(0, asyncTasks.length); + await Promise.allSettled(tasks); + + expect(upstreamController.signal.aborted).toBe(true); + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + } finally { + vi.useRealTimers(); + } + }); + + it("preserves an existing idle deadline when the client aborts after a chunk", async () => { + vi.useFakeTimers(); + try { + const clientController = new AbortController(); + const upstreamController = new AbortController(); + const session = createSession(clientController.signal); + session.provider.streamingIdleTimeoutMs = 5_000; + Object.assign(session, { responseController: upstreamController }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch( + session, + createHangingResponsesSse(upstreamController.signal) + ); + await vi.advanceTimersByTimeAsync(0); + await vi.advanceTimersByTimeAsync(4_999); + expect(upstreamController.signal.aborted).toBe(false); + + clientController.abort(); + await vi.advanceTimersByTimeAsync(1); + const tasks = asyncTasks.splice(0, asyncTasks.length); + await Promise.allSettled(tasks); + + expect(upstreamController.signal.aborted).toBe(true); + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + } finally { + vi.useRealTimers(); + } + }); + + it("caps client-abort drain at 60s when the upstream stream hangs", async () => { vi.useFakeTimers(); try { const clientController = new AbortController(); const upstreamController = new AbortController(); const session = createSession(clientController.signal); + session.provider.streamingIdleTimeoutMs = 120_000; Object.assign(session, { responseController: upstreamController }); setDeferredStreamingFinalization(session, { providerId: 1,