diff --git a/index.ts b/index.ts index e29c9179..d8eadd12 100644 --- a/index.ts +++ b/index.ts @@ -2075,16 +2075,16 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { } }; - let allRateLimitedRetries = 0; - let emptyResponseRetries = 0; - const attemptedUnsupportedFallbackModels = new Set(); + let allRateLimitedRetries = 0; + let emptyResponseRetries = 0; + const attemptedUnsupportedFallbackModels = new Set(); if (model) { attemptedUnsupportedFallbackModels.add(model); } while (true) { let accountCount = accountManager.getAccountCount(); - const attempted = new Set(); + const attempted = new Set(); let restartAccountTraversalWithFallback = false; while (attempted.size < Math.max(1, accountCount)) { @@ -2101,11 +2101,14 @@ while (attempted.size < Math.max(1, accountCount)) { quotaKey, explainability: selectionExplainability, }; - const account = accountManager.getCurrentOrNextForFamilyHybrid(modelFamily, model, { pidOffsetEnabled }); - if (!account || attempted.has(account.index)) { + const account = accountManager.getNextRequestEligibleForFamilyHybrid(modelFamily, model, { + attemptedAccountKeys: attempted, + pidOffsetEnabled, + }); + if (!account) { break; } - attempted.add(account.index); + attempted.add(accountManager.getRequestAttemptKey(account)); runtimeMetrics.lastSelectedAccountIndex = account.index; runtimeMetrics.lastQuotaKey = quotaKey; if (runtimeMetrics.lastSelectionSnapshot) { @@ -2256,7 +2259,6 @@ while (attempted.size < Math.max(1, accountCount)) { // Consume a token before making the request for proactive rate limiting const tokenConsumed = accountManager.consumeToken(account, modelFamily, model); if (!tokenConsumed) { - accountManager.recordRateLimit(account, modelFamily, model); runtimeMetrics.accountRotations++; runtimeMetrics.lastError = `Local token bucket depleted for account ${account.index + 1} (${modelFamily}${model ? `:${model}` : ""})`; @@ -2264,7 +2266,7 @@ while (attempted.size < Math.max(1, accountCount)) { logWarn( `Skipping account ${account.index + 1}: local token bucket depleted for ${modelFamily}${model ? `:${model}` : ""}`, ); - break; + continue; } while (true) { @@ -2633,10 +2635,12 @@ while (attempted.size < Math.max(1, accountCount)) { const waitMs = accountManager.getMinWaitTimeForFamily(modelFamily, model); const count = accountManager.getAccountCount(); + const hasFiniteWait = Number.isFinite(waitMs); if ( retryAllAccountsRateLimited && count > 0 && + hasFiniteWait && waitMs > 0 && (retryAllAccountsMaxWaitMs === 0 || waitMs <= retryAllAccountsMaxWaitMs) && @@ -2652,12 +2656,19 @@ while (attempted.size < Math.max(1, accountCount)) { continue; } - const waitLabel = waitMs > 0 ? formatWaitTime(waitMs) : "a bit"; + const waitLabel = + waitMs > 0 + ? hasFiniteWait + ? formatWaitTime(waitMs) + : "an indefinite wait" + : "a bit"; const message = count === 0 ? "No Codex accounts configured. Run `opencode auth login`." : waitMs > 0 - ? `All ${count} account(s) are rate-limited. Try again in ${waitLabel} or add another account with \`opencode auth login\`.` + ? hasFiniteWait + ? `All ${count} account(s) are rate-limited. Try again in ${waitLabel} or add another account with \`opencode auth login\`.` + : `All ${count} account(s) are rate-limited indefinitely. Re-enable token refill or add another account with \`opencode auth login\`.` : `All ${count} account(s) failed (server errors or auth issues). Check account health with \`codex-health\`.`; runtimeMetrics.failedRequests++; runtimeMetrics.lastError = message; diff --git a/lib/accounts.ts b/lib/accounts.ts index 286bc6f2..7f7a959d 100644 --- a/lib/accounts.ts +++ b/lib/accounts.ts @@ -1,6 +1,7 @@ import { existsSync, promises as fs } from "node:fs"; import { homedir } from "node:os"; import { join } from "node:path"; +import { randomUUID } from "node:crypto"; import type { Auth } from "@opencode-ai/sdk"; import { createLogger } from "./logger.js"; import { @@ -15,9 +16,11 @@ import { MODEL_FAMILIES, type ModelFamily } from "./prompts/codex.js"; import { getHealthTracker, getTokenTracker, + reindexTrackersAfterRemoval, selectHybridAccount, type AccountWithMetrics, type HybridSelectionOptions, + type RequestHybridSelectionOptions, } from "./rotation.js"; import { isRecord, nowMs } from "./utils.js"; import { decodeJWT } from "./auth/auth.js"; @@ -172,6 +175,7 @@ function initFamilyState(defaultValue: number): Record { export interface ManagedAccount { index: number; + immutableId: string; accountId?: string; organizationId?: string; accountIdSource?: AccountIdSource; @@ -206,6 +210,18 @@ export interface AccountSelectionExplainability { lastUsed: number; } +type AccountAvailabilitySnapshot = { + enabled: boolean; + rateLimited: boolean; + coolingDown: boolean; + tokensAvailable: number; + eligible: boolean; +}; + +function getRequestAttemptKey(account: Pick): string { + return account.immutableId; +} + export class AccountManager { private accounts: ManagedAccount[] = []; private cursorByFamily: Record = initFamilyState(0); @@ -298,6 +314,7 @@ export class AccountManager { return { index, + immutableId: randomUUID(), accountId: matchesFallback ? fallbackAccountId ?? account.accountId : account.accountId, organizationId: account.organizationId, accountIdSource: account.accountIdSource, @@ -334,6 +351,7 @@ export class AccountManager { const now = nowMs(); this.accounts.push({ index: this.accounts.length, + immutableId: randomUUID(), accountId: fallbackAccountId, organizationId: undefined, accountIdSource: fallbackAccountId ? "token" : undefined, @@ -367,6 +385,7 @@ export class AccountManager { this.accounts = [ { index: 0, + immutableId: randomUUID(), accountId: fallbackAccountId, organizationId: undefined, accountIdSource: fallbackAccountId ? "token" : undefined, @@ -411,12 +430,38 @@ export class AccountManager { })); } + getRequestAttemptKey(account: ManagedAccount): string { + return getRequestAttemptKey(account); + } + + private getAccountAvailabilitySnapshot( + account: ManagedAccount, + family: ModelFamily, + model?: string | null, + tokenTracker = getTokenTracker(), + ): AccountAvailabilitySnapshot { + clearExpiredRateLimits(account); + const enabled = account.enabled !== false; + const rateLimited = isRateLimitedForFamily(account, family, model); + const coolingDown = this.isAccountCoolingDown(account); + const quotaKey = getQuotaKey(family, model); + const tokensAvailable = tokenTracker.getTokens(account.index, quotaKey); + const eligible = enabled && !rateLimited && !coolingDown && tokensAvailable >= 1; + return { + enabled, + rateLimited, + coolingDown, + tokensAvailable, + eligible, + }; + } + getSelectionExplainability( family: ModelFamily, model?: string | null, now = nowMs(), ): AccountSelectionExplainability[] { - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const baseQuotaKey = getQuotaKey(family); const modelQuotaKey = model ? getQuotaKey(family, model) : null; const currentIndex = this.currentAccountIndexByFamily[family]; @@ -424,8 +469,12 @@ export class AccountManager { const tokenTracker = getTokenTracker(); return this.accounts.map((account) => { - clearExpiredRateLimits(account); - const enabled = account.enabled !== false; + const availability = this.getAccountAvailabilitySnapshot( + account, + family, + model, + tokenTracker, + ); const reasons: string[] = []; let rateLimitedUntil: number | undefined; const baseRateLimit = account.rateLimitResetTimes[baseQuotaKey]; @@ -446,7 +495,7 @@ export class AccountManager { ? account.coolingDownUntil : undefined; - if (!enabled) reasons.push("disabled"); + if (!availability.enabled) reasons.push("disabled"); if (rateLimitedUntil !== undefined) reasons.push("rate-limited"); if (coolingDownUntil !== undefined) { reasons.push( @@ -454,24 +503,17 @@ export class AccountManager { ); } - const tokensAvailable = tokenTracker.getTokens(account.index, quotaKey); - if (tokensAvailable < 1) reasons.push("token-bucket-empty"); - - const eligible = - enabled && - rateLimitedUntil === undefined && - coolingDownUntil === undefined && - tokensAvailable >= 1; + if (availability.tokensAvailable < 1) reasons.push("token-bucket-empty"); if (reasons.length === 0) reasons.push("eligible"); return { index: account.index, - enabled, + enabled: availability.enabled, isCurrentForFamily: currentIndex === account.index, - eligible, + eligible: availability.eligible, reasons, healthScore: healthTracker.getScore(account.index, quotaKey), - tokensAvailable, + tokensAvailable: availability.tokensAvailable, rateLimitedUntil, coolingDownUntil, cooldownReason: coolingDownUntil !== undefined ? account.cooldownReason : undefined, @@ -591,7 +633,7 @@ export class AccountManager { } } - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const healthTracker = getHealthTracker(); const tokenTracker = getTokenTracker(); @@ -622,14 +664,89 @@ export class AccountManager { return account; } + getNextRequestEligibleForFamilyHybrid( + family: ModelFamily, + model?: string | null, + options: RequestHybridSelectionOptions = {}, + ): ManagedAccount | null { + const count = this.accounts.length; + if (count === 0) return null; + + const attemptedAccountKeys = options.attemptedAccountKeys ?? new Set(); + const quotaKey = getQuotaKey(family, model); + const healthTracker = getHealthTracker(); + const tokenTracker = getTokenTracker(); + const currentIndex = this.currentAccountIndexByFamily[family]; + let alreadyCheckedCurrentIndex = -1; + + const currentAccount = + currentIndex >= 0 && currentIndex < count ? this.accounts[currentIndex] : undefined; + if ( + currentAccount && + !attemptedAccountKeys.has(getRequestAttemptKey(currentAccount)) + ) { + const availability = this.getAccountAvailabilitySnapshot( + currentAccount, + family, + model, + tokenTracker, + ); + if (availability.eligible) { + currentAccount.lastUsed = nowMs(); + return currentAccount; + } + alreadyCheckedCurrentIndex = currentIndex; + } + + const accountsWithMetrics: AccountWithMetrics[] = this.accounts + .map((account): AccountWithMetrics | null => { + if (!account) return null; + if (account.index === alreadyCheckedCurrentIndex) return null; + if (attemptedAccountKeys.has(getRequestAttemptKey(account))) return null; + const availability = this.getAccountAvailabilitySnapshot( + account, + family, + model, + tokenTracker, + ); + if (!availability.eligible) return null; + return { + index: account.index, + isAvailable: true, + lastUsed: account.lastUsed, + }; + }) + .filter((account): account is AccountWithMetrics => account !== null); + + // Every entry passed here is already request-eligible, so hybrid scoring only + // ranks eligible candidates and does not rely on the LRU unavailable fallback. + const selected = selectHybridAccount( + accountsWithMetrics, + healthTracker, + tokenTracker, + quotaKey, + {}, + { pidOffsetEnabled: options.pidOffsetEnabled }, + ); + if (!selected) return null; + + const account = this.accounts[selected.index]; + if (!account) return null; + + this.currentAccountIndexByFamily[family] = account.index; + this.cursorByFamily[family] = (account.index + 1) % count; + account.lastUsed = nowMs(); + return account; + } + recordSuccess(account: ManagedAccount, family: ModelFamily, model?: string | null): void { - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const healthTracker = getHealthTracker(); healthTracker.recordSuccess(account.index, quotaKey); } recordRateLimit(account: ManagedAccount, family: ModelFamily, model?: string | null): void { - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const healthTracker = getHealthTracker(); const tokenTracker = getTokenTracker(); healthTracker.recordRateLimit(account.index, quotaKey); @@ -637,13 +754,13 @@ export class AccountManager { } recordFailure(account: ManagedAccount, family: ModelFamily, model?: string | null): void { - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const healthTracker = getHealthTracker(); healthTracker.recordFailure(account.index, quotaKey); } consumeToken(account: ManagedAccount, family: ModelFamily, model?: string | null): boolean { - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const tokenTracker = getTokenTracker(); return tokenTracker.tryConsume(account.index, quotaKey); } @@ -654,7 +771,7 @@ export class AccountManager { * @returns true if refund was successful, false if no valid consumption found */ refundToken(account: ManagedAccount, family: ModelFamily, model?: string | null): boolean { - const quotaKey = model ? `${family}:${model}` : family; + const quotaKey = getQuotaKey(family, model); const tokenTracker = getTokenTracker(); return tokenTracker.refundToken(account.index, quotaKey); } @@ -791,10 +908,11 @@ export class AccountManager { getMinWaitTimeForFamily(family: ModelFamily, model?: string | null): number { const now = nowMs(); + const quotaKey = getQuotaKey(family, model); + const tokenTracker = getTokenTracker(); const enabledAccounts = this.accounts.filter((account) => account.enabled !== false); const available = enabledAccounts.filter((account) => { - clearExpiredRateLimits(account); - return !isRateLimitedForFamily(account, family, model) && !this.isAccountCoolingDown(account); + return this.getAccountAvailabilitySnapshot(account, family, model, tokenTracker).eligible; }); if (available.length > 0) return 0; if (enabledAccounts.length === 0) return 0; @@ -804,24 +922,37 @@ export class AccountManager { const modelKey = model ? getQuotaKey(family, model) : null; for (const account of enabledAccounts) { + const accountWaits: number[] = []; + // `available` above clears stale blocker windows first, so any waits collected + // here reflect only blockers that are still preventing this account from serving. const baseResetAt = account.rateLimitResetTimes[baseKey]; if (typeof baseResetAt === "number") { - waitTimes.push(Math.max(0, baseResetAt - now)); + accountWaits.push(Math.max(0, baseResetAt - now)); } if (modelKey) { const modelResetAt = account.rateLimitResetTimes[modelKey]; if (typeof modelResetAt === "number") { - waitTimes.push(Math.max(0, modelResetAt - now)); + accountWaits.push(Math.max(0, modelResetAt - now)); } } if (typeof account.coolingDownUntil === "number") { - waitTimes.push(Math.max(0, account.coolingDownUntil - now)); + accountWaits.push(Math.max(0, account.coolingDownUntil - now)); + } + + const tokenWaitMs = tokenTracker.getWaitTimeUntilTokenAvailable(account.index, quotaKey); + if (tokenWaitMs > 0) { + accountWaits.push(Number.isFinite(tokenWaitMs) ? tokenWaitMs : Number.POSITIVE_INFINITY); + } + + if (accountWaits.length > 0) { + waitTimes.push(Math.max(...accountWaits)); } } - return waitTimes.length > 0 ? Math.min(...waitTimes) : 0; + if (waitTimes.length === 0) return 0; + return Math.min(...waitTimes); } removeAccount(account: ManagedAccount): boolean { @@ -834,6 +965,14 @@ export class AccountManager { this.accounts.forEach((acc, index) => { acc.index = index; }); + if (this.lastToastAccountIndex === idx) { + this.lastToastAccountIndex = -1; + } else if (this.lastToastAccountIndex > idx) { + this.lastToastAccountIndex -= 1; + } + // Trackers are keyed by account index, so removals must reindex surviving entries + // to keep health and token history attached to the correct remaining account. + reindexTrackersAfterRemoval(idx); if (this.accounts.length === 0) { for (const family of MODEL_FAMILIES) { diff --git a/lib/rotation.ts b/lib/rotation.ts index 9b171636..258c2202 100644 --- a/lib/rotation.ts +++ b/lib/rotation.ts @@ -121,6 +121,20 @@ export class HealthScoreTracker { this.entries.delete(key); } + reindexAfterRemoval(removedIndex: number): void { + const nextEntries = new Map(); + for (const [key, entry] of this.entries) { + const [indexText, ...quotaParts] = key.split(":"); + const index = Number.parseInt(indexText ?? "", 10); + if (!Number.isFinite(index)) continue; + if (index === removedIndex) continue; + const nextIndex = index > removedIndex ? index - 1 : index; + const nextKey = quotaParts.length > 0 ? `${nextIndex}:${quotaParts.join(":")}` : `${nextIndex}`; + nextEntries.set(nextKey, entry); + } + this.entries = nextEntries; + } + clear(): void { this.entries.clear(); } @@ -180,6 +194,18 @@ export class TokenBucketTracker { return this.refillTokens(entry); } + getWaitTimeUntilTokenAvailable(accountIndex: number, quotaKey?: string): number { + const key = this.getKey(accountIndex, quotaKey); + const entry = this.buckets.get(key); + const currentTokens = entry ? this.refillTokens(entry) : this.config.maxTokens; + if (currentTokens >= 1) return 0; + if (this.config.tokensPerMinute <= 0) return Number.POSITIVE_INFINITY; + + const tokensNeeded = 1 - currentTokens; + const minutesUntilAvailable = tokensNeeded / this.config.tokensPerMinute; + return Math.max(0, Math.ceil(minutesUntilAvailable * 60_000)); + } + /** * Attempt to consume a token. Returns true if successful, false if bucket is empty. */ @@ -255,6 +281,20 @@ export class TokenBucketTracker { this.buckets.delete(key); } + reindexAfterRemoval(removedIndex: number): void { + const nextBuckets = new Map(); + for (const [key, entry] of this.buckets) { + const [indexText, ...quotaParts] = key.split(":"); + const index = Number.parseInt(indexText ?? "", 10); + if (!Number.isFinite(index)) continue; + if (index === removedIndex) continue; + const nextIndex = index > removedIndex ? index - 1 : index; + const nextKey = quotaParts.length > 0 ? `${nextIndex}:${quotaParts.join(":")}` : `${nextIndex}`; + nextBuckets.set(nextKey, entry); + } + this.buckets = nextBuckets; + } + clear(): void { this.buckets.clear(); } @@ -299,6 +339,11 @@ export interface HybridSelectionOptions { pidOffsetEnabled?: boolean; } +export interface RequestHybridSelectionOptions { + attemptedAccountKeys?: ReadonlySet; + pidOffsetEnabled?: boolean; +} + export function selectHybridAccount( accounts: AccountWithMetrics[], healthTracker: HealthScoreTracker, @@ -437,3 +482,8 @@ export function resetTrackers(): void { healthTrackerInstance?.clear(); tokenTrackerInstance?.clear(); } + +export function reindexTrackersAfterRemoval(removedIndex: number): void { + healthTrackerInstance?.reindexAfterRemoval(removedIndex); + tokenTrackerInstance?.reindexAfterRemoval(removedIndex); +} diff --git a/test/accounts.test.ts b/test/accounts.test.ts index 68e6da9c..46921716 100644 --- a/test/accounts.test.ts +++ b/test/accounts.test.ts @@ -894,6 +894,14 @@ describe("AccountManager", () => { }); describe("getMinWaitTimeForFamily", () => { + beforeEach(() => { + resetTrackers(); + }); + + afterEach(() => { + resetTrackers(); + }); + it("returns 0 when accounts are available", () => { const now = Date.now(); const stored = { @@ -955,6 +963,92 @@ describe("AccountManager", () => { expect(waitTime).toBeGreaterThan(0); expect(waitTime).toBeLessThanOrEqual(45000); }); + + it("considers local token bucket recovery time", () => { + const now = Date.now(); + const stored = { + version: 3 as const, + activeIndex: 0, + activeIndexByFamily: { codex: 0 }, + accounts: [ + { + refreshToken: "token-1", + addedAt: now, + lastUsed: now, + }, + ], + }; + + const manager = new AccountManager(undefined, stored as never); + getTokenTracker().drain(0, "codex", 50); + + const waitTime = manager.getMinWaitTimeForFamily("codex"); + expect(waitTime).toBeGreaterThan(0); + expect(waitTime).toBeLessThanOrEqual(10000); + }); + + it("uses the earliest account availability when blockers differ per account", () => { + const now = Date.now(); + const stored = { + version: 3 as const, + activeIndex: 0, + activeIndexByFamily: { codex: 0 }, + accounts: [ + { + refreshToken: "token-1", + addedAt: now, + lastUsed: now, + rateLimitResetTimes: { codex: now + 60000 }, + }, + { + refreshToken: "token-2", + addedAt: now, + lastUsed: now, + coolingDownUntil: now + 30000, + cooldownReason: "manual", + }, + ], + }; + + const manager = new AccountManager(undefined, stored as never); + getTokenTracker().drain(0, "codex", 50); + + const waitTime = manager.getMinWaitTimeForFamily("codex"); + expect(waitTime).toBeGreaterThan(20000); + expect(waitTime).toBeLessThanOrEqual(30000); + }); + + it("returns Infinity when token refill is disabled", () => { + const now = Date.now(); + const stored = { + version: 3 as const, + activeIndex: 0, + activeIndexByFamily: { codex: 0 }, + accounts: [ + { + refreshToken: "token-1", + addedAt: now, + lastUsed: now, + }, + ], + }; + + const manager = new AccountManager(undefined, stored as never); + const tracker = getTokenTracker() as unknown as { + config: { tokensPerMinute: number }; + drain: (accountIndex: number, quotaKey?: string, drainAmount?: number) => void; + }; + const originalTokensPerMinute = tracker.config.tokensPerMinute; + try { + tracker.config.tokensPerMinute = 0; + tracker.drain(0, "codex", 50); + + const waitTime = manager.getMinWaitTimeForFamily("codex"); + expect(waitTime).toBe(Number.POSITIVE_INFINITY); + } finally { + tracker.config.tokensPerMinute = originalTokensPerMinute; + } + }); }); describe("updateFromAuth", () => { @@ -2043,6 +2137,66 @@ describe("AccountManager", () => { expect(selected?.index).toBe(0); }); + it("request selector skips attempted current account and picks the next eligible account", () => { + const now = Date.now(); + const stored = { + version: 3 as const, + activeIndex: 0, + activeIndexByFamily: { codex: 0 }, + accounts: [ + { refreshToken: "token-1", addedAt: now, lastUsed: now }, + { refreshToken: "token-2", addedAt: now, lastUsed: now - 10000 }, + ], + }; + + const manager = new AccountManager(undefined, stored as never); + const selected = manager.getNextRequestEligibleForFamilyHybrid("codex", undefined, { + attemptedAccountKeys: new Set([manager.getRequestAttemptKey(manager.getAccountsSnapshot()[0]!)]), + }); + + expect(selected).not.toBeNull(); + expect(selected?.index).toBe(1); + }); + + it("request selector treats token-bucket-empty current account as ineligible", () => { + const now = Date.now(); + const stored = { + version: 3 as const, + activeIndex: 0, + activeIndexByFamily: { codex: 0 }, + accounts: [ + { refreshToken: "token-1", addedAt: now, lastUsed: now }, + { refreshToken: "token-2", addedAt: now, lastUsed: now - 10000 }, + ], + }; + + const manager = new AccountManager(undefined, stored as never); + getTokenTracker().drain(0, "codex", 50); + + const selected = manager.getNextRequestEligibleForFamilyHybrid("codex"); + expect(selected).not.toBeNull(); + expect(selected?.index).toBe(1); + }); + + it("request selector returns null instead of falling back to unavailable accounts", () => { + const now = Date.now(); + const stored = { + version: 3 as const, + activeIndex: 0, + activeIndexByFamily: { codex: 0 }, + accounts: [ + { refreshToken: "token-1", addedAt: now, lastUsed: now }, + ], + }; + + const manager = new AccountManager(undefined, stored as never); + const account0 = manager.setActiveIndex(0)!; + manager.markRateLimited(account0, 60000, "codex"); + + const selected = manager.getNextRequestEligibleForFamilyHybrid("codex"); + expect(selected).toBeNull(); + }); + it("reports selection explainability with eligibility reasons", () => { const now = Date.now(); const stored = { diff --git a/test/index-retry.test.ts b/test/index-retry.test.ts index e4268e6c..0dbc134c 100644 --- a/test/index-retry.test.ts +++ b/test/index-retry.test.ts @@ -55,6 +55,19 @@ vi.mock("../lib/accounts.js", () => { return this.getCurrentOrNextForFamily(); } + getNextRequestEligibleForFamilyHybrid( + _family?: unknown, + _model?: unknown, + options?: { attemptedAccountKeys?: ReadonlySet }, + ) { + if (options?.attemptedAccountKeys?.has("mock::0")) return null; + return this.getCurrentOrNextForFamily(); + } + + getRequestAttemptKey() { + return "mock::0"; + } + getSelectionExplainability() { return []; } diff --git a/test/index.test.ts b/test/index.test.ts index daf55c6c..ba86bc40 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -301,6 +301,22 @@ vi.mock("../lib/accounts.js", () => { return this.accounts[0] ?? null; } + getNextRequestEligibleForFamilyHybrid( + _family?: unknown, + _model?: unknown, + options?: { attemptedAccountKeys?: ReadonlySet }, + ) { + const account = this.accounts[0]; + if (!account || options?.attemptedAccountKeys?.has(`mock::${account.index}`)) { + return null; + } + return account; + } + + getRequestAttemptKey(account: { index: number }) { + return `mock::${account.index}`; + } + getSelectionExplainability() { return this.accounts.map((account, index) => ({ index, @@ -2011,6 +2027,83 @@ describe("OpenAIOAuthPlugin fetch handler", () => { return { plugin, sdk, mockClient }; }; + const createRequestAwareManager = ( + accounts: Array<{ + index: number; + accountId?: string; + email?: string; + refreshToken: string; + }>, + overrides?: { + select?: ( + attemptedAccountKeys: ReadonlySet, + model?: string | null, + ) => (typeof accounts)[number] | null; + consumeToken?: ( + account: (typeof accounts)[number], + family: string, + model?: string | null, + ) => boolean; + getMinWaitTimeForFamily?: (family: string, model?: string | null) => number; + }, + ) => ({ + getAccountCount: () => accounts.length, + getCurrentOrNextForFamilyHybrid: () => accounts[0] ?? null, + getNextRequestEligibleForFamilyHybrid: ( + _family: string, + model?: string | null, + options?: { attemptedAccountKeys?: ReadonlySet }, + ) => { + if (overrides?.select) { + return overrides.select(options?.attemptedAccountKeys ?? new Set(), model); + } + return ( + accounts.find((account) => !(options?.attemptedAccountKeys?.has(`mock::${account.index}`) ?? false)) ?? null + ); + }, + getRequestAttemptKey: (account: { index: number }) => `mock::${account.index}`, + getSelectionExplainability: () => + accounts.map((account, index) => ({ + index, + enabled: true, + isCurrentForFamily: index === 0, + eligible: true, + reasons: ["eligible"], + healthScore: 100, + tokensAvailable: 50, + lastUsed: Date.now(), + })), + toAuthDetails: (account: { accountId?: string }) => ({ + type: "oauth" as const, + access: `access-${account.accountId ?? "unknown"}`, + refresh: `refresh-${account.accountId ?? "unknown"}`, + expires: Date.now() + 60_000, + }), + hasRefreshToken: () => true, + saveToDiskDebounced: () => {}, + updateFromAuth: () => {}, + clearAuthFailures: () => {}, + incrementAuthFailures: () => 1, + markAccountCoolingDown: () => {}, + markAccountsWithRefreshTokenCoolingDown: () => 1, + markRateLimitedWithReason: () => {}, + recordRateLimit: () => {}, + consumeToken: (account: (typeof accounts)[number], family: string, model?: string | null) => + overrides?.consumeToken?.(account, family, model) ?? true, + refundToken: () => {}, + markSwitched: () => {}, + removeAccount: () => {}, + removeAccountsWithSameRefreshToken: () => 1, + recordFailure: () => {}, + recordSuccess: () => {}, + getMinWaitTimeForFamily: (family: string, model?: string | null) => + overrides?.getMinWaitTimeForFamily?.(family, model) ?? 0, + shouldShowAccountToast: () => false, + markToastShown: () => {}, + setActiveIndex: (index: number) => accounts[index] ?? null, + getAccountsSnapshot: () => accounts, + }); + it("returns success response for successful fetch", async () => { globalThis.fetch = vi.fn().mockResolvedValue( new Response(JSON.stringify({ content: "test" }), { status: 200 }), @@ -2026,7 +2119,22 @@ describe("OpenAIOAuthPlugin fetch handler", () => { }); it("handles network errors and rotates to next account", async () => { - globalThis.fetch = vi.fn().mockRejectedValue(new Error("Network timeout")); + const { AccountManager } = await import("../lib/accounts.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const accounts = [ + { index: 0, accountId: "acc-1", email: "user1@example.com", refreshToken: "refresh-1" }, + { index: 1, accountId: "acc-2", email: "user2@example.com", refreshToken: "refresh-2" }, + ]; + const customManager = createRequestAwareManager(accounts); + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(customManager as never); + vi.mocked(fetchHelpers.createCodexHeaders).mockImplementation( + (_init, _accountId, accessToken) => + new Headers({ authorization: `Bearer ${String(accessToken)}` }), + ); + globalThis.fetch = vi + .fn() + .mockRejectedValueOnce(new Error("Network timeout")) + .mockResolvedValueOnce(new Response(JSON.stringify({ content: "ok" }), { status: 200 })); const { sdk } = await setupPlugin(); const response = await sdk.fetch!("https://api.openai.com/v1/chat", { @@ -2034,8 +2142,15 @@ describe("OpenAIOAuthPlugin fetch handler", () => { body: JSON.stringify({ model: "gpt-5.1" }), }); - expect(response.status).toBe(503); - expect(await response.text()).toContain("server errors or auth issues"); + expect(response.status).toBe(200); + expect(globalThis.fetch).toHaveBeenCalledTimes(2); + const accessTokens = vi.mocked(globalThis.fetch).mock.calls.map((call) => + new Headers((call[1] as RequestInit | undefined)?.headers).get("authorization"), + ); + expect(accessTokens).toEqual([ + "Bearer access-acc-1", + "Bearer access-acc-2", + ]); }); it("cools down the account when grouped auth removal removes zero entries", async () => { @@ -2079,11 +2194,25 @@ describe("OpenAIOAuthPlugin fetch handler", () => { ); }); - it("skips fetch when local token bucket is depleted", async () => { + it("uses the next eligible account when consumeToken loses the race after selection", async () => { const { AccountManager } = await import("../lib/accounts.js"); - const consumeSpy = vi.spyOn(AccountManager.prototype, "consumeToken").mockReturnValue(false); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const accounts = [ + { index: 0, accountId: "acc-1", email: "user1@example.com", refreshToken: "refresh-1" }, + { index: 1, accountId: "acc-2", email: "user2@example.com", refreshToken: "refresh-2" }, + ]; + const customManager = createRequestAwareManager(accounts, { + consumeToken: (account) => account.index !== 0, + }); + const recordRateLimitSpy = vi.fn(); + customManager.recordRateLimit = recordRateLimitSpy; + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(customManager as never); + vi.mocked(fetchHelpers.createCodexHeaders).mockImplementation( + (_init, _accountId, accessToken) => + new Headers({ authorization: `Bearer ${String(accessToken)}` }), + ); globalThis.fetch = vi.fn().mockResolvedValue( - new Response(JSON.stringify({ content: "should-not-be-returned" }), { status: 200 }), + new Response(JSON.stringify({ content: "used-second-account" }), { status: 200 }), ); const { sdk } = await setupPlugin(); @@ -2092,10 +2221,134 @@ describe("OpenAIOAuthPlugin fetch handler", () => { body: JSON.stringify({ model: "gpt-5.1" }), }); + expect(globalThis.fetch).toHaveBeenCalledTimes(1); + expect(response.status).toBe(200); + const authorization = new Headers( + (vi.mocked(globalThis.fetch).mock.calls[0]?.[1] as RequestInit | undefined)?.headers, + ).get("authorization"); + expect(authorization).toBe("Bearer access-acc-2"); + expect(recordRateLimitSpy).not.toHaveBeenCalled(); + }); + + it("returns 429 without sleeping when all account waits are indefinite", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const accounts = [ + { index: 0, accountId: "acc-1", email: "user1@example.com", refreshToken: "refresh-1" }, + ]; + const customManager = createRequestAwareManager(accounts, { + select: () => null, + getMinWaitTimeForFamily: () => Number.POSITIVE_INFINITY, + }); + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(customManager as never); + globalThis.fetch = vi.fn(); + + const { sdk } = await setupPlugin(); + const response = await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.1" }), + }); + + expect(response.status).toBe(429); + await expect(response.text()).resolves.toContain("rate-limited indefinitely"); expect(globalThis.fetch).not.toHaveBeenCalled(); - expect(response.status).toBe(503); - expect(await response.text()).toContain("server errors or auth issues"); - consumeSpy.mockRestore(); + }); + + it("rotates to the next account after a 5xx response", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const accounts = [ + { index: 0, accountId: "acc-1", email: "user1@example.com", refreshToken: "refresh-1" }, + { index: 1, accountId: "acc-2", email: "user2@example.com", refreshToken: "refresh-2" }, + ]; + const customManager = createRequestAwareManager(accounts); + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(customManager as never); + vi.mocked(fetchHelpers.createCodexHeaders).mockImplementation( + (_init, _accountId, accessToken) => + new Headers({ authorization: `Bearer ${String(accessToken)}` }), + ); + globalThis.fetch = vi + .fn() + .mockResolvedValueOnce(new Response("upstream bad", { status: 502 })) + .mockResolvedValueOnce(new Response(JSON.stringify({ content: "ok" }), { status: 200 })); + + const { sdk } = await setupPlugin(); + const response = await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.1" }), + }); + + expect(response.status).toBe(200); + expect(globalThis.fetch).toHaveBeenCalledTimes(2); + const accessTokens = vi.mocked(globalThis.fetch).mock.calls.map((call) => + new Headers((call[1] as RequestInit | undefined)?.headers).get("authorization"), + ); + expect(accessTokens).toEqual([ + "Bearer access-acc-1", + "Bearer access-acc-2", + ]); + }); + + it("rotates to the next account after an empty response retry", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const fetchHelpers = await import("../lib/request/fetch-helpers.js"); + const accounts = [ + { index: 0, accountId: "acc-1", email: "user1@example.com", refreshToken: "refresh-1" }, + { index: 1, accountId: "acc-2", email: "user2@example.com", refreshToken: "refresh-2" }, + ]; + const customManager = createRequestAwareManager(accounts); + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(customManager as never); + vi.mocked(fetchHelpers.createCodexHeaders).mockImplementation( + (_init, _accountId, accessToken) => + new Headers({ authorization: `Bearer ${String(accessToken)}` }), + ); + globalThis.fetch = vi + .fn() + .mockResolvedValueOnce( + new Response(JSON.stringify({ id: "resp-empty", object: "response" }), { status: 200 }), + ) + .mockResolvedValueOnce(new Response(JSON.stringify({ content: "ok" }), { status: 200 })); + + const { sdk } = await setupPlugin(); + const response = await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.1" }), + }); + + expect(response.status).toBe(200); + expect(globalThis.fetch).toHaveBeenCalledTimes(2); + const accessTokens = vi.mocked(globalThis.fetch).mock.calls.map((call) => + new Headers((call[1] as RequestInit | undefined)?.headers).get("authorization"), + ); + expect(accessTokens).toEqual([ + "Bearer access-acc-1", + "Bearer access-acc-2", + ]); + }); + + it("returns a retryable wait when every account is locally token-bucket depleted", async () => { + const { AccountManager } = await import("../lib/accounts.js"); + const configModule = await import("../lib/config.js"); + const accounts = [ + { index: 0, accountId: "acc-1", email: "user1@example.com", refreshToken: "refresh-1" }, + { index: 1, accountId: "acc-2", email: "user2@example.com", refreshToken: "refresh-2" }, + ]; + const customManager = createRequestAwareManager(accounts, { + consumeToken: () => false, + getMinWaitTimeForFamily: () => 12_000, + }); + vi.spyOn(AccountManager, "loadFromDisk").mockResolvedValueOnce(customManager as never); + vi.spyOn(configModule, "getRetryAllAccountsRateLimited").mockReturnValueOnce(false); + globalThis.fetch = vi.fn(); + + const { sdk } = await setupPlugin(); + const response = await sdk.fetch!("https://api.openai.com/v1/chat", { + method: "POST", + body: JSON.stringify({ model: "gpt-5.1" }), + }); + + expect(globalThis.fetch).not.toHaveBeenCalled(); + expect(response.status).toBe(429); + expect(await response.text()).toContain("Try again in 12s"); }); it("falls back from gpt-5.4-pro to gpt-5.4 when unsupported fallback is enabled", async () => { @@ -2312,6 +2565,25 @@ describe("OpenAIOAuthPlugin fetch handler", () => { } return null; }, + getNextRequestEligibleForFamilyHybrid: ( + family: string, + currentModel?: string, + options?: { attemptedAccountKeys?: ReadonlySet }, + ) => { + const attemptedAccountKeys = options?.attemptedAccountKeys ?? new Set(); + for (let remaining = 0; remaining < customManager.getAccountCount(); remaining++) { + const candidate = customManager.getCurrentOrNextForFamilyHybrid( + family, + currentModel, + ); + if (!candidate) return null; + if (!attemptedAccountKeys.has(`mock::${candidate.index}`)) { + return candidate; + } + } + return null; + }, + getRequestAttemptKey: (account: { index: number }) => `mock::${account.index}`, getSelectionExplainability: () => [ { index: 0, diff --git a/test/rotation-integration.test.ts b/test/rotation-integration.test.ts index c3309dd8..544a215c 100644 --- a/test/rotation-integration.test.ts +++ b/test/rotation-integration.test.ts @@ -3,6 +3,12 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { describe, it, expect, beforeAll, beforeEach, afterAll } from "vitest"; import { AccountManager } from "../lib/accounts.js"; +import { + getTokenTracker, + getHealthTracker, + DEFAULT_TOKEN_BUCKET_CONFIG, + DEFAULT_HEALTH_SCORE_CONFIG, +} from "../lib/rotation.js"; import { deduplicateAccounts, deduplicateAccountsByEmail, @@ -156,6 +162,44 @@ describe("Multi-Account Rotation Integration", () => { expect(account1?.index).toBe(1); }); + it("request selector skips attempted accounts and returns the next eligible account", () => { + const family: ModelFamily = "codex"; + const attemptedKey = manager.getRequestAttemptKey(manager.getAccountsSnapshot()[0]!); + + const selected = manager.getNextRequestEligibleForFamilyHybrid(family, undefined, { + attemptedAccountKeys: new Set([attemptedKey]), + }); + + expect(selected?.index).toBe(1); + }); + + it("clears token tracker state after account removal renumbers indices", () => { + getTokenTracker().drain(0, "codex", 50); + getHealthTracker().recordFailure(0, "codex"); + const firstAccount = manager.setActiveIndex(0); + expect(firstAccount).not.toBeNull(); + manager.removeAccount(firstAccount!); + expect(getTokenTracker().getTokens(0, "codex")).toBe(DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens); + expect(getHealthTracker().getScore(0, "codex")).toBe(DEFAULT_HEALTH_SCORE_CONFIG.maxScore); + }); + + it("keeps a held survivor reference aligned with reindexed token buckets after removal", () => { + const family: ModelFamily = "codex"; + const firstAccount = manager.getCurrentOrNextForFamily(family); + const survivor = manager.getCurrentOrNextForFamily(family); + + expect(firstAccount?.index).toBe(0); + expect(survivor?.index).toBe(1); + + getTokenTracker().drain(1, "codex", DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens - 1); + manager.removeAccount(firstAccount!); + + expect(survivor?.index).toBe(0); + expect(manager.consumeToken(survivor!, family)).toBe(true); + expect(getTokenTracker().getTokens(0, "codex")).toBe(0); + expect(getTokenTracker().getTokens(1, "codex")).toBe(DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens); + }); + it("returns null when all accounts are rate-limited", () => { const family: ModelFamily = "codex"; diff --git a/test/rotation.test.ts b/test/rotation.test.ts index 29ead08f..eff462dd 100644 --- a/test/rotation.test.ts +++ b/test/rotation.test.ts @@ -149,6 +149,20 @@ describe("HealthScoreTracker", () => { }); }); + describe("reindexAfterRemoval", () => { + it("preserves model-qualified keys for surviving health entries", () => { + tracker.recordFailure(0, "codex:gpt-5.1"); + tracker.recordRateLimit(1, "codex:gpt-5.1"); + + tracker.reindexAfterRemoval(0); + + expect(tracker.getScore(0, "codex:gpt-5.1")).toBe( + DEFAULT_HEALTH_SCORE_CONFIG.maxScore + DEFAULT_HEALTH_SCORE_CONFIG.rateLimitDelta, + ); + expect(tracker.getScore(1, "codex:gpt-5.1")).toBe(DEFAULT_HEALTH_SCORE_CONFIG.maxScore); + }); + }); + describe("quotaKey isolation", () => { it("isolates scores by quotaKey", () => { tracker.recordFailure(0, "quota-a"); @@ -183,6 +197,26 @@ describe("TokenBucketTracker", () => { }); }); + describe("getWaitTimeUntilTokenAvailable", () => { + it("returns 0 for an untouched bucket", () => { + expect(tracker.getWaitTimeUntilTokenAvailable(0)).toBe(0); + }); + + it("returns Infinity when refill is disabled and the bucket is empty", () => { + tracker = new TokenBucketTracker({ + ...DEFAULT_TOKEN_BUCKET_CONFIG, + tokensPerMinute: 0, + }); + tracker.drain(0, undefined, DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens); + expect(tracker.getWaitTimeUntilTokenAvailable(0)).toBe(Number.POSITIVE_INFINITY); + }); + + it("returns a finite wait after the bucket is partially drained", () => { + tracker.drain(0, undefined, DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens); + expect(tracker.getWaitTimeUntilTokenAvailable(0)).toBeGreaterThan(0); + }); + }); + describe("tryConsume", () => { it("consumes one token and returns true", () => { const result = tracker.tryConsume(0); @@ -300,6 +334,20 @@ describe("TokenBucketTracker", () => { expect(tracker.getTokens(1)).toBe(DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens); }); }); + + describe("reindexAfterRemoval", () => { + it("preserves model-qualified keys for surviving accounts", () => { + tracker.drain(0, "codex:gpt-5.1", 10); + tracker.drain(1, "codex:gpt-5.1", 20); + + tracker.reindexAfterRemoval(0); + + expect(tracker.getTokens(0, "codex:gpt-5.1")).toBe( + DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens - 20, + ); + expect(tracker.getTokens(1, "codex:gpt-5.1")).toBe(DEFAULT_TOKEN_BUCKET_CONFIG.maxTokens); + }); + }); }); describe("selectHybridAccount", () => {