Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { existsSync, mkdirSync, writeFileSync } from 'fs';
import { execSync } from 'child_process';
import { log } from './config.js';
import { extractImages } from './image.js';
import { grpcFrame, grpcUnary, grpcStream } from './grpc.js';
import { closeSessionForPort, grpcFrame, grpcUnary, grpcStream } from './grpc.js';
import { getLsEntryByPort } from './langserver.js';
import {
buildRawGetChatMessageRequest, parseRawResponse,
Expand All @@ -29,6 +29,28 @@ import {

const LS_SERVICE = '/exa.language_server_pb.LanguageServerService';

// Heuristic classifier: does this error look like a Cascade HTTP/2 /
// language-server transport failure (canceled stream, reset connection,
// closed session, missing panel state) rather than a model-level error?
// Accepts an Error, a string, or anything stringifiable; null/undefined
// are never classified as transport errors.
export function isCascadeTransportError(err) {
  const pattern = /pending stream has been canceled|ECONNRESET|ERR_HTTP2|session closed|stream closed|panel state/i;
  const text = String(err?.message || err || '');
  return pattern.test(text);
}

// Tag an Error object so downstream retry/classification logic treats it as
// a transient Cascade transport failure (isModelError + transient_stall kind
// + explicit transport flag). Non-object values pass through unmodified.
// Returns its argument in both cases so callers can `throw mark(...)`.
function markCascadeTransportError(err) {
  const taggable = err !== null && err !== undefined && typeof err === 'object';
  if (!taggable) return err;
  Object.assign(err, {
    isModelError: true,
    kind: 'transient_stall',
    isCascadeTransportError: true,
  });
  return err;
}

// An HTTP/2 cancellation during Cascade warmup means the current LS session
// is unreliable: close the pooled gRPC session for this port and clear the
// reuse markers (workspaceInit, sessionId) so the next request establishes a
// fresh session from scratch. No-op on the LS entry if it is already gone.
function resetCascadeTransportState(port) {
  closeSessionForPort(port);
  const entry = getLsEntryByPort(port);
  if (entry) {
    entry.workspaceInit = null;
    entry.sessionId = null;
  }
}

function isImageLikeBlock(part) {
const type = String(part?.type || '').toLowerCase();
return type === 'image' || type === 'image_url' || type === 'input_image'
Expand Down Expand Up @@ -300,23 +322,30 @@ export class WindsurfClient {
const workspacePath = `/home/user/projects/workspace-${wsId}`;
const workspaceUri = `file://${workspacePath}`;

// Shared warmup failure handler. Every failure is logged with its stage name;
// transport-level failures (per isCascadeTransportError) additionally reset
// the pooled LS session state and rethrow a tagged Error so the warmup chain
// aborts instead of continuing against a dead session.
const handleWarmupError = (stage, err) => {
log.warn(`${stage}: ${err.message}`);
// Non-transport failures stay best-effort: warn and let warmup continue.
if (!isCascadeTransportError(err)) return;
resetCascadeTransportState(this.port);
// Fresh Error keeps the stage prefix; markCascadeTransportError tags it
// (isModelError / kind=transient_stall) for downstream retry classification.
throw markCascadeTransportError(new Error(`${stage}: ${err.message}`));
};

lsEntry.workspaceInit = (async () => {
try {
const initProto = buildInitializePanelStateRequest(this.apiKey, sessionId);
await grpcUnary(this.port, this.csrfToken,
`${LS_SERVICE}/InitializeCascadePanelState`, grpcFrame(initProto), 5000);
} catch (e) { log.warn(`InitializeCascadePanelState: ${e.message}`); }
} catch (e) { handleWarmupError('InitializeCascadePanelState', e); }
try {
ensureWorkspaceDir(workspacePath);
const addWsProto = buildAddTrackedWorkspaceRequest(workspacePath);
await grpcUnary(this.port, this.csrfToken,
`${LS_SERVICE}/AddTrackedWorkspace`, grpcFrame(addWsProto), 5000);
} catch (e) { log.warn(`AddTrackedWorkspace: ${e.message}`); }
} catch (e) { handleWarmupError('AddTrackedWorkspace', e); }
try {
const trustProto = buildUpdateWorkspaceTrustRequest(this.apiKey, workspaceUri, true, sessionId);
await grpcUnary(this.port, this.csrfToken,
`${LS_SERVICE}/UpdateWorkspaceTrust`, grpcFrame(trustProto), 5000);
} catch (e) { log.warn(`UpdateWorkspaceTrust: ${e.message}`); }
} catch (e) { handleWarmupError('UpdateWorkspaceTrust', e); }
log.info(`Cascade workspace init complete for LS port=${this.port}`);
})().catch(e => {
lsEntry.workspaceInit = null;
Expand Down Expand Up @@ -349,7 +378,7 @@ export class WindsurfClient {
// One-shot per-LS workspace init (idempotent; typically pre-warmed at
// LS startup). Falls back to a local session id if the LS entry is gone.
const lsEntry = getLsEntryByPort(this.port);
await this.warmupCascade().catch(() => {});
await this.warmupCascade();
let sessionId = reuseEntry?.sessionId || lsEntry?.sessionId || randomUUID();

// "panel state not found" means the LS forgot the panel for our sessionId
Expand Down Expand Up @@ -379,7 +408,7 @@ export class WindsurfClient {
} catch (e) {
if (!isPanelMissing(e)) throw e;
log.warn(`Panel state missing, re-warming LS port=${this.port}`);
await this.warmupCascade(true).catch(() => {});
await this.warmupCascade(true);
sessionId = getLsEntryByPort(this.port)?.sessionId || randomUUID();
reuseEntry = null; // cascade expired — treat as fresh
cascadeId = await openCascade();
Expand Down Expand Up @@ -531,7 +560,12 @@ export class WindsurfClient {
await rebuildFullHistoryText();
historyRebuilt = true;
}
await this.warmupCascade(true).catch(err => log.warn(`warmupCascade failed: ${err.message}`));
try {
await this.warmupCascade(true);
} catch (err) {
if (isCascadeTransportError(err)) throw err;
log.warn(`warmupCascade failed: ${err.message}`);
}
// Small backoff — LS panel state sometimes needs a moment after Init
if (panelRetry > 1) await new Promise(r => setTimeout(r, 250 * panelRetry));
sessionId = getLsEntryByPort(this.port)?.sessionId || randomUUID();
Expand Down Expand Up @@ -894,6 +928,10 @@ export class WindsurfClient {
return chunks;

} catch (err) {
if (isCascadeTransportError(err)) {
resetCascadeTransportState(this.port);
markCascadeTransportError(err);
}
onError?.(err);
throw err;
}
Expand Down
67 changes: 35 additions & 32 deletions src/handlers/chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { createHash, randomUUID } from 'crypto';
import { WindsurfClient } from '../client.js';
import { WindsurfClient, isCascadeTransportError } from '../client.js';
import { getApiKey, acquireAccountByKey, releaseAccount, getAccountAvailability, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js';
import { resolveModel, getModelInfo } from '../models.js';
import { getLsFor, ensureLs } from '../langserver.js';
Expand Down Expand Up @@ -43,8 +43,15 @@ async function internalErrorBackoff(retryIdx) {
return ms;
}

function upstreamTransientErrorMessage(model, triedCount) {
return `${model} 上游 Windsurf Cascade 服务瞬态故障:已在 ${triedCount} 个账号上重试都收到 internal_error。这是上游服务端的瞬时问题,反代无法绕过,建议 30-60 秒后重试。`;
// Build the user-facing 502 message emitted when every tried account hit the
// same upstream transient failure. `reason` selects the detail wording:
// 'cascade_transport' describes an HTTP/2 stream cancellation; any other
// value (default 'internal_error') yields the generic internal-error text.
function upstreamTransientErrorMessage(model, triedCount, reason = 'internal_error') {
  let detail;
  if (reason === 'cascade_transport') {
    detail = 'Cascade/语言服务器 HTTP/2 流被取消';
  } else {
    detail = 'internal_error';
  }
  return `${model} 上游 Windsurf Cascade 服务瞬态故障:已在 ${triedCount} 个账号上重试都收到 ${detail}。这是上游或本地语言服务器会话的瞬时问题,建议 30-60 秒后重试;若连续出现,请重启语言服务器。`;
}

// True when `err` should be surfaced as an upstream transient failure.
// Callers that already detected an internal_error response pass
// isInternal=true; otherwise the decision honors the transient_stall tag
// set by markCascadeTransportError, or the transport-message heuristic
// from client.js. Falsy errors are never transient.
export function isUpstreamTransientError(err, isInternal = false) {
  if (!err) return false;
  if (isInternal) return true;
  return err.kind === 'transient_stall' || isCascadeTransportError(err);
}

function shortHash(text) {
Expand Down Expand Up @@ -831,14 +838,11 @@ export async function handleChatCompletions(body, context = {}) {
log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`);
continue;
}
// Upstream Cascade transient (internal error occurred): back off
// before falling over so the next account doesn't hit the same hot
// upstream window. Counts toward internalCount for final-error
// classification below.
if (errType === 'upstream_internal_error') {
// Cascade transient 错误通常是上游或本地 LS 短暂抖动,先退避再切账号,避免连续打爆同一热窗口。
if (errType === 'upstream_internal_error' || errType === 'upstream_transient_error') {
internalCount++;
const backoffMs = await internalErrorBackoff(internalCount - 1);
log.warn(`Chat[${reqId}]: ${acct.email} upstream internal_error, waited ${backoffMs}ms before next account`);
log.warn(`Chat[${reqId}]: ${acct.email} upstream transient error, waited ${backoffMs}ms before next account`);
continue;
}
// Model not available on this account (permission_denied, etc.)
Expand All @@ -854,21 +858,17 @@ export async function handleChatCompletions(body, context = {}) {
if (acct) releaseAccount(acct.apiKey);
}
}
// If every attempt (or every-attempt-since-the-last-success) hit
// upstream_internal_error, the proxy's account-rotation can't fix it —
// it's a Cascade server-side transient. Surface a clean message that
// tells the caller "this is upstream, retry shortly" instead of the
// misleading "rate limit" / "model not available" they'd otherwise
// see from lastErr.
// 所有账号都遇到 Cascade transient 时,账号轮换已经无法修复;返回明确错误,避免误报成限流或模型不可用。
if (internalCount > 0 && tried.length > 0 && internalCount >= tried.length) {
if (checkedOutReuseEntry && fpBefore) {
poolCheckin(fpBefore, checkedOutReuseEntry, callerKey);
log.info(`Chat[${reqId}]: restored checked-out cascade after all-internal-error chain`);
}
log.error(`Chat[${reqId}]: ${tried.length}/${tried.length} accounts hit upstream_internal_error — surfacing upstream_transient_error`);
const lastIsTransport = isCascadeTransportError(lastErr);
log.error(`Chat[${reqId}]: ${tried.length}/${tried.length} accounts hit upstream transient error — surfacing upstream_transient_error`);
return {
status: 502,
body: { error: { message: upstreamTransientErrorMessage(displayModel, tried.length), type: 'upstream_transient_error' } },
body: { error: { message: upstreamTransientErrorMessage(displayModel, tried.length, lastIsTransport ? 'cascade_transport' : 'internal_error'), type: 'upstream_transient_error' } },
};
}
// If all accounts exhausted, check if it's because they're all rate-limited
Expand Down Expand Up @@ -1008,9 +1008,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
const isInternal = /internal error occurred.*error id/i.test(err.message);
const isTransport = isCascadeTransportError(err);
const isTransient = isUpstreamTransientError(err, isInternal);
if (isAuthFail) reportError(apiKey);
if (isRateLimit) { markRateLimited(apiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; }
if (isInternal) { reportInternalError(apiKey); err.isModelError = true; err.kind ||= 'transient_stall'; }
if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; }
if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) {
updateCapability(apiKey, modelKey, false, 'model_error');
}
Expand Down Expand Up @@ -1046,10 +1049,12 @@ async function nonStreamResponse(client, id, created, model, modelKey, messages,
}
}
return {
status: isInternal ? 502 : (err.isModelError ? 403 : 502),
status: isTransient ? 502 : (err.isModelError ? 403 : 502),
body: { error: {
message: sanitizeText(err.message),
type: isInternal ? 'upstream_internal_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'),
message: isTransient
? upstreamTransientErrorMessage(model, 1, isTransport ? 'cascade_transport' : 'internal_error')
: sanitizeText(err.message),
type: isTransient ? 'upstream_transient_error' : (err.isModelError ? 'model_not_available' : 'upstream_error'),
} },
};
}
Expand Down Expand Up @@ -1405,9 +1410,12 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages,
const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
const isInternal = /internal error occurred.*error id/i.test(err.message);
const isTransport = isCascadeTransportError(err);
const isTransient = isUpstreamTransientError(err, isInternal);
if (isAuthFail) reportError(currentApiKey);
if (isRateLimit) { markRateLimited(currentApiKey, rateLimitCooldownMs(err.message), modelKey); err.isRateLimit = true; err.isModelError = true; err.kind ||= 'model_error'; }
if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; err.kind ||= 'transient_stall'; }
if (isTransport) { err.isModelError = true; err.kind ||= 'transient_stall'; }
if (err.isModelError && err.kind !== 'transient_stall' && !isRateLimit && !isInternal) {
updateCapability(currentApiKey, modelKey, false, 'model_error');
}
Expand All @@ -1417,11 +1425,11 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages,
}
// Retry only if nothing has been streamed yet AND it's a retryable error
if (!hadSuccess && (err.isModelError || isRateLimit)) {
const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error';
if (isInternal) {
const tag = isRateLimit ? 'rate_limit' : isTransient ? 'upstream_transient' : 'model_error';
if (isTransient) {
streamInternalCount++;
const backoffMs = await internalErrorBackoff(streamInternalCount - 1);
log.warn(`Chat[${reqId}] stream: ${acct.email} upstream internal_error, waited ${backoffMs}ms before next account`);
log.warn(`Chat[${reqId}] stream: ${acct.email} upstream transient error (${isTransport ? 'cascade_transport' : 'internal_error'}), waited ${backoffMs}ms before next account`);
} else {
log.warn(`Account ${acct.email} failed (${tag}) on ${model}, trying next`);
}
Expand All @@ -1443,20 +1451,15 @@ function streamResponse(id, created, model, modelKey, messages, cascadeMessages,
try {
const rl = isAllRateLimited(modelKey);
const allInternal = streamInternalCount > 0 && tried.length > 0 && streamInternalCount >= tried.length;
// Prioritize upstream_transient over all-rate-limited the same way
// the non-stream path does (chat.js: handleChatCompletions). When
// both flags happen to fire (account got marked rate-limited mid-
// burst while internal_errors were also accumulating), the more
// specific diagnosis is "upstream is throwing internal_error" —
// the rate-limit message would send the caller chasing the wrong
// thing. Audit catch by codex 5.5.
// 优先暴露 upstream_transient,避免把 Cascade transport 抖动误报成账号限流。
const lastIsTransport = isCascadeTransportError(lastErr);
const errMsg = allInternal
? upstreamTransientErrorMessage(model, tried.length)
? upstreamTransientErrorMessage(model, tried.length, lastIsTransport ? 'cascade_transport' : 'internal_error')
: rl.allLimited
? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`
: sanitizeText(lastErr?.message || 'no accounts');
if (allInternal) {
log.error(`Chat[${reqId}] stream: ${tried.length}/${tried.length} accounts hit upstream_internal_error — surfacing upstream_transient_error`);
log.error(`Chat[${reqId}] stream: ${tried.length}/${tried.length} accounts hit upstream transient error — surfacing upstream_transient_error`);
}
if (!hadSuccess && checkedOutReuseEntry && fpBefore) {
poolCheckin(fpBefore, checkedOutReuseEntry, callerKey);
Expand Down
29 changes: 28 additions & 1 deletion test/stream-error.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { chatStreamError, redactRequestLogText } from '../src/handlers/chat.js';
import { isCascadeTransportError } from '../src/client.js';
import { chatStreamError, isUpstreamTransientError, redactRequestLogText } from '../src/handlers/chat.js';
import { handleMessages } from '../src/handlers/messages.js';

function parseEvents(raw) {
Expand Down Expand Up @@ -39,6 +40,13 @@ describe('stream error protocol', () => {
});
});

it('classifies Cascade HTTP/2 cancellation as upstream transient', () => {
const err = new Error('The pending stream has been canceled (caused by: )');
assert.equal(isCascadeTransportError(err), true);
assert.equal(isUpstreamTransientError(err), true);
assert.equal(isUpstreamTransientError(new Error('permission_denied: model unavailable')), false);
});

it('redacts common secret patterns before debug request-body logging', () => {
const redacted = redactRequestLogText('sk-1234567890abcdefghijklmnop test@example.com Cookie: session=abc eyJabc.def.ghi AKIAABCDEFGHIJKLMNOP');
assert.doesNotMatch(redacted, /sk-1234567890/);
Expand Down Expand Up @@ -66,4 +74,23 @@ describe('stream error protocol', () => {
assert.equal(events[0].event, 'error');
assert.equal(events[0].data.error.message, 'boom');
});

it('preserves upstream_transient_error in Anthropic stream errors', async () => {
const result = await handleMessages({ model: 'claude-sonnet-4.6', stream: true, messages: [{ role: 'user', content: 'hi' }] }, {
async handleChatCompletions() {
return {
status: 200,
stream: true,
async handler(res) {
res.end(`data: ${JSON.stringify(chatStreamError('cascade transport canceled', 'upstream_transient_error'))}\n\n`);
},
};
},
});
const res = fakeRes();
await result.handler(res);
const events = parseEvents(res.body);
assert.equal(events[0].event, 'error');
assert.equal(events[0].data.error.type, 'upstream_transient_error');
});
});
Loading