Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 82 additions & 71 deletions src/app/v1/_lib/proxy/response-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ import {
peekDeferredStreamingFinalization,
} from "./stream-finalization";

const CLIENT_ABORT_DRAIN_MAX_MS = 60_000;

/**
* Idempotent helper to release the agent pool reference count attached to a session.
* Prevents double-release by clearing the callback after first invocation.
Expand Down Expand Up @@ -2300,7 +2302,7 @@ export class ProxyResponseHandler {
}
}

// 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流
// 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流
// 这解决了 tee() 后 internalStream abort 不影响 clientStream 的问题
let streamController: TransformStreamDefaultController<Uint8Array> | null = null;
const controllableStream = processedStream.pipeThrough(
Expand All @@ -2322,17 +2324,76 @@ export class ProxyResponseHandler {
const abortController = new AbortController();
const idleTimeoutMs =
provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity;
const clientAbortDrainTimeoutMs = idleTimeoutMs === Infinity ? 60_000 : idleTimeoutMs;
const clientAbortDrainTimeoutMs = CLIENT_ABORT_DRAIN_MAX_MS;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve shorter client-abort drain windows

When a provider configures a short streamingIdleTimeoutMs and the downstream disconnects after response headers but before any body chunk has started the idle timer, this unconditional 60s drain window keeps the upstream stream, accumulated state, and abort listeners alive much longer than the provider's configured timeout (previously 5s would drain for 5s). Since the change is meant to cap long drain windows rather than extend short ones, this should use the smaller of the configured idle timeout and the 60s cap.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled with a slightly different split: the total client-abort drain cap stays fixed at 60s, and client abort now also arms the existing idle watchdog. That preserves the 60s total drain window for active streams while still letting a short streamingIdleTimeoutMs release pre-body hangs before the first chunk arrives. Added coverage for both cases: active chunks with a 5s idle timeout are allowed to drain until the 60s cap, and pre-body hangs with a 5s idle timeout are aborted by the idle watchdog.


// 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除
// 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除
let idleTimeoutId: NodeJS.Timeout | null = null;
let clientAbortDrainTimeoutId: NodeJS.Timeout | null = null;
const chunks: string[] = [];
const clearClientAbortDrainTimer = () => {
if (clientAbortDrainTimeoutId) {
clearTimeout(clientAbortDrainTimeoutId);
clientAbortDrainTimeoutId = null;
}
};
const clearIdleTimer = () => {
if (idleTimeoutId) {
clearTimeout(idleTimeoutId);
idleTimeoutId = null;
}
};
const startIdleTimer = () => {
if (idleTimeoutMs === Infinity) return; // 禁用时跳过
clearIdleTimer(); // 清除旧的
idleTimeoutId = setTimeout(() => {
logger.warn("ResponseHandler: Streaming idle timeout triggered", {
taskId,
providerId: provider.id,
idleTimeoutMs,
chunksCollected: chunks.length,
});

// 1. 关闭客户端流(让客户端收到连接关闭通知,避免悬挂)
try {
if (streamController) {
streamController.error(new Error("Streaming idle timeout"));
logger.debug("ResponseHandler: Client stream closed due to idle timeout", {
taskId,
providerId: provider.id,
});
}
} catch (e) {
logger.warn("ResponseHandler: Failed to close client stream", {
taskId,
providerId: provider.id,
error: e,
});
}

// 2. 终止上游连接(避免资源泄漏)
try {
const sessionWithController = session as typeof session & {
responseController?: AbortController;
};
if (sessionWithController.responseController) {
sessionWithController.responseController.abort(new Error("streaming_idle"));
logger.debug("ResponseHandler: Upstream connection aborted due to idle timeout", {
taskId,
providerId: provider.id,
});
}
} catch (e) {
logger.warn("ResponseHandler: Failed to abort upstream connection", {
taskId,
providerId: provider.id,
error: e,
});
}

// 3. 终止后台读取任务
abortController.abort(new Error("streaming_idle"));
}, idleTimeoutMs);
};
const cleanupClientAbortListener = bindClientAbortListener(session.clientAbortSignal, () => {
logger.debug("ResponseHandler: Client disconnected, cleaning up", {
taskId,
Expand All @@ -2344,6 +2405,9 @@ export class ProxyResponseHandler {
// still drain buffered final usage and record the request as successful.
// Idle/response timeout paths still abort via abortController.
clearClientAbortDrainTimer();
if (!idleTimeoutId) {
startIdleTimer();
Comment on lines +2408 to +2409

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve client-abort status when idle drain fires

When the downstream aborts before the first upstream chunk and streamingIdleTimeoutMs is finite, this new call arms the normal idle watchdog during the client-abort drain. If that watchdog fires, the existing isIdleTimeout catch branch runs before the client-abort branch and calls finalizeStream(..., false, false, "STREAM_IDLE_TIMEOUT"), so an already-disconnected client is persisted and accounted as a provider/idle failure (502 / STREAM_IDLE_TIMEOUT) instead of the intended 499 CLIENT_ABORTED path; the new pre-body test even expects the 499 behavior. Please preserve the client-aborted classification when this post-abort idle timer terminates the drain.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b2aeba0. The idle-timeout catch path now preserves clientAborted when the idle watchdog fires during a post-abort drain, so those records continue through the 499 CLIENT_ABORTED finalization path while non-client-aborted idle timeouts remain STREAM_IDLE_TIMEOUT.

}
clientAbortDrainTimeoutId = setTimeout(() => {
logger.info("ResponseHandler: Client abort drain window exceeded", {
taskId,
Expand Down Expand Up @@ -2375,71 +2439,13 @@ export class ProxyResponseHandler {
// 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容:
// - 用于解析 usage/cost 与内部结算(例如“假 200”检测)
// 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。
const chunks: string[] = [];
let usageForCost: UsageMetrics | null = null;
let isFirstChunk = true; // 标记是否为第一块数据
let isFirstChunk = true; // 标记是否为第一块数据

const startIdleTimer = () => {
if (idleTimeoutMs === Infinity) return; // 禁用时跳过
clearIdleTimer(); // 清除旧的
idleTimeoutId = setTimeout(() => {
logger.warn("ResponseHandler: Streaming idle timeout triggered", {
taskId,
providerId: provider.id,
idleTimeoutMs,
chunksCollected: chunks.length,
});

// ⭐ 1. 关闭客户端流(让客户端收到连接关闭通知,避免悬挂)
try {
if (streamController) {
streamController.error(new Error("Streaming idle timeout"));
logger.debug("ResponseHandler: Client stream closed due to idle timeout", {
taskId,
providerId: provider.id,
});
}
} catch (e) {
logger.warn("ResponseHandler: Failed to close client stream", {
taskId,
providerId: provider.id,
error: e,
});
}

// ⭐ 2. 终止上游连接(避免资源泄漏)
try {
const sessionWithController = session as typeof session & {
responseController?: AbortController;
};
if (sessionWithController.responseController) {
sessionWithController.responseController.abort(new Error("streaming_idle"));
logger.debug("ResponseHandler: Upstream connection aborted due to idle timeout", {
taskId,
providerId: provider.id,
});
}
} catch (e) {
logger.warn("ResponseHandler: Failed to abort upstream connection", {
taskId,
providerId: provider.id,
error: e,
});
}

// ⭐ 3. 终止后台读取任务
abortController.abort(new Error("streaming_idle"));
}, idleTimeoutMs);
};
const clearIdleTimer = () => {
if (idleTimeoutId) {
clearTimeout(idleTimeoutId);
idleTimeoutId = null;
}
};

// ⭐ 不在首次读取前启动 idle timer(避免与首字节超时职责重叠)
// idle timer 仅在首块数据到达后启动,用于检测流中途静默
// 不在首次读取前启动 idle timer(避免与首字节超时职责重叠)
// idle timer 仅在首块数据到达后启动,用于检测流中途静默。
// 客户端断开后例外:后台 drain 也会启动 idle timer,避免 pre-body
// 静默一直等到 60s drain 总上限。

const flushAndJoin = (): string => {
const flushed = decoder.decode();
Expand Down Expand Up @@ -2768,7 +2774,7 @@ export class ProxyResponseHandler {
const chunkSize = value.length;
chunks.push(decoder.decode(value, { stream: true }));

// 每次收到数据后重置静默期计时器(首次收到数据时启动)
// 每次收到数据后重置静默期计时器(首次收到数据时启动)
startIdleTimer();
logger.trace("ResponseHandler: Idle timer reset (data received)", {
taskId,
Expand All @@ -2778,7 +2784,7 @@ export class ProxyResponseHandler {
idleTimeoutMs: idleTimeoutMs === Infinity ? "disabled" : idleTimeoutMs,
});

// 流式:读到第一块数据后立即清除响应超时定时器
// 流式:读到第一块数据后立即清除响应超时定时器
if (isFirstChunk) {
session.recordTtfb();
isFirstChunk = false;
Expand All @@ -2797,7 +2803,7 @@ export class ProxyResponseHandler {
}
}

// 流式读取完成:清除静默期计时器
// 流式读取完成:清除静默期计时器
clearIdleTimer();
const allContent = flushAndJoin();
const clientAborted = session.clientAbortSignal?.aborted ?? false;
Expand Down Expand Up @@ -2890,7 +2896,12 @@ export class ProxyResponseHandler {
// 结算并消费 deferred meta,确保 provider chain/熔断归因完整
try {
const allContent = flushAndJoin();
await finalizeStream(allContent, false, false, "STREAM_IDLE_TIMEOUT");
await finalizeStream(
allContent,
false,
clientAborted,
clientAborted ? "CLIENT_ABORTED" : "STREAM_IDLE_TIMEOUT"
);
} catch (finalizeError) {
logger.error("ResponseHandler: Failed to finalize idle-timeout stream", {
taskId,
Expand Down Expand Up @@ -3028,7 +3039,7 @@ export class ProxyResponseHandler {
// 确保资源释放
cleanupClientAbortListener();
clearClientAbortDrainTimer();
clearIdleTimer(); // 清除静默期计时器(防止泄漏)
clearIdleTimer(); // 清除静默期计时器(防止泄漏)
try {
reader.releaseLock();
} catch (releaseError) {
Expand Down
Loading
Loading