From 05653b80ff8de411757a536179df6e58375099da Mon Sep 17 00:00:00 2001 From: Baily Date: Sat, 25 Apr 2026 16:15:31 -0400 Subject: [PATCH] fix: recover from canceled cascade panel transport --- src/client.js | 52 ++++++++++++++++++++++++++---- src/handlers/chat.js | 67 ++++++++++++++++++++------------------- test/stream-error.test.js | 29 ++++++++++++++++- 3 files changed, 108 insertions(+), 40 deletions(-) diff --git a/src/client.js b/src/client.js index 7c5e903..db32612 100644 --- a/src/client.js +++ b/src/client.js @@ -12,7 +12,7 @@ import { existsSync, mkdirSync, writeFileSync } from 'fs'; import { execSync } from 'child_process'; import { log } from './config.js'; import { extractImages } from './image.js'; -import { grpcFrame, grpcUnary, grpcStream } from './grpc.js'; +import { closeSessionForPort, grpcFrame, grpcUnary, grpcStream } from './grpc.js'; import { getLsEntryByPort } from './langserver.js'; import { buildRawGetChatMessageRequest, parseRawResponse, @@ -29,6 +29,28 @@ import { const LS_SERVICE = '/exa.language_server_pb.LanguageServerService'; +export function isCascadeTransportError(err) { + const msg = String(err?.message || err || ''); + return /pending stream has been canceled|ECONNRESET|ERR_HTTP2|session closed|stream closed|panel state/i.test(msg); +} + +function markCascadeTransportError(err) { + if (!err || typeof err !== 'object') return err; + err.isModelError = true; + err.kind = 'transient_stall'; + err.isCascadeTransportError = true; + return err; +} + +function resetCascadeTransportState(port) { + // Cascade warmup 的 HTTP/2 取消代表当前 LS 会话不可靠,清掉复用状态后让下一次请求重新建会话。 + closeSessionForPort(port); + const lsEntry = getLsEntryByPort(port); + if (!lsEntry) return; + lsEntry.workspaceInit = null; + lsEntry.sessionId = null; +} + function isImageLikeBlock(part) { const type = String(part?.type || '').toLowerCase(); return type === 'image' || type === 'image_url' || type === 'input_image' @@ -300,23 +322,30 @@ export class WindsurfClient { const workspacePath = `/home/user/projects/workspace-${wsId}`; const workspaceUri = `file://${workspacePath}`; + const handleWarmupError = (stage, err) => { + log.warn(`${stage}: ${err.message}`); + if (!isCascadeTransportError(err)) return; + resetCascadeTransportState(this.port); + throw markCascadeTransportError(new Error(`${stage}: ${err.message}`)); + }; + lsEntry.workspaceInit = (async () => { try { const initProto = buildInitializePanelStateRequest(this.apiKey, sessionId); await grpcUnary(this.port, this.csrfToken, `${LS_SERVICE}/InitializeCascadePanelState`, grpcFrame(initProto), 5000); - } catch (e) { log.warn(`InitializeCascadePanelState: ${e.message}`); } + } catch (e) { handleWarmupError('InitializeCascadePanelState', e); } try { ensureWorkspaceDir(workspacePath); const addWsProto = buildAddTrackedWorkspaceRequest(workspacePath); await grpcUnary(this.port, this.csrfToken, `${LS_SERVICE}/AddTrackedWorkspace`, grpcFrame(addWsProto), 5000); - } catch (e) { log.warn(`AddTrackedWorkspace: ${e.message}`); } + } catch (e) { handleWarmupError('AddTrackedWorkspace', e); } try { const trustProto = buildUpdateWorkspaceTrustRequest(this.apiKey, workspaceUri, true, sessionId); await grpcUnary(this.port, this.csrfToken, `${LS_SERVICE}/UpdateWorkspaceTrust`, grpcFrame(trustProto), 5000); - } catch (e) { log.warn(`UpdateWorkspaceTrust: ${e.message}`); } + } catch (e) { handleWarmupError('UpdateWorkspaceTrust', e); } log.info(`Cascade workspace init complete for LS port=${this.port}`); })().catch(e => { lsEntry.workspaceInit = null; @@ -349,7 +378,7 @@ export class WindsurfClient { // One-shot per-LS workspace init (idempotent; typically pre-warmed at // LS startup). Falls back to a local session id if the LS entry is gone. const lsEntry = getLsEntryByPort(this.port); - await this.warmupCascade().catch(() => {}); + await this.warmupCascade(); let sessionId = reuseEntry?.sessionId || lsEntry?.sessionId || randomUUID(); // "panel state not found" means the LS forgot the panel for our sessionId @@ -379,7 +408,7 @@ export class WindsurfClient { } catch (e) { if (!isPanelMissing(e)) throw e; log.warn(`Panel state missing, re-warming LS port=${this.port}`); - await this.warmupCascade(true).catch(() => {}); + await this.warmupCascade(true); sessionId = getLsEntryByPort(this.port)?.sessionId || randomUUID(); reuseEntry = null; // cascade expired — treat as fresh cascadeId = await openCascade(); @@ -531,7 +560,12 @@ export class WindsurfClient { await rebuildFullHistoryText(); historyRebuilt = true; } - await this.warmupCascade(true).catch(err => log.warn(`warmupCascade failed: ${err.message}`)); + try { + await this.warmupCascade(true); + } catch (err) { + if (isCascadeTransportError(err)) throw err; + log.warn(`warmupCascade failed: ${err.message}`); + } // Small backoff — LS panel state sometimes needs a moment after Init if (panelRetry > 1) await new Promise(r => setTimeout(r, 250 * panelRetry)); sessionId = getLsEntryByPort(this.port)?.sessionId || randomUUID(); @@ -894,6 +928,10 @@ export class WindsurfClient { return chunks; } catch (err) { + if (isCascadeTransportError(err)) { + resetCascadeTransportState(this.port); + markCascadeTransportError(err); + } onError?.(err); throw err; } diff --git a/src/handlers/chat.js b/src/handlers/chat.js index 121c437..a4810cb 100644 --- a/src/handlers/chat.js +++ b/src/handlers/chat.js @@ -4,7 +4,7 @@ */ import { createHash, randomUUID } from 'crypto'; -import { WindsurfClient } from '../client.js'; +import { WindsurfClient, isCascadeTransportError } from '../client.js'; import { getApiKey, acquireAccountByKey, releaseAccount, getAccountAvailability, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js'; import { resolveModel, getModelInfo } from '../models.js'; import { getLsFor, ensureLs } from '../langserver.js'; @@ -43,8 +43,15 @@ async function internalErrorBackoff(retryIdx) { return ms; } -function upstreamTransientErrorMessage(model, triedCount) { - return `${model} 上游 Windsurf Cascade 服务瞬态故障:已在 ${triedCount} 个账号上重试都收到 internal_error。这是上游服务端的瞬时问题,反代无法绕过,建议 30-60 秒后重试。`; +function upstreamTransientErrorMessage(model, triedCount, reason = 'internal_error') { + const detail = reason === 'cascade_transport' + ? 'Cascade/语言服务器 HTTP/2 流被取消' + : 'internal_error'; + return `${model} 上游 Windsurf Cascade 服务瞬态故障:已在 ${triedCount} 个账号上重试都收到 ${detail}。这是上游或本地语言服务器会话的瞬时问题,建议 30-60 秒后重试;若连续出现,请重启语言服务器。`; +} + +export function isUpstreamTransientError(err, isInternal = false) { + return !!err && (isInternal || err.kind === 'transient_stall' || isCascadeTransportError(err)); } function shortHash(text) { @@ -831,14 +838,11 @@ export async function handleChatCompletions(body, context = {}) { log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`); continue; } - // Upstream Cascade transient (internal error occurred): back off - // before falling over so the next account doesn't hit the same hot - // upstream window. Counts toward internalCount for final-error - // classification below. - if (errType === 'upstream_internal_error') { + // Cascade transient 错误通常是上游或本地 LS 短暂抖动,先退避再切账号,避免连续打爆同一热窗口。 + if (errType === 'upstream_internal_error' || errType === 'upstream_transient_error') { internalCount++; const backoffMs = await internalErrorBackoff(internalCount - 1); - log.warn(`Chat[${reqId}]: ${acct.email} upstream internal_error, waited ${backoffMs}ms before next account`); + log.warn(`Chat[${reqId}]: ${acct.email} upstream transient error, waited ${backoffMs}ms before next account`); continue; } // Model not available on this account (permission_denied, etc.) @@ -854,21 +858,17 @@ export async function handleChatCompletions(body, context = {}) { if (acct) releaseAccount(acct.apiKey); } } - // If every attempt (or every-attempt-since-the-last-success) hit - // upstream_internal_error, the proxy's account-rotation can't fix it — - // it's a Cascade server-side transient. Surface a clean message that - // tells the caller "this is upstream, retry shortly" instead of the - // misleading "rate limit" / "model not available" they'd otherwise - // see from lastErr. + // 所有账号都遇到 Cascade transient 时,账号轮换已经无法修复;返回明确错误,避免误报成限流或模型不可用。 if (internalCount > 0 && tried.length > 0 && internalCount >= tried.length) { if (checkedOutReuseEntry && fpBefore) { poolCheckin(fpBefore, checkedOutReuseEntry, callerKey); log.info(`Chat[${reqId}]: restored checked-out cascade after all-internal-error chain`); } - log.error(`Chat[${reqId}]: ${tried.length}/${tried.length} accounts hit upstream_internal_error — surfacing upstream_transient_error`); + const lastIsTransport = isCascadeTransportError(lastErr); + log.error(`Chat[${reqId}]: ${tried.length}/${tried.length} accounts hit upstream transient error — surfacing upstream_transient_error`); return { status: 502, - body: { error: { message: upstreamTransientErrorMessage(displayModel, tried.length), type: 'upstream_transient_error' } }, + body: { error: { message: upstreamTransientErrorMessage(displayModel, tried.length, lastIsTransport ? 'cascade_transport' : 'internal_error'), type: 'upstream_transient_error' } }, }; } // If all accounts exhausted, check if it's because they're all rate-limited @@ -1008,9 +1008,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message); const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); const isInternal = /internal error occurred.*error id/i.test(err.message); + const isTransport = isCascadeTransportError(err); + const isTransient = isUpstreamTransientError(err, isInternal); if (isAuthFail) reportError(apiKey); if (isRateLimit) { markRateLimited(apiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; } if (isInternal) { reportInternalError(apiKey); err.isModelError = true; err.kind ||= 'transient_stall'; } + if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; } if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) { updateCapability(apiKey, modelKey, false, 'model_error'); } @@ -1046,10 +1049,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages, } } return { - status: isInternal ? 502 : (err.isModelError ? 403 : 502), + status: isTransient ? 502 : (err.isModelError ? 403 : 502), body: { error: { - message: sanitizeText(err.message), - type: isInternal ? 'upstream_internal_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'), + message: isTransient + ? upstreamTransientErrorMessage(model, 1, isTransport ? 'cascade_transport' : 'internal_error') + : sanitizeText(err.message), + type: isTransient ? 'upstream_transient_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'), } }, }; } @@ -1405,9 +1410,12 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message); const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); const isInternal = /internal error occurred.*error id/i.test(err.message); + const isTransport = isCascadeTransportError(err); + const isTransient = isUpstreamTransientError(err, isInternal); if (isAuthFail) reportError(currentApiKey); if (isRateLimit) { markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; } if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; err.kind ||= 'transient_stall'; } + if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; } if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) { updateCapability(currentApiKey, modelKey, false, 'model_error'); } @@ -1417,11 +1425,11 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, } // Retry only if nothing has been streamed yet AND it's a retryable error if (!hadSuccess && (err.isModelError || isRateLimit)) { - const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error'; - if (isInternal) { + const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error'; + if (isTransient) { streamInternalCount++; const backoffMs = await internalErrorBackoff(streamInternalCount - 1); - log.warn(`Chat[${reqId}] stream: ${acct.email} upstream internal_error, waited ${backoffMs}ms before next account`); + log.warn(`Chat[${reqId}] stream: ${acct.email} upstream transient error (${isTransport ? 'cascade_transport' : 'internal_error'}), waited ${backoffMs}ms before next account`); } else { log.warn(`Account ${acct.email} failed (${tag}) on ${model}, trying next`); } @@ -1443,20 +1451,15 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages, try { const rl = isAllRateLimited(modelKey); const allInternal = streamInternalCount > 0 && tried.length > 0 && streamInternalCount >= tried.length; - // Prioritize upstream_transient over all-rate-limited the same way - // the non-stream path does (chat.js: handleChatCompletions). When - // both flags happen to fire (account got marked rate-limited mid- - // burst while internal_errors were also accumulating), the more - // specific diagnosis is "upstream is throwing internal_error" — - // the rate-limit message would send the caller chasing the wrong - // thing. Audit catch by codex 5.5. + // 优先暴露 upstream_transient,避免把 Cascade transport 抖动误报成账号限流。 + const lastIsTransport = isCascadeTransportError(lastErr); const errMsg = allInternal - ? upstreamTransientErrorMessage(model, tried.length) + ? upstreamTransientErrorMessage(model, tried.length, lastIsTransport ? 'cascade_transport' : 'internal_error') : rl.allLimited ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试` : sanitizeText(lastErr?.message || 'no accounts'); if (allInternal) { - log.error(`Chat[${reqId}] stream: ${tried.length}/${tried.length} accounts hit upstream_internal_error — surfacing upstream_transient_error`); + log.error(`Chat[${reqId}] stream: ${tried.length}/${tried.length} accounts hit upstream transient error — surfacing upstream_transient_error`); } if (!hadSuccess && checkedOutReuseEntry && fpBefore) { poolCheckin(fpBefore, checkedOutReuseEntry, callerKey); diff --git a/test/stream-error.test.js b/test/stream-error.test.js index 8ba02ab..4a79186 100644 --- a/test/stream-error.test.js +++ b/test/stream-error.test.js @@ -1,6 +1,7 @@ import { describe, it } from 'node:test'; import assert from 'node:assert/strict'; -import { chatStreamError, redactRequestLogText } from '../src/handlers/chat.js'; +import { isCascadeTransportError } from '../src/client.js'; +import { chatStreamError, isUpstreamTransientError, redactRequestLogText } from '../src/handlers/chat.js'; import { handleMessages } from '../src/handlers/messages.js'; function parseEvents(raw) { @@ -39,6 +40,13 @@ describe('stream error protocol', () => { }); }); + it('classifies Cascade HTTP/2 cancellation as upstream transient', () => { + const err = new Error('The pending stream has been canceled (caused by: )'); + assert.equal(isCascadeTransportError(err), true); + assert.equal(isUpstreamTransientError(err), true); + assert.equal(isUpstreamTransientError(new Error('permission_denied: model unavailable')), false); + }); + it('redacts common secret patterns before debug request-body logging', () => { const redacted = redactRequestLogText('sk-1234567890abcdefghijklmnop test@example.com Cookie: session=abc eyJabc.def.ghi AKIAABCDEFGHIJKLMNOP'); assert.doesNotMatch(redacted, /sk-1234567890/); @@ -66,4 +74,23 @@ describe('stream error protocol', () => { assert.equal(events[0].event, 'error'); assert.equal(events[0].data.error.message, 'boom'); }); + + it('preserves upstream_transient_error in Anthropic stream errors', async () => { + const result = await handleMessages({ model: 'claude-sonnet-4.6', stream: true, messages: [{ role: 'user', content: 'hi' }] }, { + async handleChatCompletions() { + return { + status: 200, + stream: true, + async handler(res) { + res.end(`data: ${JSON.stringify(chatStreamError('cascade transport canceled', 'upstream_transient_error'))}\n\n`); + }, + }; + }, + }); + const res = fakeRes(); + await result.handler(res); + const events = parseEvents(res.body); + assert.equal(events[0].event, 'error'); + assert.equal(events[0].data.error.type, 'upstream_transient_error'); + }); });