-
-
Notifications
You must be signed in to change notification settings - Fork 358
fix(proxy): cap client abort drain window #1277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e62e3ff
c3f2bad
b19373f
3d1e139
b2aeba0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,8 @@ import { | |
| peekDeferredStreamingFinalization, | ||
| } from "./stream-finalization"; | ||
|
|
||
| const CLIENT_ABORT_DRAIN_MAX_MS = 60_000; | ||
|
|
||
| /** | ||
| * Idempotent helper to release the agent pool reference count attached to a session. | ||
| * Prevents double-release by clearing the callback after first invocation. | ||
|
|
@@ -2300,7 +2302,7 @@ export class ProxyResponseHandler { | |
| } | ||
| } | ||
|
|
||
| // ⭐ 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流 | ||
| // 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流 | ||
| // 这解决了 tee() 后 internalStream abort 不影响 clientStream 的问题 | ||
| let streamController: TransformStreamDefaultController<Uint8Array> | null = null; | ||
| const controllableStream = processedStream.pipeThrough( | ||
|
|
@@ -2322,17 +2324,76 @@ export class ProxyResponseHandler { | |
| const abortController = new AbortController(); | ||
| const idleTimeoutMs = | ||
| provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; | ||
| const clientAbortDrainTimeoutMs = idleTimeoutMs === Infinity ? 60_000 : idleTimeoutMs; | ||
| const clientAbortDrainTimeoutMs = CLIENT_ABORT_DRAIN_MAX_MS; | ||
|
|
||
| // ⭐ 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 | ||
| // 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 | ||
| let idleTimeoutId: NodeJS.Timeout | null = null; | ||
| let clientAbortDrainTimeoutId: NodeJS.Timeout | null = null; | ||
| const chunks: string[] = []; | ||
| const clearClientAbortDrainTimer = () => { | ||
| if (clientAbortDrainTimeoutId) { | ||
| clearTimeout(clientAbortDrainTimeoutId); | ||
| clientAbortDrainTimeoutId = null; | ||
| } | ||
| }; | ||
| const clearIdleTimer = () => { | ||
| if (idleTimeoutId) { | ||
| clearTimeout(idleTimeoutId); | ||
| idleTimeoutId = null; | ||
| } | ||
| }; | ||
| const startIdleTimer = () => { | ||
| if (idleTimeoutMs === Infinity) return; // 禁用时跳过 | ||
| clearIdleTimer(); // 清除旧的 | ||
| idleTimeoutId = setTimeout(() => { | ||
| logger.warn("ResponseHandler: Streaming idle timeout triggered", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| idleTimeoutMs, | ||
| chunksCollected: chunks.length, | ||
| }); | ||
|
|
||
| // 1. 关闭客户端流(让客户端收到连接关闭通知,避免悬挂) | ||
| try { | ||
| if (streamController) { | ||
| streamController.error(new Error("Streaming idle timeout")); | ||
| logger.debug("ResponseHandler: Client stream closed due to idle timeout", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| }); | ||
| } | ||
| } catch (e) { | ||
| logger.warn("ResponseHandler: Failed to close client stream", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| error: e, | ||
| }); | ||
| } | ||
|
|
||
| // 2. 终止上游连接(避免资源泄漏) | ||
| try { | ||
| const sessionWithController = session as typeof session & { | ||
| responseController?: AbortController; | ||
| }; | ||
| if (sessionWithController.responseController) { | ||
| sessionWithController.responseController.abort(new Error("streaming_idle")); | ||
| logger.debug("ResponseHandler: Upstream connection aborted due to idle timeout", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| }); | ||
| } | ||
| } catch (e) { | ||
| logger.warn("ResponseHandler: Failed to abort upstream connection", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| error: e, | ||
| }); | ||
| } | ||
|
|
||
| // 3. 终止后台读取任务 | ||
| abortController.abort(new Error("streaming_idle")); | ||
| }, idleTimeoutMs); | ||
| }; | ||
| const cleanupClientAbortListener = bindClientAbortListener(session.clientAbortSignal, () => { | ||
| logger.debug("ResponseHandler: Client disconnected, cleaning up", { | ||
| taskId, | ||
|
|
@@ -2344,6 +2405,9 @@ export class ProxyResponseHandler { | |
| // still drain buffered final usage and record the request as successful. | ||
| // Idle/response timeout paths still abort via abortController. | ||
| clearClientAbortDrainTimer(); | ||
| if (!idleTimeoutId) { | ||
| startIdleTimer(); | ||
|
Comment on lines
+2408
to
+2409
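There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When the downstream aborts before the first upstream chunk and … [the remainder of this review comment was truncated during extraction; judging by the author's reply below, it concerned the idle watchdog firing during the post-abort drain and the request then being finalized as `STREAM_IDLE_TIMEOUT` instead of `CLIENT_ABORTED`.]
Useful? React with 👍 / 👎.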
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed in b2aeba0. The idle-timeout catch path now preserves `clientAborted` when the idle watchdog fires during a post-abort drain, so those records continue through the 499 `CLIENT_ABORTED` finalization path, while idle timeouts that are not client-aborted remain `STREAM_IDLE_TIMEOUT`. |
||
| } | ||
| clientAbortDrainTimeoutId = setTimeout(() => { | ||
| logger.info("ResponseHandler: Client abort drain window exceeded", { | ||
| taskId, | ||
|
|
@@ -2375,71 +2439,13 @@ export class ProxyResponseHandler { | |
| // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容: | ||
| // - 用于解析 usage/cost 与内部结算(例如“假 200”检测) | ||
| // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。 | ||
| const chunks: string[] = []; | ||
| let usageForCost: UsageMetrics | null = null; | ||
| let isFirstChunk = true; // ⭐ 标记是否为第一块数据 | ||
| let isFirstChunk = true; // 标记是否为第一块数据 | ||
|
|
||
| const startIdleTimer = () => { | ||
| if (idleTimeoutMs === Infinity) return; // 禁用时跳过 | ||
| clearIdleTimer(); // 清除旧的 | ||
| idleTimeoutId = setTimeout(() => { | ||
| logger.warn("ResponseHandler: Streaming idle timeout triggered", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| idleTimeoutMs, | ||
| chunksCollected: chunks.length, | ||
| }); | ||
|
|
||
| // ⭐ 1. 关闭客户端流(让客户端收到连接关闭通知,避免悬挂) | ||
| try { | ||
| if (streamController) { | ||
| streamController.error(new Error("Streaming idle timeout")); | ||
| logger.debug("ResponseHandler: Client stream closed due to idle timeout", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| }); | ||
| } | ||
| } catch (e) { | ||
| logger.warn("ResponseHandler: Failed to close client stream", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| error: e, | ||
| }); | ||
| } | ||
|
|
||
| // ⭐ 2. 终止上游连接(避免资源泄漏) | ||
| try { | ||
| const sessionWithController = session as typeof session & { | ||
| responseController?: AbortController; | ||
| }; | ||
| if (sessionWithController.responseController) { | ||
| sessionWithController.responseController.abort(new Error("streaming_idle")); | ||
| logger.debug("ResponseHandler: Upstream connection aborted due to idle timeout", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| }); | ||
| } | ||
| } catch (e) { | ||
| logger.warn("ResponseHandler: Failed to abort upstream connection", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| error: e, | ||
| }); | ||
| } | ||
|
|
||
| // ⭐ 3. 终止后台读取任务 | ||
| abortController.abort(new Error("streaming_idle")); | ||
| }, idleTimeoutMs); | ||
| }; | ||
| const clearIdleTimer = () => { | ||
| if (idleTimeoutId) { | ||
| clearTimeout(idleTimeoutId); | ||
| idleTimeoutId = null; | ||
| } | ||
| }; | ||
|
|
||
| // ⭐ 不在首次读取前启动 idle timer(避免与首字节超时职责重叠) | ||
| // idle timer 仅在首块数据到达后启动,用于检测流中途静默 | ||
| // 不在首次读取前启动 idle timer(避免与首字节超时职责重叠) | ||
| // idle timer 仅在首块数据到达后启动,用于检测流中途静默。 | ||
| // 客户端断开后例外:后台 drain 也会启动 idle timer,避免 pre-body | ||
| // 静默一直等到 60s drain 总上限。 | ||
|
|
||
| const flushAndJoin = (): string => { | ||
| const flushed = decoder.decode(); | ||
|
|
@@ -2768,7 +2774,7 @@ export class ProxyResponseHandler { | |
| const chunkSize = value.length; | ||
| chunks.push(decoder.decode(value, { stream: true })); | ||
|
|
||
| // ⭐ 每次收到数据后重置静默期计时器(首次收到数据时启动) | ||
| // 每次收到数据后重置静默期计时器(首次收到数据时启动) | ||
| startIdleTimer(); | ||
| logger.trace("ResponseHandler: Idle timer reset (data received)", { | ||
| taskId, | ||
|
|
@@ -2778,7 +2784,7 @@ export class ProxyResponseHandler { | |
| idleTimeoutMs: idleTimeoutMs === Infinity ? "disabled" : idleTimeoutMs, | ||
| }); | ||
|
|
||
| // ⭐ 流式:读到第一块数据后立即清除响应超时定时器 | ||
| // 流式:读到第一块数据后立即清除响应超时定时器 | ||
| if (isFirstChunk) { | ||
| session.recordTtfb(); | ||
| isFirstChunk = false; | ||
|
|
@@ -2797,7 +2803,7 @@ export class ProxyResponseHandler { | |
| } | ||
| } | ||
|
|
||
| // ⭐ 流式读取完成:清除静默期计时器 | ||
| // 流式读取完成:清除静默期计时器 | ||
| clearIdleTimer(); | ||
| const allContent = flushAndJoin(); | ||
| const clientAborted = session.clientAbortSignal?.aborted ?? false; | ||
|
|
@@ -2890,7 +2896,12 @@ export class ProxyResponseHandler { | |
| // 结算并消费 deferred meta,确保 provider chain/熔断归因完整 | ||
| try { | ||
| const allContent = flushAndJoin(); | ||
| await finalizeStream(allContent, false, false, "STREAM_IDLE_TIMEOUT"); | ||
| await finalizeStream( | ||
| allContent, | ||
| false, | ||
| clientAborted, | ||
| clientAborted ? "CLIENT_ABORTED" : "STREAM_IDLE_TIMEOUT" | ||
| ); | ||
| } catch (finalizeError) { | ||
| logger.error("ResponseHandler: Failed to finalize idle-timeout stream", { | ||
| taskId, | ||
|
|
@@ -3028,7 +3039,7 @@ export class ProxyResponseHandler { | |
| // 确保资源释放 | ||
| cleanupClientAbortListener(); | ||
| clearClientAbortDrainTimer(); | ||
| clearIdleTimer(); // ⭐ 清除静默期计时器(防止泄漏) | ||
| clearIdleTimer(); // 清除静默期计时器(防止泄漏) | ||
| try { | ||
| reader.releaseLock(); | ||
| } catch (releaseError) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a provider configures a short `streamingIdleTimeoutMs` and the downstream disconnects after response headers but before any body chunk has started the idle timer, this unconditional 60s drain window keeps the upstream stream, accumulated state, and abort listeners alive much longer than the provider's configured timeout (previously, a 5s idle timeout would drain for only 5s). Since the change is meant to cap long drain windows rather than extend short ones, this should use the smaller of the configured idle timeout and the 60s cap.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handled with a slightly different split: the total client-abort drain cap stays fixed at 60s, and client abort now also arms the existing idle watchdog. That preserves the 60s total drain window for active streams while still letting a short `streamingIdleTimeoutMs` release pre-body hangs before the first chunk arrives. Added coverage for both cases: active chunks with a 5s idle timeout are allowed to drain until the 60s cap, and pre-body hangs with a 5s idle timeout are aborted by the idle watchdog.