diff --git a/packages/billing/src/__tests__/balance-calculator.test.ts b/packages/billing/src/__tests__/balance-calculator.test.ts index b4c526aca0..4a123e57a2 100644 --- a/packages/billing/src/__tests__/balance-calculator.test.ts +++ b/packages/billing/src/__tests__/balance-calculator.test.ts @@ -404,147 +404,6 @@ describe('Balance Calculator - calculateUsageAndBalance', () => { }) }) -describe('shouldBlockFreeUserOverdraw', () => { - afterEach(() => { - clearMockedModules() - }) - - async function importModule() { - await mockModule('@codebuff/internal/db', () => ({ - default: {}, - })) - await mockModule('@codebuff/common/analytics', () => ({ - trackEvent: () => {}, - })) - return import('@codebuff/billing/balance-calculator') - } - - it('should block when exhausted free-tier user tries to consume', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw([{ balance: 0, type: 'free' }], 100), - ).toBe(true) - }) - - it('should block when free-tier user balance is less than charge', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw([{ balance: 50, type: 'free' }], 100), - ).toBe(true) - }) - - it('should not block when free-tier user has sufficient balance', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw([{ balance: 500, type: 'free' }], 100), - ).toBe(false) - }) - - it('should not block when user has a subscription grant even with zero balance', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw( - [ - { balance: 0, type: 'free' }, - { balance: 0, type: 'subscription' }, - ], - 100, - ), - ).toBe(false) - }) - - it('should not block when user has a purchase grant', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw( - [ - { balance: 0, 
type: 'free' }, - { balance: 10, type: 'purchase' }, - ], - 100, - ), - ).toBe(false) - }) - - it('should not block when credits to charge is 0 (free-mode agent)', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw([{ balance: 0, type: 'free' }], 0), - ).toBe(false) - }) - - it('should block referral-only user with insufficient credits', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw([{ balance: 50, type: 'referral' }], 100), - ).toBe(true) - }) - - it('should block user in debt with no paid grants', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - expect( - shouldBlockFreeUserOverdraw([{ balance: -100, type: 'free' }], 50), - ).toBe(true) - }) - - it('should aggregate balance across multiple unpaid grants', async () => { - const { shouldBlockFreeUserOverdraw } = await importModule() - // Total balance: 110, charge: 100 → not blocked - expect( - shouldBlockFreeUserOverdraw( - [ - { balance: 30, type: 'free' }, - { balance: 80, type: 'referral' }, - ], - 100, - ), - ).toBe(false) - }) -}) - -describe('InsufficientCreditsError', () => { - afterEach(() => { - clearMockedModules() - }) - - async function importModule() { - await mockModule('@codebuff/internal/db', () => ({ - default: {}, - })) - await mockModule('@codebuff/common/analytics', () => ({ - trackEvent: () => {}, - })) - return import('@codebuff/billing/balance-calculator') - } - - it('should be an instance of Error with the correct name and fields', async () => { - const { InsufficientCreditsError } = await importModule() - const err = new InsufficientCreditsError(-50, 200) - expect(err).toBeInstanceOf(Error) - expect(err).toBeInstanceOf(InsufficientCreditsError) - expect(err.name).toBe('InsufficientCreditsError') - expect(err.netBalance).toBe(-50) - expect(err.chargeAmount).toBe(200) - expect(err.message).toBe( - 'Insufficient credits for 
free-tier user: balance=-50, charge=200', - ) - }) - - it('should be exported from the billing barrel (@codebuff/billing)', async () => { - await mockModule('@codebuff/internal/db', () => ({ - default: {}, - })) - await mockModule('@codebuff/common/analytics', () => ({ - trackEvent: () => {}, - })) - const billing = await import('@codebuff/billing') - expect(typeof billing.InsufficientCreditsError).toBe('function') - const err = new billing.InsufficientCreditsError(0, 100) - expect(err).toBeInstanceOf(Error) - expect(err.name).toBe('InsufficientCreditsError') - }) -}) - describe('consumeFromOrderedGrants - credit consumption bugs', () => { // Regression tests for two compounding bugs: // 1. Pass 1 ("repay debt") was directionally wrong: consumption reduced debt instead of diff --git a/packages/billing/src/balance-calculator.ts b/packages/billing/src/balance-calculator.ts index 9d03528924..6c4f7d6820 100644 --- a/packages/billing/src/balance-calculator.ts +++ b/packages/billing/src/balance-calculator.ts @@ -38,45 +38,6 @@ export interface CreditConsumptionResult { fromPurchased: number } -/** - * Thrown when a free-tier user (no purchase or subscription grants) - * attempts to consume more credits than their balance allows. - */ -export class InsufficientCreditsError extends Error { - public readonly netBalance: number - public readonly chargeAmount: number - - constructor(netBalance: number, chargeAmount: number) { - super( - `Insufficient credits for free-tier user: balance=${netBalance}, charge=${chargeAmount}`, - ) - this.name = 'InsufficientCreditsError' - this.netBalance = netBalance - this.chargeAmount = chargeAmount - } -} - -/** - * Hard gate: blocks a charge when a free-tier user (no purchase or subscription - * grants) would overdraw their credit balance. This prevents credit-farming - * abuse where users consume far more than their granted credits. 
- * - * Users with purchase or subscription grants are always allowed through - * (they have a payment relationship and can accumulate debt). - */ -export function shouldBlockFreeUserOverdraw( - grants: Array<{ balance: number; type: string }>, - credits: number, -): boolean { - if (credits <= 0) return false - const hasPaidGrant = grants.some( - (g) => g.type === 'purchase' || g.type === 'subscription', - ) - if (hasPaidGrant) return false - const netBalance = grants.reduce((sum, g) => sum + g.balance, 0) - return netBalance < credits -} - // Add a minimal structural type that both `db` and `tx` satisfy type DbConn = Pick< typeof db, @@ -602,7 +563,18 @@ export async function consumeCreditsAndAddAgentStep(params: { const finishedAt = new Date() const latencyMs = finishedAt.getTime() - startTime.getTime() - // Track grant state for error logging (declared outside transaction for access in catch block) + // Test sentinel: short-circuit both credit consumption and the message + // insert. Matches prior behavior so agent-runtime unit tests that use this + // sentinel as userId don't hit the DB. + if (userId === TEST_USER_ID) { + return success({ + consumed: 0, + fromPurchased: 0, + agentStepId: 'test-step-id', + }) + } + + // Track grant state for error logging let activeGrantsSnapshot: Array<{ operation_id: string balance: number @@ -610,192 +582,86 @@ export async function consumeCreditsAndAddAgentStep(params: { priority: number expires_at: Date | null }> = [] - let phase: 'fetch_grants' | 'consume_credits' | 'insert_message' | 'complete' = - 'fetch_grants' + let phase: 'fetch_grants' | 'consume_credits' | 'complete' = 'fetch_grants' + + // Billing transaction. Isolated from the message insert below so that a + // billing failure never prevents us from recording that OpenRouter was paid. + // OR bills us the moment the upstream request completes; the audit row must + // exist regardless of whether we successfully charged the user. 
+ let consumeResult: CreditConsumptionResult | null = null + let billingError: unknown = null + let lockWaitMs: number | undefined + let alreadyRecorded = false try { - const { result, lockWaitMs } = await withAdvisoryLockTransaction({ - callback: async (tx) => { - // Reset state at start of each transaction attempt (in case of retries) + const txOut = await withAdvisoryLockTransaction({ + callback: async (tx): Promise => { activeGrantsSnapshot = [] phase = 'fetch_grants' - const now = new Date() - - let consumeResult: CreditConsumptionResult | null = null - consumeCredits: { - if (byok) { - break consumeCredits - } - - const activeGrants = await getOrderedActiveGrantsForConsumption({ - ...params, - now, - conn: tx, - }) - - // Capture grant snapshot for error logging (includes expires_at for timing issues) - activeGrantsSnapshot = activeGrants.map((g) => ({ - operation_id: g.operation_id, - balance: g.balance, - type: g.type, - priority: g.priority, - expires_at: g.expires_at, - })) - - if (activeGrants.length === 0) { - logger.error( - { userId, credits }, - 'No active grants found to consume credits from', - ) - throw new Error('No active grants found') - } - - // Hard gate: block free-tier users from overdrawing credits. - // This prevents credit-farming abuse where users with only free/referral - // grants consume far beyond their balance due to the debt-repay bug - // in consumeFromOrderedGrants. - // (BYOK path already broke out of this `consumeCredits:` block above.) 
- if (shouldBlockFreeUserOverdraw(activeGrants, credits)) { - const netBalance = activeGrants.reduce( - (sum, g) => sum + g.balance, - 0, - ) - logger.warn( - { - userId, - credits, - netBalance, - grantTypes: [...new Set(activeGrants.map((g) => g.type))], - }, - 'Blocked free-tier user from overdrawing credits', - ) - throw new InsufficientCreditsError(netBalance, credits) - } - - phase = 'consume_credits' - consumeResult = await consumeFromOrderedGrants({ - ...params, - creditsToConsume: credits, - grants: activeGrants, - tx, - }) - - if (userId === TEST_USER_ID) { - return { ...consumeResult, agentStepId: 'test-step-id' } - } + if (byok) return null + + // Idempotency: if we've already recorded this messageId (e.g. a retry + // of the exact same upstream call), skip credit consumption. The + // advisory lock is keyed by userId so this check is serialized per + // user. messageId is globally unique in practice (OR generation id). + const existing = await tx + .select({ id: schema.message.id }) + .from(schema.message) + .where(eq(schema.message.id, messageId)) + .limit(1) + if (existing.length > 0) { + alreadyRecorded = true + return null } - phase = 'insert_message' - try { - await tx.insert(schema.message).values({ - id: messageId, - agent_id: agentId, - finished_at: new Date(), - client_id: clientId, - client_request_id: clientRequestId, - model, - reasoning_text: reasoningText, - response, - input_tokens: inputTokens, - cache_creation_input_tokens: cacheCreationInputTokens, - cache_read_input_tokens: cacheReadInputTokens, - reasoning_tokens: reasoningTokens, - output_tokens: outputTokens, - cost: cost.toString(), - credits, - byok, - latency_ms: latencyMs, - ttft_ms: ttftMs, - user_id: userId, - }) - } catch (error) { + const now = new Date() + const activeGrants = await getOrderedActiveGrantsForConsumption({ + ...params, + now, + conn: tx, + }) + + activeGrantsSnapshot = activeGrants.map((g) => ({ + operation_id: g.operation_id, + balance: g.balance, + type: 
g.type, + priority: g.priority, + expires_at: g.expires_at, + })) + + if (activeGrants.length === 0) { + // Non-fatal: user has no grants (not even a free one). Log loudly, + // let the message insert proceed so we at least have an audit row. logger.error( - { - messageId, - userId, - agentId, - error: getErrorObject(error), - pgDetails: extractPostgresErrorDetails(error), - }, - 'Failed to insert message', + { userId, credits, messageId }, + 'No active grants found to consume credits from', ) - throw error + return null } + phase = 'consume_credits' + const result = await consumeFromOrderedGrants({ + ...params, + creditsToConsume: credits, + grants: activeGrants, + tx, + }) phase = 'complete' - if (!consumeResult) { - consumeResult = { - consumed: 0, - fromPurchased: 0, - } - } - return { ...consumeResult, agentStepId: crypto.randomUUID() } + return result }, lockKey: `user:${userId}`, context: { userId, credits }, logger, }) - - // Log successful credit consumption with lock timing - logger.info( - { - userId, - messageId, - creditsConsumed: result.consumed, - creditsRequested: credits, - fromPurchased: result.fromPurchased, - lockWaitMs, - agentId, - model, - }, - 'Credits consumed and agent step recorded', - ) - - // Track credit consumption analytics - trackEvent({ - event: AnalyticsEvent.CREDIT_CONSUMED, - userId, - properties: { - creditsConsumed: result.consumed, - creditsRequested: credits, - fromPurchased: result.fromPurchased, - messageId, - agentId, - model, - source: 'consumeCreditsAndAddAgentStep', - inputTokens, - outputTokens, - reasoningTokens: reasoningTokens ?? 
0, - cacheReadInputTokens, - latencyMs, - byok, - }, - logger, - }) - - await reportPurchasedCreditsToStripe({ - userId, - stripeCustomerId: params.stripeCustomerId, - purchasedCredits: result.fromPurchased, - logger, - eventId: messageId, - timestamp: finishedAt, - extraPayload: { - source: 'consumeCreditsAndAddAgentStep', - message_id: messageId, - }, - }) - - return success(result) + consumeResult = txOut.result + lockWaitMs = txOut.lockWaitMs } catch (error) { - // Extract detailed error information for debugging - const pgDetails = extractPostgresErrorDetails(error) - + billingError = error logger.error( { error: getErrorObject(error), - pgDetails, + pgDetails: extractPostgresErrorDetails(error), transactionContext: { phase, userId, @@ -816,10 +682,125 @@ export async function consumeCreditsAndAddAgentStep(params: { 0, ), }, - 'Error consuming credits and adding agent step', + 'Error consuming credits; proceeding with message insert', + ) + } + + // Idempotent replay: message row already exists. Skip the insert and the + // post-billing side effects (Stripe metering already fired on the first + // call; analytics were already emitted). + if (alreadyRecorded) { + logger.info( + { messageId, userId, agentId }, + 'Message already recorded; skipping duplicate consumeCreditsAndAddAgentStep', + ) + return success({ + consumed: 0, + fromPurchased: 0, + agentStepId: crypto.randomUUID(), + }) + } + + // Always record the message row. If billing failed, mark credits=0 so the + // audit row still exists — the row being absent is how OR costs leaked before. + const recordedCredits = billingError === null ? 
credits : 0 + + try { + await db + .insert(schema.message) + .values({ + id: messageId, + agent_id: agentId, + finished_at: new Date(), + client_id: clientId, + client_request_id: clientRequestId, + model, + reasoning_text: reasoningText, + response, + input_tokens: inputTokens, + cache_creation_input_tokens: cacheCreationInputTokens, + cache_read_input_tokens: cacheReadInputTokens, + reasoning_tokens: reasoningTokens, + output_tokens: outputTokens, + cost: cost.toString(), + credits: recordedCredits, + byok, + latency_ms: latencyMs, + ttft_ms: ttftMs, + user_id: userId, + }) + .onConflictDoNothing({ target: schema.message.id }) + } catch (error) { + logger.error( + { + messageId, + userId, + agentId, + error: getErrorObject(error), + pgDetails: extractPostgresErrorDetails(error), + }, + 'Failed to insert message row', ) - return failure(error) } + + if (billingError) { + return failure(billingError) + } + + const finalResult: CreditConsumptionResult = + consumeResult ?? { consumed: 0, fromPurchased: 0 } + + logger.info( + { + userId, + messageId, + creditsConsumed: finalResult.consumed, + creditsRequested: credits, + fromPurchased: finalResult.fromPurchased, + lockWaitMs, + agentId, + model, + }, + 'Credits consumed and agent step recorded', + ) + + trackEvent({ + event: AnalyticsEvent.CREDIT_CONSUMED, + userId, + properties: { + creditsConsumed: finalResult.consumed, + creditsRequested: credits, + fromPurchased: finalResult.fromPurchased, + messageId, + agentId, + model, + source: 'consumeCreditsAndAddAgentStep', + inputTokens, + outputTokens, + reasoningTokens: reasoningTokens ?? 
0, + cacheReadInputTokens, + latencyMs, + byok, + }, + logger, + }) + + await reportPurchasedCreditsToStripe({ + userId, + stripeCustomerId: params.stripeCustomerId, + purchasedCredits: finalResult.fromPurchased, + logger, + eventId: messageId, + timestamp: finishedAt, + extraPayload: { + source: 'consumeCreditsAndAddAgentStep', + message_id: messageId, + }, + }) + + const agentStepId = + userId === TEST_USER_ID ? 'test-step-id' : crypto.randomUUID() + return success({ ...finalResult, agentStepId }) } /** diff --git a/scripts/check-fireworks-health.ts b/scripts/check-fireworks-health.ts new file mode 100644 index 0000000000..f534653c81 --- /dev/null +++ b/scripts/check-fireworks-health.ts @@ -0,0 +1,141 @@ +#!/usr/bin/env bun + +/** + * Scrape Fireworks metrics once and print the health snapshot the + * web server's monitor would produce. Useful for ad-hoc verification. + * + * Usage: + * bun scripts/check-fireworks-health.ts + * bun scripts/check-fireworks-health.ts --raw # also print raw metrics count + * bun scripts/check-fireworks-health.ts --json # machine-readable output + * + * Reads FIREWORKS_API_KEY from env (.env.local is loaded automatically by bun). 
+ */ + +import { computeSnapshot, DEFAULT_HEALTH_THRESHOLDS } from '../web/src/server/fireworks-monitor/compute-health' +import { parsePrometheusText } from '../web/src/server/fireworks-monitor/parse-prometheus' +import { + FIREWORKS_ACCOUNT_ID, + FIREWORKS_DEPLOYMENT_MAP, +} from '../web/src/llm-api/fireworks-config' + +import type { DeploymentHealthStatus } from '../web/src/server/fireworks-monitor/types' + +const METRICS_URL = (accountId: string) => + `https://api.fireworks.ai/v1/accounts/${accountId}/metrics` + +async function scrapeFireworksMetrics(params: { apiKey: string; accountId: string }) { + const response = await fetch(METRICS_URL(params.accountId), { + headers: { Authorization: `Bearer ${params.apiKey}` }, + }) + if (!response.ok) { + const body = await response.text().catch(() => '') + throw new Error( + `Fireworks metrics scrape failed: ${response.status} ${response.statusText}${body ? ` — ${body.slice(0, 300)}` : ''}`, + ) + } + const text = await response.text() + return parsePrometheusText(text) +} + +const STATUS_COLORS: Record = { + healthy: '\x1b[32m', + degraded: '\x1b[33m', + unhealthy: '\x1b[31m', + unknown: '\x1b[90m', +} +const RESET = '\x1b[0m' + +function formatMs(value: number | null): string { + if (value === null) return 'n/a' + if (value >= 1000) return `${(value / 1000).toFixed(2)}s` + return `${Math.round(value)}ms` +} + +function formatPct(value: number, digits = 1): string { + return `${(value * 100).toFixed(digits)}%` +} + +async function main() { + const args = process.argv.slice(2) + const jsonMode = args.includes('--json') + const showRaw = args.includes('--raw') + + const apiKey = process.env.FIREWORKS_API_KEY + if (!apiKey) { + console.error('❌ FIREWORKS_API_KEY is not set. Add it to .env.local or export it.') + process.exit(1) + } + + const accountId = process.env.FIREWORKS_ACCOUNT_ID ?? 
FIREWORKS_ACCOUNT_ID + const deployments = Object.values(FIREWORKS_DEPLOYMENT_MAP) + + const scrapeStart = Date.now() + let metrics + try { + metrics = await scrapeFireworksMetrics({ apiKey, accountId }) + } catch (error) { + console.error('❌ Scrape failed:', error instanceof Error ? error.message : error) + process.exit(1) + } + const scrapeElapsedMs = Date.now() - scrapeStart + + const snapshot = computeSnapshot({ + metrics, + deployments, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + + if (jsonMode) { + console.log(JSON.stringify({ scrapeElapsedMs, sampleCount: metrics.samples.length, snapshot }, null, 2)) + return + } + + console.log('🔥 Fireworks Deployment Health') + console.log('='.repeat(78)) + console.log(`Account: accounts/${accountId}`) + console.log(`Scraped in: ${scrapeElapsedMs}ms`) + console.log(`Samples: ${metrics.samples.length}`) + console.log(`Overall: ${STATUS_COLORS[snapshot.overall]}${snapshot.overall.toUpperCase()}${RESET}`) + if (snapshot.lastError) console.log(`Last error: ${snapshot.lastError}`) + console.log() + + const modelByDeployment = Object.fromEntries( + Object.entries(FIREWORKS_DEPLOYMENT_MAP).map(([model, dep]) => [dep, model]), + ) + + for (const [deployment, health] of Object.entries(snapshot.deployments)) { + const model = modelByDeployment[deployment] ?? '(unknown model)' + const color = STATUS_COLORS[health.status] + console.log(`── ${color}${health.status.toUpperCase().padEnd(9)}${RESET} ${model}`) + console.log(` deployment: ${deployment}`) + console.log(` base model: ${health.baseModel ?? 
'n/a'}`) + console.log(` request rate: ${health.metrics.requestRate.toFixed(3)} req/s`) + console.log(` error rate: ${health.metrics.errorRate.toFixed(3)} err/s (${formatPct(health.metrics.errorFraction)})`) + console.log(` concurrent requests: ${health.metrics.concurrentRequests.toFixed(2)}`) + console.log(` KV blocks utilization: ${formatPct(health.metrics.kvBlocksFraction, 0)}`) + console.log(` KV slots utilization: ${formatPct(health.metrics.kvSlotsFraction, 0)}`) + console.log(` p50 queue wait: ${formatMs(health.metrics.p50GenerationQueueMs)}`) + console.log(` p50 TTFT: ${formatMs(health.metrics.p50TimeToFirstTokenMs)}`) + if (health.reasons.length > 0) { + console.log(` reasons: ${health.reasons.join('; ')}`) + } + console.log() + } + + if (showRaw) { + console.log('── Metric name breakdown ─────────────────────────────') + const counts = new Map() + for (const s of metrics.samples) { + counts.set(s.name, (counts.get(s.name) ?? 0) + 1) + } + const sorted = [...counts.entries()].sort((a, b) => b[1] - a[1]) + for (const [name, count] of sorted) { + console.log(` ${String(count).padStart(4)} ${name}`) + } + } + + process.exit(snapshot.overall === 'unhealthy' ? 2 : 0) +} + +main() diff --git a/scripts/test-fireworks-cache-intervals.ts b/scripts/test-fireworks-cache-intervals.ts new file mode 100644 index 0000000000..0ed71193fd --- /dev/null +++ b/scripts/test-fireworks-cache-intervals.ts @@ -0,0 +1,720 @@ +#!/usr/bin/env bun + +/** + * Test script to measure how long Fireworks prompt caching persists across + * idle intervals. Sends an initial priming request, then waits various + * intervals before sending follow-up requests that share the same prefix. + * + * The script reports the cache hit rate after each wait interval so you can + * identify where prompt caching stops working (e.g. after 5 min, 30 min, etc.) 
+ * + * Usage: + * bun scripts/test-fireworks-cache-intervals.ts [model] [--deployment] [--intervals=30,60,120,300,600,1200,1800] + * + * Models: + * glm-5.1 (default) — z-ai/glm-5.1 + * kimi-k2.5 — moonshotai/kimi-k2.5 + * minimax — minimax/minimax-m2.5 + * + * Flags: + * --deployment Use custom deployment instead of serverless + * --intervals=a,b,c Comma-separated wait intervals in SECONDS + * (default: 30,60,120,300,600,900,1500,2100) + * + * Examples: + * # Default glm-5.1 serverless with default intervals + * bun scripts/test-fireworks-cache-intervals.ts + * + * # Custom GLM deployment with a faster sweep + * bun scripts/test-fireworks-cache-intervals.ts glm-5.1 --deployment --intervals=30,60,120,300,600 + * + * # Long sweep up to 1 hour + * bun scripts/test-fireworks-cache-intervals.ts glm-5.1 --deployment --intervals=60,300,600,1200,1800,2700,3600 + */ + +export {} + +const FIREWORKS_BASE_URL = 'https://api.fireworks.ai/inference/v1' + +type ModelConfig = { + id: string + standardModel: string + deploymentModel: string + inputCostPerToken: number + cachedInputCostPerToken: number + outputCostPerToken: number +} + +const MODEL_CONFIGS: Record = { + 'glm-5.1': { + id: 'z-ai/glm-5.1', + standardModel: 'accounts/fireworks/models/glm-5p1', + deploymentModel: 'accounts/james-65d217/deployments/mjb4i7ea', + inputCostPerToken: 1.4 / 1_000_000, + cachedInputCostPerToken: 0.26 / 1_000_000, + outputCostPerToken: 4.4 / 1_000_000, + }, + 'kimi-k2.5': { + id: 'moonshotai/kimi-k2.5', + standardModel: 'accounts/fireworks/models/kimi-k2p5', + deploymentModel: 'accounts/james-65d217/deployments/mx8l5rq2', + inputCostPerToken: 0.6 / 1_000_000, + cachedInputCostPerToken: 0.1 / 1_000_000, + outputCostPerToken: 3.0 / 1_000_000, + }, + minimax: { + id: 'minimax/minimax-m2.5', + standardModel: 'accounts/fireworks/models/minimax-m2p5', + deploymentModel: 'accounts/james-65d217/deployments/lnfid5h9', + inputCostPerToken: 0.3 / 1_000_000, + cachedInputCostPerToken: 0.03 / 1_000_000, + 
outputCostPerToken: 1.2 / 1_000_000, + }, +} + +const DEFAULT_MODEL = 'glm-5.1' +const DEFAULT_INTERVALS_SEC = [30, 60, 120, 300, 600, 900, 1500, 2100] + +function parseArgs(): { + modelKey: string + useDeployment: boolean + intervals: number[] +} { + const args = process.argv.slice(2) + let modelKey = DEFAULT_MODEL + let useDeployment = false + let intervals = DEFAULT_INTERVALS_SEC + + for (const arg of args) { + if (arg === '--deployment') { + useDeployment = true + } else if (arg.startsWith('--intervals=')) { + const raw = arg.slice('--intervals='.length) + const parsed = raw + .split(',') + .map((s) => Number(s.trim())) + .filter((n) => Number.isFinite(n) && n >= 0) + if (parsed.length === 0) { + console.error(`❌ Invalid --intervals value: "${raw}"`) + process.exit(1) + } + intervals = parsed + } else if (!arg.startsWith('-')) { + modelKey = arg + } + } + + if (!MODEL_CONFIGS[modelKey]) { + console.error( + `❌ Unknown model: "${modelKey}". Available models: ${Object.keys(MODEL_CONFIGS).join(', ')}`, + ) + process.exit(1) + } + + return { modelKey, useDeployment, intervals } +} + +const { modelKey, useDeployment: USE_DEPLOYMENT, intervals: INTERVALS_SEC } = + parseArgs() +const MODEL = MODEL_CONFIGS[modelKey] +const FIREWORKS_MODEL = USE_DEPLOYMENT + ? 
MODEL.deploymentModel + : MODEL.standardModel +const INPUT_COST_PER_TOKEN = MODEL.inputCostPerToken +const CACHED_INPUT_COST_PER_TOKEN = MODEL.cachedInputCostPerToken +const OUTPUT_COST_PER_TOKEN = MODEL.outputCostPerToken + +const MAX_TOKENS = 50 // keep output small; we only care about cache behaviour + +// Stable session ID so all requests route to the same machine for prompt caching +const SESSION_ID = `cache-test-${Math.random().toString(36).slice(2, 10)}` + +// Unique seed per run so the cache prefix is specific to this script invocation +// (avoids hits from unrelated prior runs polluting results) +const SEED_STRING = `Run seed: ${Math.random().toString(36).slice(2, 10)}-${Date.now()}` + +function computeCost(usage: Record): number { + const inputTokens = + typeof usage.prompt_tokens === 'number' ? usage.prompt_tokens : 0 + const outputTokens = + typeof usage.completion_tokens === 'number' ? usage.completion_tokens : 0 + const promptDetails = usage.prompt_tokens_details as + | Record + | undefined + const cachedTokens = + typeof promptDetails?.cached_tokens === 'number' + ? promptDetails.cached_tokens + : 0 + const nonCachedInput = Math.max(0, inputTokens - cachedTokens) + + return ( + nonCachedInput * INPUT_COST_PER_TOKEN + + cachedTokens * CACHED_INPUT_COST_PER_TOKEN + + outputTokens * OUTPUT_COST_PER_TOKEN + ) +} + +// Large system prompt (~5k+ tokens) borrowed in spirit from test-fireworks-long.ts. +// All content is invariant across requests except the per-run SEED_STRING so +// prefix caching has a large shared prefix to hit on. +const SYSTEM_PROMPT = `You are an expert software architect, technical writer, and senior engineering consultant. +${SEED_STRING} +You always respond with brief, concise answers — one or two sentences at most. +You provide practical advice grounded in real-world engineering experience. 
+ +Your areas of expertise include: +- Distributed systems design and architecture patterns (microservices, event-driven, CQRS, saga patterns, choreography vs orchestration, bulkhead pattern, circuit breaker, retry with exponential backoff, sidecar pattern, ambassador pattern, strangler fig pattern, anti-corruption layer) +- Database design and optimization (relational databases including PostgreSQL, MySQL, SQL Server; document databases including MongoDB, CouchDB, DynamoDB; graph databases including Neo4j, ArangoDB, JanusGraph; time-series databases including InfluxDB, TimescaleDB, QuestDB; wide-column stores including Cassandra, ScyllaDB, HBase; sharding strategies including hash-based, range-based, geographic; replication topologies including primary-replica, multi-primary, chain replication; connection pooling with PgBouncer, ProxySQL; query optimization techniques including index selection, query plan analysis, materialized views, covering indexes, partial indexes, expression indexes) +- Cloud infrastructure and deployment (AWS services including EC2, ECS, EKS, Lambda, S3, DynamoDB, RDS, Aurora, ElastiCache, CloudFront, Route53, IAM, VPC, SQS, SNS, Kinesis, Step Functions; GCP services including GKE, Cloud Run, Cloud Functions, BigQuery, Spanner, Pub/Sub, Cloud Storage; Azure services including AKS, Azure Functions, Cosmos DB, Azure SQL; container orchestration with Kubernetes including deployments, stateful sets, daemon sets, jobs, CronJobs, custom resource definitions, operators, Helm charts, Kustomize; infrastructure as code with Terraform, Pulumi, CloudFormation, CDK; service mesh with Istio, Linkerd, Consul Connect; load balancers including ALB, NLB, HAProxy, Nginx, Envoy; auto-scaling including HPA, VPA, KEDA, cluster autoscaler) +- Programming languages and their ecosystems (TypeScript/JavaScript with Node.js, Deno, Bun; Python with FastAPI, Django, Flask, SQLAlchemy, Pydantic; Rust with Tokio, Actix, Axum, Serde; Go with Gin, Echo, GORM; Java with 
Spring Boot, Quarkus, Micronaut, Hibernate; C++ with Boost, gRPC, Abseil; Kotlin with Ktor, Spring; Scala with Akka, ZIO, Cats Effect; Elixir with Phoenix, Ecto, LiveView; Haskell with Servant, Yesod, Persistent) +- API design principles (REST architectural constraints, Richardson Maturity Model, HATEOAS, content negotiation; GraphQL including schema design, resolvers, DataLoader, subscriptions, federation; gRPC including protobuf schema design, streaming patterns, interceptors, deadline propagation; WebSocket patterns for real-time communication; Server-Sent Events for unidirectional streaming; OpenAPI/Swagger specification; API versioning strategies including URL path, header, query parameter; pagination patterns including cursor-based, offset, keyset; rate limiting algorithms including token bucket, leaky bucket, sliding window; API gateway patterns) +- Security best practices (authentication protocols including OAuth 2.0, OIDC, SAML, WebAuthn, FIDO2; authorization models including RBAC, ABAC, ReBAC, PBAC; encryption at rest with AES-256, at transit with TLS 1.3; OWASP Top 10 including injection, broken authentication, sensitive data exposure, XXE, broken access control, security misconfiguration, XSS, insecure deserialization, known vulnerabilities, insufficient logging; Content Security Policy headers; CORS configuration; DDoS mitigation with WAF, rate limiting, geo-blocking; secret management with HashiCorp Vault, AWS Secrets Manager, GCP Secret Manager; certificate management including Let's Encrypt, cert-manager, mTLS; supply chain security with SBOM, Sigstore, dependency scanning) +- Performance optimization and profiling (caching strategies including write-through, write-behind, read-through, cache-aside, refresh-ahead; cache invalidation patterns; CDN configuration with CloudFront, Fastly, Cloudflare; connection pooling for HTTP, database, Redis; async patterns including event loops, worker threads, thread pools, coroutines; WebAssembly for 
compute-intensive operations; JIT compilation optimization; memory profiling with heap snapshots, allocation tracking; CPU profiling with flame graphs, perf, async-profiler; load testing with k6, Locust, Artillery, Gatling; performance budgets and real user monitoring) +- Testing methodologies (unit testing with Jest, Vitest, pytest, Go testing; integration testing with Testcontainers, Docker Compose; end-to-end testing with Playwright, Cypress, Selenium; property-based testing with fast-check, Hypothesis, QuickCheck; mutation testing with Stryker, PITest; snapshot testing; contract testing with Pact, Spring Cloud Contract; chaos engineering with Chaos Monkey, Litmus, Gremlin; load testing; fuzz testing with AFL, LibFuzzer; visual regression testing; accessibility testing) +- CI/CD pipelines and DevOps practices (GitHub Actions workflows, Jenkins pipelines, GitLab CI, CircleCI; ArgoCD for GitOps; deployment strategies including blue-green, canary, rolling update, recreate; feature flag systems with LaunchDarkly, Flagsmith, Unleash; trunk-based development; semantic versioning and conventional commits; artifact management with Artifactory, Nexus, ECR, GCR; infrastructure pipeline including Terraform plan/apply, drift detection; security scanning in CI including SAST, DAST, SCA, secret scanning; release management including changelogs, release notes, semantic-release) +- Monitoring and observability (metrics collection with Prometheus, StatsD, Datadog; visualization with Grafana, Kibana; distributed tracing with Jaeger, Zipkin, Tempo, OpenTelemetry; log aggregation with Elasticsearch, Loki, CloudWatch; alerting with PagerDuty, OpsGenie, VictorOps; SLO/SLI definition and error budgets; synthetic monitoring; real user monitoring; custom business metrics; incident management processes; postmortem culture; runbook automation) +- Data engineering and analytics (stream processing with Apache Kafka, Flink, Spark Streaming, Kinesis; batch processing with Spark, Hadoop, dbt; 
data warehousing with Snowflake, BigQuery, Redshift, ClickHouse; data lake architecture with Delta Lake, Apache Iceberg, Apache Hudi; ETL/ELT patterns; data quality frameworks with Great Expectations, dbt tests; schema evolution and backward compatibility; data governance and lineage tracking; real-time analytics with materialized views, OLAP cubes) +- Machine learning operations (model serving with TensorFlow Serving, TorchServe, Triton; MLOps pipelines with MLflow, Kubeflow, Metaflow; feature stores with Feast, Tecton; model monitoring for drift detection; A/B testing for ML models; experiment tracking; model versioning and registry; GPU cluster management; inference optimization with quantization, pruning, distillation) + +When providing responses, you follow these conventions: +- Keep answers extremely brief — one or two sentences maximum +- Be direct and actionable +- Use concrete examples over abstract advice +- Reference specific tools, libraries, or patterns by name + +Additional context for this conversation: +- We are working on a high-traffic web application that serves 50 million requests per day across 3 regions +- The system needs to handle bursty traffic patterns with 10x spikes during peak hours and flash sales +- Data consistency is important but eventual consistency is acceptable for most read paths with a 5-second staleness budget +- The team is experienced with TypeScript and Node.js but open to other technologies for specific use cases +- We use PostgreSQL 16 as our primary database with logical replication to read replicas and Redis 7 Cluster for caching +- The application is deployed on Kubernetes 1.29 in a multi-region setup across US-East-1, US-West-2, and EU-West-1 +- We need to maintain 99.95% uptime SLA with a target p99 latency of 150ms for API endpoints and 50ms for cached reads +- Cost optimization is a secondary concern after reliability and developer experience, but we spend $2.5M/year on infrastructure +- The codebase is 
approximately 750k lines of TypeScript across 80+ microservices with an additional 200k lines of Python for ML services +- We use an event-driven architecture with Kafka (3 clusters, 500+ topics) for inter-service communication with exactly-once semantics +- All services expose both REST (OpenAPI 3.1) and gRPC (protobuf v3) endpoints with automatic code generation +- We have a comprehensive monitoring stack with Prometheus (50M time series), Grafana (200+ dashboards), Jaeger, and PagerDuty +- Database migrations are managed with Drizzle ORM with automated rollback capabilities and zero-downtime schema changes +- The frontend is a Next.js 15 application with React Server Components, streaming SSR, and partial prerendering +- We use feature flags extensively via LaunchDarkly with 500+ active flags and automated cleanup for stale flags +- The CI/CD pipeline runs 5000+ tests (unit, integration, e2e) with a target of under 8 minutes using distributed execution on BuildKite +- We practice trunk-based development with short-lived feature branches, PR previews, and automated merge queues +- The team consists of 60 engineers across 10 squads, each owning 5-12 services with clear domain boundaries +- We use a mono-repo structure managed with Turborepo and Bun workspaces with remote caching +- All inter-service communication uses Protocol Buffers for serialization with a shared schema registry and backward compatibility enforcement +- We have a custom API gateway built on Envoy that handles authentication, rate limiting, request routing, and observability injection +- The system processes approximately 100TB of data per day through our analytics pipeline (Kafka → Flink → ClickHouse + BigQuery) +- Mobile clients communicate via a BFF (Backend for Frontend) layer with GraphQL federation across 12 subgraphs +- We have a custom feature flag evaluation engine that supports complex targeting rules including percentage rollouts, user segments, and geographic targeting +- The 
deployment pipeline supports multi-region blue-green deployments with automated rollback on SLO violation detection +- We use HashiCorp Vault for secret management with automatic rotation policies for database credentials, API keys, and certificates +- Our observability stack includes custom instrumentation for business metrics including revenue, conversion, engagement, and error rates +- The team follows an RFC process for architectural decisions with ADRs stored in the repo and reviewed by the architecture guild +- We have a dedicated platform team of 8 engineers that maintains shared infrastructure, developer tooling, and internal SDKs +- All services implement health checks (liveness + readiness), graceful shutdown handlers, and circuit breakers via a shared middleware library +- We use PgBouncer in transaction mode for PostgreSQL connection pooling (max 500 connections per region) and Redis Cluster with 6 shards per region +- The system supports multi-tenancy with tenant isolation at the database level using row-level security and per-tenant connection pools +- We have a custom schema registry for Kafka topic schemas with backward/forward compatibility validation and automated consumer migration +- Our error handling follows a structured error taxonomy with 200+ error codes, retry policies, and dead-letter queues for unprocessable messages +- We use structured logging with JSON format, correlation IDs, and trace context propagation across all services via OpenTelemetry +- The frontend uses a design system with 300+ components maintained by a dedicated UI platform team with visual regression testing via Chromatic +- We have automated performance regression testing that runs nightly against production-like data with 10% traffic replay +- Our incident response process includes automated runbook execution, escalation policies, and post-incident review within 48 hours +- We maintain a service catalog with dependency graphs, SLO definitions, on-call schedules, and 
cost attribution per service +- The platform supports A/B testing with Bayesian statistical significance calculations, multi-armed bandit allocation, and segment analysis +- We use GitOps for all infrastructure management with Terraform modules in a dedicated repo and Atlantis for plan/apply workflows +- Our security posture includes weekly penetration testing, continuous dependency scanning with Snyk, SAST with Semgrep, and DAST with OWASP ZAP +- We have a data mesh architecture for analytics with 15 domain-owned data products, each with defined SLAs and data contracts +- The system supports webhook delivery with at-least-once semantics, configurable retry policies (exponential backoff up to 24h), and delivery status tracking +- We use OpenTelemetry Collector for telemetry pipeline with custom processors for PII redaction, sampling, and cost-based routing +- Our caching strategy uses L1 (in-process LRU, 100MB per pod), L2 (Redis Cluster, 500GB), and L3 (CloudFront, 30+ edge locations) with coordinated invalidation +- We maintain backward compatibility for 3 API versions simultaneously with automated deprecation notices, usage tracking, and migration guides +- The platform includes a developer portal with API documentation, SDK generation, sandbox environments, and usage analytics +- We use Temporal for workflow orchestration across 20+ long-running business processes including order fulfillment, payment processing, and user onboarding +- Our ML platform serves 50+ models in production with A/B testing, shadow mode deployment, and automated retraining pipelines +- The search infrastructure uses Elasticsearch clusters with 500M+ documents, custom analyzers, and learning-to-rank models +- We have a notification system that delivers 10M+ messages daily across email, push, SMS, and in-app channels with template management and delivery optimization +- The billing system processes $50M+ in monthly transactions with Stripe integration, usage-based billing, and revenue 
recognition +- We use Crossplane for provisioning cloud resources as Kubernetes custom resources with drift detection and reconciliation +- Our edge computing layer uses Cloudflare Workers for geo-routing, A/B test assignment, and personalization at the edge +- The platform includes a custom query builder for internal dashboards that generates optimized SQL for ClickHouse and PostgreSQL +- We maintain a shared protobuf definition repository with 500+ message types, automated code generation for 6 languages, and breaking change detection` + +// The user message is shared across all requests so the full prefix +// (system + first user turn) is eligible for caching. Only the final +// short user prompt differs per request. +const SHARED_USER_PROMPT = + 'I have a high-level question about the system. Give me your short, direct opinion based on the context above.' + +// Short unique trailing questions so we still get a real response each time. +// Keep them short — they should not bust the cache of the shared prefix. 
+const TRAILING_QUESTIONS = [ + 'What is the single biggest reliability risk?', + 'What would you prioritize improving first?', + 'Where is the biggest cost-saving opportunity?', + 'What architectural debt worries you most?', + 'Which SLO is likely most fragile?', + 'What is your top observability blind spot?', + 'Where is latency most likely to regress?', + 'What is the riskiest deployment pattern here?', + 'Which subsystem would you most worry about scaling?', + 'What is your top security concern?', + 'Where is the data consistency story weakest?', + 'What would you refactor first given the team size?', + 'Which failure mode is most likely under-tested?', + 'Where is on-call pain most likely to come from?', + 'What cache layer is most likely to cause an incident?', + 'Which third-party dependency concerns you most?', + 'What metric would you add to the dashboard first?', + 'Where would you invest engineering time next quarter?', + 'What is the biggest knowledge silo risk?', + 'Which migration would you delay if resources were tight?', +] + +interface ConversationMessage { + role: string + content: string +} + +interface TurnResult { + label: string + waitedSec: number + usage: Record | null + elapsedMs: number + ttftMs?: number + outputTokens: number + cost: number + inputTokens: number + cachedTokens: number + cacheRate: number + error?: string +} + +async function sendRequest( + label: string, + waitedSec: number, + apiKey: string, + trailingQuestion: string, +): Promise { + const messages: ConversationMessage[] = [ + { role: 'system', content: SYSTEM_PROMPT }, + { role: 'user', content: SHARED_USER_PROMPT }, + // A stable first assistant turn so the "prefix" grows — Fireworks will + // cache system + user + assistant. Then we append a fresh user question. + { + role: 'assistant', + content: + 'Understood. 
Ask the question and I will respond with a concise, opinionated answer.', + }, + { role: 'user', content: trailingQuestion }, + ] + + const startTime = Date.now() + let ttftMs: number | undefined + + const response = await fetch(`${FIREWORKS_BASE_URL}/chat/completions`, { + method: 'POST', + headers: { + Authorization: `Bearer ${apiKey}`, + 'Content-Type': 'application/json', + 'x-session-affinity': SESSION_ID, + }, + body: JSON.stringify({ + model: FIREWORKS_MODEL, + messages, + max_tokens: MAX_TOKENS, + stream: true, + stream_options: { include_usage: true }, + }), + }) + + if (!response.ok) { + const errorText = await response.text() + console.error(`❌ ${label}: API returned ${response.status}: ${errorText}`) + return { + label, + waitedSec, + usage: null, + elapsedMs: Date.now() - startTime, + outputTokens: 0, + cost: 0, + inputTokens: 0, + cachedTokens: 0, + cacheRate: 0, + error: `${response.status}: ${errorText}`, + } + } + + const reader = response.body?.getReader() + if (!reader) { + return { + label, + waitedSec, + usage: null, + elapsedMs: Date.now() - startTime, + outputTokens: 0, + cost: 0, + inputTokens: 0, + cachedTokens: 0, + cacheRate: 0, + error: 'no reader', + } + } + + const decoder = new TextDecoder() + let streamUsage: Record | null = null + let firstContentChunkTime: number | undefined + let streamContent = '' + + let done = false + while (!done) { + const result = await reader.read() + done = result.done + if (done) break + + const text = decoder.decode(result.value, { stream: true }) + const lines = text.split('\n').filter((l) => l.startsWith('data: ')) + + for (const line of lines) { + const raw = line.slice('data: '.length) + if (raw === '[DONE]') continue + + try { + const chunk = JSON.parse(raw) + const delta = chunk.choices?.[0]?.delta + if (delta && firstContentChunkTime === undefined) { + firstContentChunkTime = Date.now() + ttftMs = firstContentChunkTime - startTime + } + if (delta?.content) streamContent += delta.content + if 
(chunk.usage) streamUsage = chunk.usage + } catch { + // skip non-JSON lines + } + } + } + + const elapsedMs = Date.now() - startTime + const inputTokens = + streamUsage && typeof streamUsage.prompt_tokens === 'number' + ? streamUsage.prompt_tokens + : 0 + const outputTokens = + streamUsage && typeof streamUsage.completion_tokens === 'number' + ? streamUsage.completion_tokens + : 0 + const promptDetails = streamUsage?.prompt_tokens_details as + | Record + | undefined + const cachedTokens = + typeof promptDetails?.cached_tokens === 'number' + ? promptDetails.cached_tokens + : 0 + const cacheRate = inputTokens > 0 ? (cachedTokens / inputTokens) * 100 : 0 + const cost = streamUsage ? computeCost(streamUsage) : 0 + + const waitedStr = + waitedSec > 0 ? `after ${formatDuration(waitedSec)} wait` : 'cold prime' + console.log( + ` ✅ ${label.padEnd(28)} | ${waitedStr.padEnd(22)} | ${( + elapsedMs / 1000 + ) + .toFixed(2) + .padStart(5)}s | TTFT ${ + ttftMs !== undefined ? (ttftMs / 1000).toFixed(2) + 's' : 'n/a' + } | in ${String(inputTokens).padStart(5)} (cached ${String( + cachedTokens, + ).padStart(5)}, ${cacheRate.toFixed(1).padStart(5)}%) | out ${String( + outputTokens, + ).padStart(3)} | $${cost.toFixed(6)}`, + ) + if (streamContent) { + const preview = streamContent.replace(/\s+/g, ' ').slice(0, 120) + console.log( + ` ↳ ${preview}${streamContent.length > 120 ? '...' 
: ''}`, + ) + } + + return { + label, + waitedSec, + usage: streamUsage, + elapsedMs, + ttftMs, + outputTokens, + cost, + inputTokens, + cachedTokens, + cacheRate, + } +} + +function formatDuration(sec: number): string { + if (sec < 60) return `${sec}s` + const m = Math.floor(sec / 60) + const s = sec % 60 + if (s === 0) return `${m}m` + return `${m}m${s}s` +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +async function sleepWithProgress(totalMs: number, label: string) { + if (totalMs <= 0) return + const start = Date.now() + const end = start + totalMs + // Print a dot every 10 seconds so the user knows we're still alive + process.stdout.write(` ⏳ ${label}: waiting ${formatDuration(Math.round(totalMs / 1000))}`) + while (Date.now() < end) { + const remainingMs = end - Date.now() + const sliceMs = Math.min(10_000, remainingMs) + await sleep(sliceMs) + const elapsedSec = Math.round((Date.now() - start) / 1000) + process.stdout.write(`. 
(${elapsedSec}s)`) + } + process.stdout.write('\n') +} + +function printRollingSummary( + results: TurnResult[], + plannedIntervalsSec: number[], +) { + const probes = results.slice(1) // skip priming + if (probes.length === 0) return + const completed = probes.length + const total = plannedIntervalsSec.length + const cumulativeWaitSec = plannedIntervalsSec + .slice(0, completed) + .reduce((a, b) => a + b, 0) + const remainingWaitSec = plannedIntervalsSec + .slice(completed) + .reduce((a, b) => a + b, 0) + + const lastHit = [...probes].reverse().find((r) => r.cachedTokens > 0) + const firstMiss = probes.find( + (r) => r.cachedTokens === 0 && !r.error && r.inputTokens > 0, + ) + + console.log( + ` 📊 Progress: ${completed}/${total} probes done — cumulative idle ${formatDuration( + cumulativeWaitSec, + )}, ${formatDuration(remainingWaitSec)} of waits remaining.`, + ) + if (lastHit && !firstMiss) { + console.log( + ` Cache still alive — last hit after ${formatDuration(lastHit.waitedSec)} idle.`, + ) + } else if (lastHit && firstMiss) { + // Intervals are usually monotonically increasing, but guard against + // user-supplied non-monotonic intervals by ordering the bounds. + const lo = Math.min(lastHit.waitedSec, firstMiss.waitedSec) + const hi = Math.max(lastHit.waitedSec, firstMiss.waitedSec) + console.log( + ` Estimated cache TTL so far: between ${formatDuration(lo)} (hit) and ${formatDuration(hi)} (miss).`, + ) + } else if (firstMiss) { + console.log( + ` No cache hits observed yet — first miss after ${formatDuration(firstMiss.waitedSec)} idle.`, + ) + } +} + +async function main() { + const apiKey = process.env.FIREWORKS_API_KEY + if (!apiKey) { + console.error( + '❌ FIREWORKS_API_KEY is not set. 
Add it to .env.local or pass it directly.', + ) + process.exit(1) + } + + const totalWaitSec = INTERVALS_SEC.reduce((a, b) => a + b, 0) + + console.log('🧪 Fireworks Prompt Cache Interval Test') + console.log('='.repeat(80)) + console.log( + `Model: ${MODEL.id} (${FIREWORKS_MODEL}) [${USE_DEPLOYMENT ? 'deployment' : 'serverless'}]`, + ) + console.log(`Base URL: ${FIREWORKS_BASE_URL}`) + console.log(`Session ID: ${SESSION_ID} (x-session-affinity header)`) + console.log(`Seed: ${SEED_STRING}`) + console.log(`Max tokens: ${MAX_TOKENS}`) + console.log( + `Intervals: ${INTERVALS_SEC.map(formatDuration).join(', ')} (total wait ≈ ${formatDuration(totalWaitSec)})`, + ) + console.log('='.repeat(80)) + console.log() + console.log( + 'Plan: send a priming request, then for each interval wait and re-send', + ) + console.log( + 'a request that shares the full system/user/assistant prefix. Each test', + ) + console.log( + 'also refreshes the cache, so interval N measures persistence after', + ) + console.log( + 'the previous request. If caching is disabled or expired, cached_tokens', + ) + console.log('will drop to ~0 and cache% will collapse.') + console.log() + + const results: TurnResult[] = [] + + // Prime the cache + const priming = await sendRequest( + 'Priming (0)', + 0, + apiKey, + TRAILING_QUESTIONS[0], + ) + results.push(priming) + + // Print an early verdict from priming so you know whether caching is + // even plausible before sitting through the first wait. + console.log() + if (priming.error) { + console.log( + ` ⚠️ Priming request errored (${priming.error}). 
Subsequent probes will probably also fail.`, + ) + } else { + console.log( + ` ℹ️ Priming prefix was ${priming.inputTokens} tokens (cached ${priming.cachedTokens} on the priming call itself — expected to be 0 on a cold run).`, + ) + } + console.log() + + let firstMissHintPrinted = false + for (let i = 0; i < INTERVALS_SEC.length; i++) { + const waitSec = INTERVALS_SEC[i] + const questionIdx = (i + 1) % TRAILING_QUESTIONS.length + const label = `Probe ${i + 1}/${INTERVALS_SEC.length}` + await sleepWithProgress(waitSec * 1000, label) + const result = await sendRequest( + label, + waitSec, + apiKey, + TRAILING_QUESTIONS[questionIdx], + ) + results.push(result) + printRollingSummary(results, INTERVALS_SEC) + + const isMiss = + result.cachedTokens === 0 && !result.error && result.inputTokens > 0 + if (isMiss) { + console.log( + ` 🔴 Cache MISS after ${formatDuration(waitSec)} idle. The cache likely expired.`, + ) + if (!firstMissHintPrinted) { + console.log( + ` (Ctrl-C now if you don't want to wait through the remaining probes.)`, + ) + firstMissHintPrinted = true + } + } else if (result.cachedTokens > 0) { + console.log( + ` 🟢 Cache HIT after ${formatDuration(waitSec)} idle (${result.cacheRate.toFixed(1)}%).`, + ) + } + console.log() + } + + // ── Summary ── + console.log() + console.log('━'.repeat(100)) + console.log('SUMMARY — cache hit rate vs. idle time since previous request') + console.log('━'.repeat(100)) + console.log() + console.log( + ' Label | Waited | Input | Cached | Cache% | TTFT | Elapsed | Cost', + ) + console.log(' ' + '-'.repeat(95)) + + let totalCost = 0 + for (const r of results) { + const waited = r.waitedSec > 0 ? formatDuration(r.waitedSec) : '—' + const cacheStr = `${r.cacheRate.toFixed(1)}%` + const ttft = + r.ttftMs !== undefined ? `${(r.ttftMs / 1000).toFixed(2)}s` : 'n/a' + const elapsed = `${(r.elapsedMs / 1000).toFixed(2)}s` + totalCost += r.cost + + const indicator = + r.cachedTokens > 0 + ? r.cacheRate >= 50 + ? 
'🟢' + : '🟡' + : r.waitedSec === 0 + ? '⬜' + : '🔴' + + console.log( + ` ${indicator} ${r.label.padEnd(22)} | ${waited.padStart(10)} | ${String(r.inputTokens).padStart(6)} | ${String(r.cachedTokens).padStart(6)} | ${cacheStr.padStart(7)} | ${ttft.padStart(7)} | ${elapsed.padStart(7)} | $${r.cost.toFixed(6)}${r.error ? ' [ERR]' : ''}`, + ) + } + console.log(' ' + '-'.repeat(95)) + console.log(` Total cost: $${totalCost.toFixed(6)}`) + console.log() + + // ── Analysis ── + console.log('━'.repeat(100)) + console.log('ANALYSIS') + console.log('━'.repeat(100)) + console.log() + + const probes = results.slice(1) // skip priming + const firstMissIdx = probes.findIndex((r) => r.cachedTokens === 0) + const lastHit = [...probes].reverse().find((r) => r.cachedTokens > 0) + const firstMiss = firstMissIdx >= 0 ? probes[firstMissIdx] : null + + if (lastHit) { + console.log( + ` ✅ Last successful cache hit was after ${formatDuration(lastHit.waitedSec)} idle`, + ) + console.log( + ` (cached ${lastHit.cachedTokens}/${lastHit.inputTokens} tokens = ${lastHit.cacheRate.toFixed(1)}%)`, + ) + } else { + console.log( + ' ⚠️ No probe returned any cached tokens — caching may be disabled for this deployment.', + ) + } + + if (firstMiss) { + console.log( + ` 🔴 First cache miss was after ${formatDuration(firstMiss.waitedSec)} idle (cache% = ${firstMiss.cacheRate.toFixed(1)}%)`, + ) + console.log( + ` ⏱ Estimated cache TTL is between ${formatDuration( + lastHit ? lastHit.waitedSec : 0, + )} and ${formatDuration(firstMiss.waitedSec)}.`, + ) + } else { + console.log( + ' 🟢 No cache misses observed across all tested intervals — cache persisted the full duration.', + ) + } + + console.log() + console.log('Notes:') + console.log( + ' • Cache misses on a serverless deployment can also be caused by request', + ) + console.log( + ' routing to a different node; we use x-session-affinity to mitigate this,', + ) + console.log( + ' but it is not a hard guarantee. 
Re-run if results look noisy.', + ) + console.log( + ' • Each probe refreshes the cache, so interval N measures persistence', + ) + console.log(' since the previous request, not since the priming request.') + console.log() + console.log('Done!') +} + +main() diff --git a/web/instrumentation.ts b/web/instrumentation.ts index 6ce22befe4..b38ccc27f3 100644 --- a/web/instrumentation.ts +++ b/web/instrumentation.ts @@ -8,6 +8,7 @@ * causing Render's proxy to return 502 Bad Gateway errors. */ +import { startFireworksMonitor } from '@/server/fireworks-monitor/monitor' import { logger } from '@/util/logger' export function register() { @@ -45,4 +46,6 @@ export function register() { }) logger.info({}, '[Instrumentation] Global error handlers registered') + + startFireworksMonitor() } diff --git a/web/src/app/api/admin/fireworks-health/__tests__/fireworks-health.test.ts b/web/src/app/api/admin/fireworks-health/__tests__/fireworks-health.test.ts new file mode 100644 index 0000000000..7cf42b10f5 --- /dev/null +++ b/web/src/app/api/admin/fireworks-health/__tests__/fireworks-health.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, test } from 'bun:test' +import { NextResponse } from 'next/server' + +import { getFireworksHealth } from '../_get' + +import type { FireworksHealthSnapshot } from '@/server/fireworks-monitor/types' + +function snapshot( + overall: FireworksHealthSnapshot['overall'], +): FireworksHealthSnapshot { + return { + scrapedAt: 1000, + ageMs: 0, + overall, + deployments: {}, + lastError: null, + } +} + +const allowAdmin = async () => ({ id: 'admin-user', email: 'admin@example.com' }) +const forbidAdmin = async () => + NextResponse.json({ error: 'Forbidden - not an admin' }, { status: 403 }) + +describe('/api/admin/fireworks-health', () => { + test('returns 403 when caller is not an admin', async () => { + const response = await getFireworksHealth({ + getSnapshot: () => snapshot('healthy'), + checkAdminAuth: forbidAdmin, + }) + 
expect(response.status).toBe(403) + }) + + test('returns 200 with snapshot when overall is healthy', async () => { + const response = await getFireworksHealth({ + getSnapshot: () => snapshot('healthy'), + checkAdminAuth: allowAdmin, + }) + expect(response.status).toBe(200) + const body = await response.json() + expect(body.overall).toBe('healthy') + }) + + test('returns 200 when degraded', async () => { + const response = await getFireworksHealth({ + getSnapshot: () => snapshot('degraded'), + checkAdminAuth: allowAdmin, + }) + expect(response.status).toBe(200) + }) + + test('returns 200 when unknown (no scrape yet)', async () => { + const response = await getFireworksHealth({ + getSnapshot: () => snapshot('unknown'), + checkAdminAuth: allowAdmin, + }) + expect(response.status).toBe(200) + }) + + test('returns 503 when overall is unhealthy', async () => { + const response = await getFireworksHealth({ + getSnapshot: () => snapshot('unhealthy'), + checkAdminAuth: allowAdmin, + }) + expect(response.status).toBe(503) + }) +}) diff --git a/web/src/app/api/admin/fireworks-health/_get.ts b/web/src/app/api/admin/fireworks-health/_get.ts new file mode 100644 index 0000000000..1b40b5cb41 --- /dev/null +++ b/web/src/app/api/admin/fireworks-health/_get.ts @@ -0,0 +1,22 @@ +import { NextResponse } from 'next/server' + +import type { FireworksHealthSnapshot } from '@/server/fireworks-monitor/types' + +export interface FireworksHealthDeps { + getSnapshot: () => FireworksHealthSnapshot + checkAdminAuth: () => Promise +} + +export async function getFireworksHealth({ + getSnapshot, + checkAdminAuth, +}: FireworksHealthDeps) { + const authResult = await checkAdminAuth() + if (authResult instanceof NextResponse) { + return authResult + } + + const snapshot = getSnapshot() + const httpStatus = snapshot.overall === 'unhealthy' ? 
503 : 200 + return NextResponse.json(snapshot, { status: httpStatus }) +} diff --git a/web/src/app/api/admin/fireworks-health/route.ts b/web/src/app/api/admin/fireworks-health/route.ts new file mode 100644 index 0000000000..2307c4398e --- /dev/null +++ b/web/src/app/api/admin/fireworks-health/route.ts @@ -0,0 +1,11 @@ +import { getFireworksHealth } from './_get' + +import { checkAdminAuth } from '@/lib/admin-auth' +import { getFireworksHealthSnapshot } from '@/server/fireworks-monitor/monitor' + +export const GET = () => { + return getFireworksHealth({ + getSnapshot: getFireworksHealthSnapshot, + checkAdminAuth, + }) +} diff --git a/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts b/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts index 803b730ba7..ea74ad2569 100644 --- a/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts +++ b/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts @@ -135,6 +135,13 @@ describe('/api/v1/chat/completions POST endpoint', () => { status: 'running', } } + if (runId === 'run-free') { + return { + // Real free-mode allowlisted agent (see FREE_MODE_AGENT_MODELS). 
+ agent_id: 'base2-free', + status: 'running', + } + } if (runId === 'run-completed') { return { agent_id: 'agent-123', @@ -529,10 +536,10 @@ describe('/api/v1/chat/completions POST endpoint', () => { method: 'POST', headers: { Authorization: 'Bearer test-api-key-new-free' }, body: JSON.stringify({ - model: 'test/test-model', + model: 'z-ai/glm-5.1', stream: false, codebuff_metadata: { - run_id: 'run-123', + run_id: 'run-free', client_id: 'test-client-id-123', cost_mode: 'free', }, @@ -562,10 +569,10 @@ describe('/api/v1/chat/completions POST endpoint', () => { method: 'POST', headers: { Authorization: 'Bearer test-api-key-no-credits' }, body: JSON.stringify({ - model: 'test/test-model', + model: 'z-ai/glm-5.1', stream: false, codebuff_metadata: { - run_id: 'run-123', + run_id: 'run-free', client_id: 'test-client-id-123', cost_mode: 'free', }, @@ -587,6 +594,116 @@ describe('/api/v1/chat/completions POST endpoint', () => { expect(response.status).toBe(200) }) + + it('rejects free-mode requests using a non-allowlisted model (e.g. Opus)', async () => { + const req = new NextRequest( + 'http://localhost:3000/api/v1/chat/completions', + { + method: 'POST', + headers: { Authorization: 'Bearer test-api-key-new-free' }, + body: JSON.stringify({ + // Expensive model the attacker wants for free. 
+ model: 'anthropic/claude-4.7-opus', + stream: true, + codebuff_metadata: { + run_id: 'run-free', + client_id: 'test-client-id-123', + cost_mode: 'free', + }, + }), + }, + ) + + const response = await postChatCompletions({ + req, + getUserInfoFromApiKey: mockGetUserInfoFromApiKey, + logger: mockLogger, + trackEvent: mockTrackEvent, + getUserUsageData: mockGetUserUsageData, + getAgentRunFromId: mockGetAgentRunFromId, + fetch: mockFetch, + insertMessageBigquery: mockInsertMessageBigquery, + loggerWithContext: mockLoggerWithContext, + }) + + expect(response.status).toBe(403) + const body = await response.json() + expect(body.error).toBe('free_mode_invalid_agent_model') + }) + + it('rejects free-mode requests with an allowlisted agent but a model outside its allowed set', async () => { + // agent=base2-free is allowlisted, but Opus is not in its allowed + // model set. This is the spoofing variant of the attack where the + // caller picks a real free-mode agentId to try to sneak past the gate. 
+ const req = new NextRequest( + 'http://localhost:3000/api/v1/chat/completions', + { + method: 'POST', + headers: { Authorization: 'Bearer test-api-key-new-free' }, + body: JSON.stringify({ + model: 'anthropic/claude-4.7-opus', + stream: true, + codebuff_metadata: { + run_id: 'run-free', + client_id: 'test-client-id-123', + cost_mode: 'free', + }, + }), + }, + ) + + const response = await postChatCompletions({ + req, + getUserInfoFromApiKey: mockGetUserInfoFromApiKey, + logger: mockLogger, + trackEvent: mockTrackEvent, + getUserUsageData: mockGetUserUsageData, + getAgentRunFromId: mockGetAgentRunFromId, + fetch: mockFetch, + insertMessageBigquery: mockInsertMessageBigquery, + loggerWithContext: mockLoggerWithContext, + }) + + expect(response.status).toBe(403) + const body = await response.json() + expect(body.error).toBe('free_mode_invalid_agent_model') + }) + + it('rejects free-mode requests where agentId is not in the allowlist at all', async () => { + // run-123 points to agent-123, which is not a free-mode agent. 
+ const req = new NextRequest( + 'http://localhost:3000/api/v1/chat/completions', + { + method: 'POST', + headers: { Authorization: 'Bearer test-api-key-new-free' }, + body: JSON.stringify({ + model: 'z-ai/glm-5.1', + stream: true, + codebuff_metadata: { + run_id: 'run-123', + client_id: 'test-client-id-123', + cost_mode: 'free', + }, + }), + }, + ) + + const response = await postChatCompletions({ + req, + getUserInfoFromApiKey: mockGetUserInfoFromApiKey, + logger: mockLogger, + trackEvent: mockTrackEvent, + getUserUsageData: mockGetUserUsageData, + getAgentRunFromId: mockGetAgentRunFromId, + fetch: mockFetch, + insertMessageBigquery: mockInsertMessageBigquery, + loggerWithContext: mockLoggerWithContext, + }) + + expect(response.status).toBe(403) + const body = await response.json() + expect(body.error).toBe('free_mode_invalid_agent_model') + }) }) describe('Successful responses', () => { @@ -734,10 +851,10 @@ describe('/api/v1/chat/completions POST endpoint', () => { method: 'POST', headers: { Authorization: 'Bearer test-api-key-123' }, body: JSON.stringify({ - model: 'test/test-model', + model: 'z-ai/glm-5.1', stream: false, codebuff_metadata: { - run_id: 'run-123', + run_id: 'run-free', client_id: 'test-client-id-123', cost_mode: 'free', }, diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts index 1d24d35ae3..93e052e4b6 100644 --- a/web/src/app/api/v1/chat/completions/_post.ts +++ b/web/src/app/api/v1/chat/completions/_post.ts @@ -1,6 +1,9 @@ import { AnalyticsEvent } from '@codebuff/common/constants/analytics-events' import { BYOK_OPENROUTER_HEADER } from '@codebuff/common/constants/byok' -import { isFreeMode } from '@codebuff/common/constants/free-agents' +import { + isFreeMode, + isFreeModeAllowedAgentModel, +} from '@codebuff/common/constants/free-agents' import { getErrorObject } from '@codebuff/common/util/error' import { pluralize } from '@codebuff/common/util/string' import { env } from 
'@codebuff/internal/env' @@ -77,6 +80,11 @@ const FREE_MODE_ALLOWED_COUNTRIES = new Set([ const MIN_ACCOUNT_AGE_DAYS = 3 const MIN_ACCOUNT_AGE_FOR_PAID_MS = MIN_ACCOUNT_AGE_DAYS * 24 * 60 * 60 * 1000 +// Emails allowed to bypass the paid+aged-account gate so integration tests +// (e.g. the SDK prompt-caching test) can run against a real server without +// needing to seed a purchase on every fresh test account. +const PAID_GATE_BYPASS_EMAILS = new Set(['team@codebuff.com']) + function extractClientIp(req: NextRequest): string | undefined { const forwardedFor = req.headers.get('x-forwarded-for') if (forwardedFor) { @@ -354,6 +362,38 @@ export async function postChatCompletions(params: { ) } + // Free-mode requests must use an allowlisted agent+model combination. + // Without this gate, an attacker on a brand-new unpaid account can set + // cost_mode='free' to bypass both the paid-account check and the balance + // check, then request an expensive model (Opus, etc). Our OpenRouter key + // pays for the call; the downstream credit-consumption step records an + // audit row but can't actually deduct from a user who has no grants — + // net result is free Opus for the attacker, real dollars for us. Check + // must happen here, before any call to OpenRouter. 
+ if ( + isFreeModeRequest && + !isFreeModeAllowedAgentModel(agentId, typedBody.model) + ) { + trackEvent({ + event: AnalyticsEvent.CHAT_COMPLETIONS_VALIDATION_ERROR, + userId, + properties: { + error: 'free_mode_invalid_agent_model', + agentId, + model: typedBody.model, + }, + logger, + }) + return NextResponse.json( + { + error: 'free_mode_invalid_agent_model', + message: + 'Free mode is only available for specific agent and model combinations.', + }, + { status: 403 }, + ) + } + // Rate limit free mode requests (after validation so invalid requests don't consume quota) if (isFreeModeRequest) { const rateLimitResult = checkFreeModeRateLimit(userId) @@ -459,9 +499,12 @@ export async function postChatCompletions(params: { ? Date.now() - new Date(userInfo.created_at).getTime() : 0 const accountIsTooNew = accountAgeMs < MIN_ACCOUNT_AGE_FOR_PAID_MS + const isBypassedEmail = + !!userInfo.email && PAID_GATE_BYPASS_EMAILS.has(userInfo.email.toLowerCase()) if ( !isFreeModeRequest && !openrouterApiKeyHeader && + !isBypassedEmail && (!hasPaidRelationship || accountIsTooNew) ) { trackEvent({ diff --git a/web/src/llm-api/fireworks-config.ts b/web/src/llm-api/fireworks-config.ts new file mode 100644 index 0000000000..c19f7dc5bc --- /dev/null +++ b/web/src/llm-api/fireworks-config.ts @@ -0,0 +1,15 @@ +/** + * Static Fireworks deployment config. + * + * Kept in its own module (no imports) so it is safe to pull into edge-runtime + * code paths — e.g. instrumentation.ts — without dragging in the server-only + * modules that fireworks.ts transitively depends on (bigquery, undici, etc). 
+ */
+
+export const FIREWORKS_ACCOUNT_ID = 'james-65d217'
+
+export const FIREWORKS_DEPLOYMENT_MAP: Record<string, string> = {
+  // 'minimax/minimax-m2.5': 'accounts/james-65d217/deployments/lnfid5h9',
+  'moonshotai/kimi-k2.5': 'accounts/james-65d217/deployments/mx8l5rq2',
+  'z-ai/glm-5.1': 'accounts/james-65d217/deployments/mjb4i7ea',
+}
diff --git a/web/src/llm-api/fireworks.ts b/web/src/llm-api/fireworks.ts
index e677700943..83b99abcc9 100644
--- a/web/src/llm-api/fireworks.ts
+++ b/web/src/llm-api/fireworks.ts
@@ -4,6 +4,7 @@ import { PROFIT_MARGIN } from '@codebuff/common/constants/limits'
 import { getErrorObject } from '@codebuff/common/util/error'
 import { env } from '@codebuff/internal/env'
+import { FIREWORKS_DEPLOYMENT_MAP } from './fireworks-config'
 import {
   consumeCreditsForMessage,
   extractRequestMetadata,
@@ -37,13 +38,6 @@ const FIREWORKS_MODEL_MAP: Record<string, string> = {
 /** Flag to enable custom Fireworks deployments (set to false to use global API only) */
 const FIREWORKS_USE_CUSTOM_DEPLOYMENT = true
 
-/** Custom deployment IDs for models with dedicated Fireworks deployments */
-const FIREWORKS_DEPLOYMENT_MAP: Record<string, string> = {
-  // 'minimax/minimax-m2.5': 'accounts/james-65d217/deployments/lnfid5h9',
-  'moonshotai/kimi-k2.5': 'accounts/james-65d217/deployments/mx8l5rq2',
-  'z-ai/glm-5.1': 'accounts/james-65d217/deployments/mjb4i7ea',
-}
-
 /** Check if current time is within deployment hours (always enabled) */
 export function isDeploymentHours(_now: Date = new Date()): boolean {
   return true
diff --git a/web/src/server/fireworks-monitor/__tests__/compute-health.test.ts b/web/src/server/fireworks-monitor/__tests__/compute-health.test.ts
new file mode 100644
index 0000000000..30fba28a9e
--- /dev/null
+++ b/web/src/server/fireworks-monitor/__tests__/compute-health.test.ts
@@ -0,0 +1,251 @@
+import { describe, expect, test } from 'bun:test'
+
+import {
+  computeDeploymentHealth,
+  computeSnapshot,
+  DEFAULT_HEALTH_THRESHOLDS,
+} from '../compute-health'
+import { parsePrometheusText }
from '../parse-prometheus' + +const DEPLOYMENT = 'accounts/test-acc/deployments/d1' + +function fixture(params: { + requestRate?: number + errorRate?: number + errorCode?: string + concurrent?: number + kvBlocks?: number + kvSlots?: number + queueBuckets?: Array<{ le: string; count: number }> + ttftBuckets?: Array<{ le: string; count: number }> +}): string { + const lines: string[] = [] + const labels = `base_model="m",deployment="${DEPLOYMENT}",deployment_account="test-acc",deployment_id="d1"` + if (params.requestRate !== undefined) { + lines.push(`request_counter_total:sum_by_deployment{${labels}} ${params.requestRate}`) + } + if (params.errorRate !== undefined) { + const code = params.errorCode ?? '500' + lines.push( + `requests_error_total:sum_by_deployment{${labels},http_code="${code}"} ${params.errorRate}`, + ) + } + if (params.concurrent !== undefined) { + lines.push( + `requests_coordinator_concurrent_count:avg_by_deployment{${labels}} ${params.concurrent}`, + ) + } + if (params.kvBlocks !== undefined) { + lines.push( + `generator_kv_blocks_fraction:avg_by_deployment{${labels}} ${params.kvBlocks}`, + ) + } + if (params.kvSlots !== undefined) { + lines.push( + `generator_kv_slots_fraction:avg_by_deployment{${labels}} ${params.kvSlots}`, + ) + } + for (const bucket of params.queueBuckets ?? []) { + lines.push( + `latency_generation_queue_ms_bucket:sum_by_deployment{${labels},le="${bucket.le}"} ${bucket.count}`, + ) + } + for (const bucket of params.ttftBuckets ?? 
[]) { + lines.push( + `latency_to_first_token_ms_bucket:sum_by_deployment{${labels},le="${bucket.le}"} ${bucket.count}`, + ) + } + return lines.join('\n') +} + +describe('computeDeploymentHealth', () => { + test('healthy deployment with low error rate and low utilization', () => { + const metrics = parsePrometheusText( + fixture({ + requestRate: 10, + errorRate: 0, + concurrent: 3, + kvBlocks: 0.2, + kvSlots: 0.2, + queueBuckets: [ + { le: '100', count: 50 }, + { le: '1000', count: 100 }, + { le: '+Inf', count: 100 }, + ], + ttftBuckets: [ + { le: '500', count: 60 }, + { le: '2000', count: 100 }, + { le: '+Inf', count: 100 }, + ], + }), + ) + + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + + expect(health.status).toBe('healthy') + expect(health.reasons).toEqual([]) + expect(health.deploymentId).toBe('d1') + expect(health.baseModel).toBe('m') + expect(health.metrics.errorFraction).toBe(0) + }) + + test('flags high error rate as unhealthy', () => { + const metrics = parsePrometheusText( + fixture({ requestRate: 10, errorRate: 2, kvBlocks: 0.1 }), + ) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.status).toBe('unhealthy') + expect(health.metrics.errorFraction).toBeCloseTo(0.2, 5) + expect(health.reasons.some((r) => r.includes('error rate'))).toBe(true) + }) + + test('flags mid error rate as degraded', () => { + const metrics = parsePrometheusText( + fixture({ requestRate: 100, errorRate: 5, kvBlocks: 0.1 }), + ) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.status).toBe('degraded') + expect(health.metrics.errorFraction).toBeCloseTo(0.05, 5) + }) + + test('flags saturated KV cache as unhealthy', () => { + const metrics = parsePrometheusText( + fixture({ requestRate: 10, errorRate: 0, kvBlocks: 0.995 
}), + ) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.status).toBe('unhealthy') + expect(health.reasons.some((r) => r.includes('KV blocks'))).toBe(true) + }) + + test('flags long queue wait as unhealthy', () => { + const metrics = parsePrometheusText( + fixture({ + requestRate: 10, + errorRate: 0, + kvBlocks: 0.3, + queueBuckets: [ + { le: '5000', count: 0 }, + { le: '20000', count: 100 }, + { le: '+Inf', count: 100 }, + ], + }), + ) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.status).toBe('unhealthy') + expect(health.reasons.some((r) => r.includes('queue'))).toBe(true) + }) + + test('skips error-fraction check when request rate is below the floor', () => { + const metrics = parsePrometheusText( + fixture({ requestRate: 0.05, errorRate: 0.05, kvBlocks: 0.1 }), + ) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.metrics.errorFraction).toBeCloseTo(1.0, 5) + expect(health.status).toBe('healthy') + expect(health.reasons.some((r) => r.includes('error rate'))).toBe(false) + }) + + test('still applies error-fraction check at or above the floor', () => { + const metrics = parsePrometheusText( + fixture({ requestRate: 0.1, errorRate: 0.05, kvBlocks: 0.1 }), + ) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.status).toBe('unhealthy') + expect(health.reasons.some((r) => r.includes('error rate'))).toBe(true) + }) + + test('sums error counters across multiple HTTP codes', () => { + const labels = `base_model="m",deployment="${DEPLOYMENT}",deployment_id="d1"` + const text = [ + `request_counter_total:sum_by_deployment{${labels}} 100`, + `requests_error_total:sum_by_deployment{${labels},http_code="500"} 
3`, + `requests_error_total:sum_by_deployment{${labels},http_code="429"} 5`, + `generator_kv_blocks_fraction:avg_by_deployment{${labels}} 0.1`, + ].join('\n') + const metrics = parsePrometheusText(text) + const health = computeDeploymentHealth({ + deployment: DEPLOYMENT, + metrics, + thresholds: DEFAULT_HEALTH_THRESHOLDS, + }) + expect(health.metrics.errorRate).toBe(8) + expect(health.metrics.errorFraction).toBeCloseTo(0.08, 5) + expect(health.status).toBe('degraded') + }) +}) + +describe('computeSnapshot', () => { + test('marks deployments as unknown when metrics have never been fetched', () => { + const snap = computeSnapshot({ + metrics: null, + deployments: [DEPLOYMENT], + now: 1000, + }) + expect(snap.overall).toBe('unknown') + expect(snap.deployments[DEPLOYMENT].status).toBe('unknown') + expect(snap.scrapedAt).toBeNull() + }) + + test('downgrades stale snapshots to unhealthy', () => { + const metrics = parsePrometheusText( + fixture({ requestRate: 10, errorRate: 0, kvBlocks: 0.1 }), + 1000, + ) + const snap = computeSnapshot({ + metrics, + deployments: [DEPLOYMENT], + now: 1000 + DEFAULT_HEALTH_THRESHOLDS.staleSnapshotMs + 1, + }) + expect(snap.overall).toBe('unhealthy') + expect(snap.deployments[DEPLOYMENT].reasons[0]).toBe('snapshot stale') + }) + + test('overall status is the worst across deployments', () => { + const dep2 = 'accounts/test-acc/deployments/d2' + const text = [ + `request_counter_total:sum_by_deployment{deployment="${DEPLOYMENT}",deployment_id="d1"} 100`, + `requests_error_total:sum_by_deployment{deployment="${DEPLOYMENT}",deployment_id="d1",http_code="500"} 0`, + `generator_kv_blocks_fraction:avg_by_deployment{deployment="${DEPLOYMENT}",deployment_id="d1"} 0.1`, + `request_counter_total:sum_by_deployment{deployment="${dep2}",deployment_id="d2"} 100`, + `requests_error_total:sum_by_deployment{deployment="${dep2}",deployment_id="d2",http_code="500"} 30`, + 
`generator_kv_blocks_fraction:avg_by_deployment{deployment="${dep2}",deployment_id="d2"} 0.1`,
+    ].join('\n')
+    const metrics = parsePrometheusText(text, 1000)
+    const snap = computeSnapshot({
+      metrics,
+      deployments: [DEPLOYMENT, dep2],
+      now: 1000,
+    })
+    expect(snap.deployments[DEPLOYMENT].status).toBe('healthy')
+    expect(snap.deployments[dep2].status).toBe('unhealthy')
+    expect(snap.overall).toBe('unhealthy')
+  })
+})
diff --git a/web/src/server/fireworks-monitor/__tests__/monitor.test.ts b/web/src/server/fireworks-monitor/__tests__/monitor.test.ts
new file mode 100644
index 0000000000..08dbc8ad3a
--- /dev/null
+++ b/web/src/server/fireworks-monitor/__tests__/monitor.test.ts
@@ -0,0 +1,188 @@
+import { afterEach, describe, expect, test } from 'bun:test'
+
+import {
+  __resetFireworksMonitorForTests,
+  getFireworksHealthSnapshot,
+  isFireworksAdmissible,
+  refreshFireworksHealthNow,
+  scrapeFireworksMetrics,
+  startFireworksMonitor,
+  stopFireworksMonitor,
+} from '../monitor'
+
+afterEach(() => {
+  __resetFireworksMonitorForTests()
+})
+
+const DEPLOYMENT = 'accounts/test-acc/deployments/d1'
+
+const HEALTHY_BODY = [
+  `request_counter_total:sum_by_deployment{deployment="${DEPLOYMENT}",deployment_id="d1"} 10`,
+  `requests_error_total:sum_by_deployment{deployment="${DEPLOYMENT}",deployment_id="d1",http_code="500"} 0`,
+  `generator_kv_blocks_fraction:avg_by_deployment{deployment="${DEPLOYMENT}",deployment_id="d1"} 0.1`,
+].join('\n')
+
+function makeFetchMock(
+  responses: Array<{ status: number; body?: string; headers?: Record<string, string> }>,
+) {
+  const calls: Array<{ url: string; init?: RequestInit }> = []
+  let i = 0
+  const impl = (async (url: string, init?: RequestInit): Promise<Response> => {
+    calls.push({ url: String(url), init })
+    const { status, body = '', headers = {} } = responses[Math.min(i, responses.length - 1)]
+    i++
+    return new Response(body, { status, headers })
+  }) as unknown as typeof globalThis.fetch
+  return { fetch: impl, calls: () => calls }
+}
+
+describe('scrapeFireworksMetrics', () => {
+  test('sends Bearer auth + parses Prometheus response', async () => {
+    const { fetch, calls } = makeFetchMock([
+      { status: 200, body: HEALTHY_BODY },
+    ])
+
+    const metrics = await scrapeFireworksMetrics({
+      apiKey: 'test-key',
+      accountId: 'acc-1',
+      fetch,
+    })
+
+    expect(metrics.samples.length).toBeGreaterThan(0)
+    const recorded = calls()
+    expect(recorded).toHaveLength(1)
+    expect(recorded[0].url).toBe('https://api.fireworks.ai/v1/accounts/acc-1/metrics')
+    const authHeader = (recorded[0].init?.headers as Record<string, string>)?.Authorization
+    expect(authHeader).toBe('Bearer test-key')
+  })
+
+  test('throws FireworksScrapeError on 429 with retry-after seconds', async () => {
+    const { fetch } = makeFetchMock([
+      { status: 429, body: 'slow down', headers: { 'retry-after': '45' } },
+    ])
+
+    let caught: unknown = null
+    try {
+      await scrapeFireworksMetrics({ apiKey: 'k', accountId: 'acc', fetch })
+    } catch (err) {
+      caught = err
+    }
+    expect(caught).toBeInstanceOf(Error)
+    const scrapeError = caught as Error & { status?: number; retryAfterMs?: number | null }
+    expect(scrapeError.status).toBe(429)
+    expect(scrapeError.retryAfterMs).toBe(45_000)
+  })
+})
+
+describe('startFireworksMonitor', () => {
+  test('does not start when FIREWORKS_API_KEY is missing', () => {
+    const started = startFireworksMonitor({ apiKey: '' })
+    expect(started).toBe(false)
+  })
+
+  test('first scrape populates the snapshot immediately', async () => {
+    const { fetch } = makeFetchMock([{ status: 200, body: HEALTHY_BODY }])
+
+    startFireworksMonitor({
+      apiKey: 'test-key',
+      accountId: 'acc-1',
+      deployments: [DEPLOYMENT],
+      pollIntervalMs: 10 * 60_000,
+      fetch,
+    })
+
+    await refreshFireworksHealthNow()
+
+    const snap = getFireworksHealthSnapshot()
+    expect(snap.overall).toBe('healthy')
+    expect(snap.scrapedAt).not.toBeNull()
+    expect(snap.deployments[DEPLOYMENT].status).toBe('healthy')
+  })
+
+  test('429 sets lastError and keeps snapshot unknown until a
good scrape', async () => { + const { fetch } = makeFetchMock([ + { status: 429, body: 'rate limited', headers: { 'retry-after': '30' } }, + ]) + + startFireworksMonitor({ + apiKey: 'test-key', + accountId: 'acc-1', + deployments: [DEPLOYMENT], + pollIntervalMs: 10 * 60_000, + fetch, + }) + + await refreshFireworksHealthNow() + + const snap = getFireworksHealthSnapshot() + expect(snap.overall).toBe('unknown') + expect(snap.lastError).toMatch(/429/) + }) + + test('returns true and is idempotent on duplicate start', () => { + const { fetch } = makeFetchMock([{ status: 200, body: HEALTHY_BODY }]) + expect(startFireworksMonitor({ apiKey: 'k', fetch })).toBe(true) + expect(startFireworksMonitor({ apiKey: 'k', fetch })).toBe(true) + }) +}) + +describe('isFireworksAdmissible', () => { + test('returns false when monitor not started', () => { + expect(isFireworksAdmissible()).toBe(false) + }) + + test('returns true only when overall is healthy', async () => { + const { fetch } = makeFetchMock([{ status: 200, body: HEALTHY_BODY }]) + startFireworksMonitor({ + apiKey: 'k', + accountId: 'acc', + deployments: [DEPLOYMENT], + pollIntervalMs: 10 * 60_000, + fetch, + }) + await refreshFireworksHealthNow() + expect(isFireworksAdmissible()).toBe(true) + }) + + test('fails closed on unhealthy (stale) snapshot', async () => { + const { fetch } = makeFetchMock([ + { status: 200, body: HEALTHY_BODY }, + { status: 500, body: 'down' }, + ]) + startFireworksMonitor({ + apiKey: 'k', + accountId: 'acc', + deployments: [DEPLOYMENT], + pollIntervalMs: 10 * 60_000, + thresholds: { ...(await import('../compute-health')).DEFAULT_HEALTH_THRESHOLDS, staleSnapshotMs: 0 }, + fetch, + }) + await refreshFireworksHealthNow() // good scrape + + // Force stale by waiting one event-loop tick; staleSnapshotMs=0 makes it stale immediately. 
+ await new Promise((r) => setTimeout(r, 1)) + expect(isFireworksAdmissible()).toBe(false) + }) + + test('can gate on a specific deployment id', async () => { + const { fetch } = makeFetchMock([{ status: 200, body: HEALTHY_BODY }]) + startFireworksMonitor({ + apiKey: 'k', + accountId: 'acc', + deployments: [DEPLOYMENT], + pollIntervalMs: 10 * 60_000, + fetch, + }) + await refreshFireworksHealthNow() + + expect(isFireworksAdmissible('d1')).toBe(true) + expect(isFireworksAdmissible('unknown-id')).toBe(false) + }) +}) + +describe('stopFireworksMonitor', () => { + test('is idempotent and safe to call when not started', () => { + stopFireworksMonitor() + stopFireworksMonitor() + }) +}) diff --git a/web/src/server/fireworks-monitor/__tests__/parse-prometheus.test.ts b/web/src/server/fireworks-monitor/__tests__/parse-prometheus.test.ts new file mode 100644 index 0000000000..062b96427d --- /dev/null +++ b/web/src/server/fireworks-monitor/__tests__/parse-prometheus.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, test } from 'bun:test' + +import { + estimateHistogramPercentile, + findSamples, + parsePrometheusText, +} from '../parse-prometheus' + +describe('parsePrometheusText', () => { + test('parses a sample with labels and a value', () => { + const text = [ + '# HELP request_counter_total:sum_by_deployment Request rate', + '# TYPE request_counter_total:sum_by_deployment gauge', + 'request_counter_total:sum_by_deployment{base_model="m",deployment="accounts/a/deployments/d1",deployment_account="a",deployment_id="d1"} 4.5', + ].join('\n') + + const parsed = parsePrometheusText(text, 1000) + + expect(parsed.scrapedAt).toBe(1000) + expect(parsed.samples).toHaveLength(1) + expect(parsed.samples[0]).toEqual({ + name: 'request_counter_total:sum_by_deployment', + labels: { + base_model: 'm', + deployment: 'accounts/a/deployments/d1', + deployment_account: 'a', + deployment_id: 'd1', + }, + value: 4.5, + }) + }) + + test('skips comments and blank lines', () => { + const text 
= [ + '# comment', + '', + 'foo 1', + '# another', + 'bar 2', + ].join('\n') + const parsed = parsePrometheusText(text) + expect(parsed.samples.map((s) => s.name)).toEqual(['foo', 'bar']) + }) + + test('parses special numeric values', () => { + const text = [ + 'm_nan NaN', + 'm_pinf +Inf', + 'm_ninf -Inf', + ].join('\n') + const parsed = parsePrometheusText(text) + expect(Number.isNaN(parsed.samples[0].value)).toBe(true) + expect(parsed.samples[1].value).toBe(Number.POSITIVE_INFINITY) + expect(parsed.samples[2].value).toBe(Number.NEGATIVE_INFINITY) + }) + + test('handles escaped quotes in labels', () => { + const text = 'm{path="a\\"b",name="x"} 1' + const parsed = parsePrometheusText(text) + expect(parsed.samples[0].labels).toEqual({ path: 'a"b', name: 'x' }) + }) + + test('ignores trailing timestamp on value', () => { + const text = 'm{a="1"} 42 1700000000000' + const parsed = parsePrometheusText(text) + expect(parsed.samples[0].value).toBe(42) + }) +}) + +describe('findSamples', () => { + test('filters by metric name and labels', () => { + const parsed = parsePrometheusText( + [ + 'm{deployment="d1"} 1', + 'm{deployment="d2"} 2', + 'other{deployment="d1"} 99', + ].join('\n'), + ) + const found = findSamples(parsed, 'm', { deployment: 'd1' }) + expect(found).toHaveLength(1) + expect(found[0].value).toBe(1) + }) +}) + +describe('estimateHistogramPercentile', () => { + test('returns le of first bucket that meets the percentile', () => { + const parsed = parsePrometheusText( + [ + 'h_bucket{le="10"} 10', + 'h_bucket{le="100"} 50', + 'h_bucket{le="1000"} 90', + 'h_bucket{le="+Inf"} 100', + ].join('\n'), + ) + const buckets = findSamples(parsed, 'h_bucket') + expect(estimateHistogramPercentile(buckets, 0.5)).toBe(100) + expect(estimateHistogramPercentile(buckets, 0.9)).toBe(1000) + expect(estimateHistogramPercentile(buckets, 0.1)).toBe(10) + }) + + test('returns null if total is zero', () => { + const parsed = parsePrometheusText( + [ + 'h_bucket{le="10"} 0', + 
'h_bucket{le="+Inf"} 0', + ].join('\n'), + ) + expect( + estimateHistogramPercentile(findSamples(parsed, 'h_bucket'), 0.5), + ).toBeNull() + }) + + test('returns null when there are no buckets', () => { + expect(estimateHistogramPercentile([], 0.5)).toBeNull() + }) +}) diff --git a/web/src/server/fireworks-monitor/compute-health.ts b/web/src/server/fireworks-monitor/compute-health.ts new file mode 100644 index 0000000000..72efa8b3a8 --- /dev/null +++ b/web/src/server/fireworks-monitor/compute-health.ts @@ -0,0 +1,274 @@ +import { + avgSamples, + estimateHistogramPercentile, + findSamples, + sumSamples, +} from './parse-prometheus' + +import type { + DeploymentHealth, + DeploymentHealthStatus, + FireworksHealthSnapshot, + PromMetrics, + PromSample, +} from './types' + +export interface HealthThresholds { + /** If no successful scrape for this long, overall status is unhealthy. */ + staleSnapshotMs: number + /** Minimum request rate (req/s) before applying the error-fraction check. Below + * this, a handful of transient errors on a near-idle deployment would flap the + * status unnecessarily. */ + minRequestRateForErrorCheck: number + /** Fraction of requests erroring: above this → degraded. */ + errorFractionDegraded: number + /** Fraction of requests erroring: above this → unhealthy. */ + errorFractionUnhealthy: number + /** KV blocks fraction above this → degraded (queue contention imminent). */ + kvBlocksFractionDegraded: number + /** KV blocks fraction above this → unhealthy (cache thrashing). */ + kvBlocksFractionUnhealthy: number + /** p50 time spent in generation queue above this (ms) → degraded. */ + generationQueueMsDegraded: number + /** p50 time spent in generation queue above this (ms) → unhealthy. */ + generationQueueMsUnhealthy: number + /** p50 TTFT above this (ms) → degraded. */ + ttftMsDegraded: number + /** p50 TTFT above this (ms) → unhealthy. 
*/
+  ttftMsUnhealthy: number
+}
+
+// Default thresholds are calibrated to the observed freebuff workload on
+// glm-5.1 / kimi-k2.5. They are intentionally loose at first so a cold
+// deployment does not flap; expect to tighten once you have a week of
+// live data. Override per-instance via startFireworksMonitor({ thresholds }).
+export const DEFAULT_HEALTH_THRESHOLDS: HealthThresholds = {
+  staleSnapshotMs: 3 * 60 * 1000,
+  minRequestRateForErrorCheck: 0.1,
+  errorFractionDegraded: 0.02,
+  errorFractionUnhealthy: 0.1,
+  kvBlocksFractionDegraded: 0.95,
+  kvBlocksFractionUnhealthy: 0.99,
+  generationQueueMsDegraded: 5_000,
+  generationQueueMsUnhealthy: 15_000,
+  ttftMsDegraded: 8_000,
+  ttftMsUnhealthy: 30_000,
+}
+
+const STATUS_RANK: Record<DeploymentHealthStatus, number> = {
+  healthy: 0,
+  degraded: 1,
+  unhealthy: 2,
+  unknown: 3,
+}
+
+export function computeDeploymentHealth(params: {
+  deployment: string
+  metrics: PromMetrics
+  thresholds: HealthThresholds
+}): DeploymentHealth {
+  const { deployment, metrics, thresholds } = params
+  const filter = { deployment }
+
+  const requestRateSamples = findSamples(
+    metrics,
+    'request_counter_total:sum_by_deployment',
+    filter,
+  )
+  const errorRateSamples = findSamples(
+    metrics,
+    'requests_error_total:sum_by_deployment',
+    filter,
+  )
+
+  const requestRate = sumSamples(requestRateSamples)
+  const errorRate = sumSamples(errorRateSamples)
+  const errorFraction = requestRate > 0 ? errorRate / requestRate : 0
+
+  const concurrentRequests =
+    avgSamples(
+      findSamples(
+        metrics,
+        'requests_coordinator_concurrent_count:avg_by_deployment',
+        filter,
+      ),
+    ) ?? 0
+
+  const kvBlocksFraction =
+    avgSamples(
+      findSamples(metrics, 'generator_kv_blocks_fraction:avg_by_deployment', filter),
+    ) ?? 0
+  const kvSlotsFraction =
+    avgSamples(
+      findSamples(metrics, 'generator_kv_slots_fraction:avg_by_deployment', filter),
+    ) ??
0 + + const p50GenerationQueueMs = percentileForDeployment( + metrics, + 'latency_generation_queue_ms_bucket:sum_by_deployment', + deployment, + 0.5, + ) + const p50TimeToFirstTokenMs = percentileForDeployment( + metrics, + 'latency_to_first_token_ms_bucket:sum_by_deployment', + deployment, + 0.5, + ) + + const baseModelSample = [ + ...requestRateSamples, + ...errorRateSamples, + ].find((s) => s.labels.base_model) + const baseModel = baseModelSample?.labels.base_model ?? null + const deploymentId = baseModelSample?.labels.deployment_id ?? parseDeploymentId(deployment) + + const reasons: string[] = [] + let status: DeploymentHealthStatus = 'healthy' + + const upgrade = (next: DeploymentHealthStatus) => { + if (STATUS_RANK[next] > STATUS_RANK[status]) status = next + } + + if (requestRate >= thresholds.minRequestRateForErrorCheck) { + if (errorFraction >= thresholds.errorFractionUnhealthy) { + reasons.push(`error rate ${(errorFraction * 100).toFixed(1)}% ≥ ${(thresholds.errorFractionUnhealthy * 100).toFixed(1)}%`) + upgrade('unhealthy') + } else if (errorFraction >= thresholds.errorFractionDegraded) { + reasons.push(`error rate ${(errorFraction * 100).toFixed(1)}% ≥ ${(thresholds.errorFractionDegraded * 100).toFixed(1)}%`) + upgrade('degraded') + } + } + + if (kvBlocksFraction >= thresholds.kvBlocksFractionUnhealthy) { + reasons.push(`KV blocks ${(kvBlocksFraction * 100).toFixed(0)}% ≥ ${(thresholds.kvBlocksFractionUnhealthy * 100).toFixed(0)}%`) + upgrade('unhealthy') + } else if (kvBlocksFraction >= thresholds.kvBlocksFractionDegraded) { + reasons.push(`KV blocks ${(kvBlocksFraction * 100).toFixed(0)}% ≥ ${(thresholds.kvBlocksFractionDegraded * 100).toFixed(0)}%`) + upgrade('degraded') + } + + if (p50GenerationQueueMs !== null) { + if (p50GenerationQueueMs >= thresholds.generationQueueMsUnhealthy) { + reasons.push(`p50 queue ${Math.round(p50GenerationQueueMs)}ms ≥ ${thresholds.generationQueueMsUnhealthy}ms`) + upgrade('unhealthy') + } else if (p50GenerationQueueMs 
>= thresholds.generationQueueMsDegraded) { + reasons.push(`p50 queue ${Math.round(p50GenerationQueueMs)}ms ≥ ${thresholds.generationQueueMsDegraded}ms`) + upgrade('degraded') + } + } + + if (p50TimeToFirstTokenMs !== null) { + if (p50TimeToFirstTokenMs >= thresholds.ttftMsUnhealthy) { + reasons.push(`p50 TTFT ${Math.round(p50TimeToFirstTokenMs)}ms ≥ ${thresholds.ttftMsUnhealthy}ms`) + upgrade('unhealthy') + } else if (p50TimeToFirstTokenMs >= thresholds.ttftMsDegraded) { + reasons.push(`p50 TTFT ${Math.round(p50TimeToFirstTokenMs)}ms ≥ ${thresholds.ttftMsDegraded}ms`) + upgrade('degraded') + } + } + + return { + deploymentId, + deployment, + baseModel, + status, + reasons, + metrics: { + requestRate, + errorRate, + errorFraction, + concurrentRequests, + kvBlocksFraction, + kvSlotsFraction, + p50GenerationQueueMs, + p50TimeToFirstTokenMs, + }, + } +} + +function percentileForDeployment( + metrics: PromMetrics, + metricName: string, + deployment: string, + percentile: number, +): number | null { + const buckets: PromSample[] = findSamples(metrics, metricName, { deployment }) + return estimateHistogramPercentile(buckets, percentile) +} + +function parseDeploymentId(deployment: string): string { + const parts = deployment.split('/') + return parts[parts.length - 1] ?? deployment +} + +export function computeSnapshot(params: { + metrics: PromMetrics | null + deployments: string[] + thresholds?: HealthThresholds + now?: number + lastError?: string | null +}): FireworksHealthSnapshot { + const thresholds = params.thresholds ?? DEFAULT_HEALTH_THRESHOLDS + const now = params.now ?? Date.now() + const lastError = params.lastError ?? 
null
+
+  if (!params.metrics) {
+    const unknownDeployments: Record<string, DeploymentHealth> = {}
+    for (const deployment of params.deployments) {
+      unknownDeployments[deployment] = {
+        deploymentId: parseDeploymentId(deployment),
+        deployment,
+        baseModel: null,
+        status: 'unknown',
+        reasons: ['no scrape yet'],
+        metrics: {
+          requestRate: 0,
+          errorRate: 0,
+          errorFraction: 0,
+          concurrentRequests: 0,
+          kvBlocksFraction: 0,
+          kvSlotsFraction: 0,
+          p50GenerationQueueMs: null,
+          p50TimeToFirstTokenMs: null,
+        },
+      }
+    }
+    return {
+      scrapedAt: null,
+      ageMs: null,
+      overall: 'unknown',
+      deployments: unknownDeployments,
+      lastError,
+    }
+  }
+
+  const deployments: Record<string, DeploymentHealth> = {}
+  let worst: DeploymentHealthStatus = 'healthy'
+
+  const stale = now - params.metrics.scrapedAt > thresholds.staleSnapshotMs
+
+  for (const deployment of params.deployments) {
+    const health = computeDeploymentHealth({
+      deployment,
+      metrics: params.metrics,
+      thresholds,
+    })
+    if (stale) {
+      health.reasons.unshift('snapshot stale')
+      if (STATUS_RANK['unhealthy'] > STATUS_RANK[health.status]) {
+        health.status = 'unhealthy'
+      }
+    }
+    deployments[deployment] = health
+    if (STATUS_RANK[health.status] > STATUS_RANK[worst]) worst = health.status
+  }
+
+  return {
+    scrapedAt: params.metrics.scrapedAt,
+    ageMs: now - params.metrics.scrapedAt,
+    overall: worst,
+    deployments,
+    lastError,
+  }
+}
diff --git a/web/src/server/fireworks-monitor/monitor.ts b/web/src/server/fireworks-monitor/monitor.ts
new file mode 100644
index 0000000000..ffc452e999
--- /dev/null
+++ b/web/src/server/fireworks-monitor/monitor.ts
@@ -0,0 +1,267 @@
+import { env } from '@codebuff/internal/env'
+
+import { computeSnapshot, DEFAULT_HEALTH_THRESHOLDS } from './compute-health'
+import { parsePrometheusText } from './parse-prometheus'
+
+import { FIREWORKS_ACCOUNT_ID, FIREWORKS_DEPLOYMENT_MAP } from '@/llm-api/fireworks-config'
+import { logger } from '@/util/logger'
+
+import type { HealthThresholds } from './compute-health'
+import type {
FireworksHealthSnapshot, PromMetrics } from './types'
+
+const FIREWORKS_METRICS_URL = (accountId: string) =>
+  `https://api.fireworks.ai/v1/accounts/${accountId}/metrics`
+
+const DEFAULT_POLL_INTERVAL_MS = 60_000
+/** Random ± jitter so multiple pods don't line up and collectively exceed
+ * the Fireworks 6 req/min/account rate limit. */
+const POLL_JITTER_MS = 10_000
+const FETCH_TIMEOUT_MS = 15_000
+/** Cap Retry-After honored on 429 so a bad header cannot stall the monitor
+ * indefinitely. */
+const MAX_BACKOFF_MS = 5 * 60 * 1000
+/** Fallback backoff if Fireworks returns 429 without a parseable Retry-After. */
+const DEFAULT_429_BACKOFF_MS = 60_000
+
+export interface MonitorOptions {
+  apiKey: string
+  accountId: string
+  deployments: string[]
+  pollIntervalMs?: number
+  thresholds?: HealthThresholds
+  fetch?: typeof globalThis.fetch
+}
+
+interface MonitorState {
+  options: MonitorOptions
+  metrics: PromMetrics | null
+  lastError: string | null
+  /** Earliest time at which the next scrape may fire (honors Retry-After). */
+  backoffUntil: number
+  timer: ReturnType<typeof setTimeout> | null
+  inFlight: Promise<void> | null
+  /** True once stopFireworksMonitor has been called — suppresses in-flight reschedules. */
+  stopped: boolean
+}
+
+let state: MonitorState | null = null
+
+class FireworksScrapeError extends Error {
+  constructor(
+    public readonly status: number,
+    public readonly statusText: string,
+    public readonly retryAfterMs: number | null,
+    bodyPreview: string,
+  ) {
+    super(`Fireworks metrics scrape failed: ${status} ${statusText}${bodyPreview ? ` — ${bodyPreview}` : ''}`)
+    this.name = 'FireworksScrapeError'
+  }
+}
+
+export async function scrapeFireworksMetrics(params: {
+  apiKey: string
+  accountId: string
+  fetch?: typeof globalThis.fetch
+  signal?: AbortSignal
+  now?: number
+}): Promise<PromMetrics> {
+  const fetchImpl = params.fetch ?? globalThis.fetch
+  const response = await fetchImpl(FIREWORKS_METRICS_URL(params.accountId), {
+    method: 'GET',
+    headers: {
+      Authorization: `Bearer ${params.apiKey}`,
+    },
+    signal: params.signal,
+  })
+
+  if (!response.ok) {
+    const body = await response.text().catch(() => '')
+    const retryAfterMs = parseRetryAfter(response.headers.get('retry-after'))
+    throw new FireworksScrapeError(
+      response.status,
+      response.statusText,
+      retryAfterMs,
+      body.slice(0, 200),
+    )
+  }
+
+  const text = await response.text()
+  return parsePrometheusText(text, params.now ?? Date.now())
+}
+
+function parseRetryAfter(raw: string | null): number | null {
+  if (!raw) return null
+  const seconds = Number(raw)
+  if (Number.isFinite(seconds) && seconds >= 0) {
+    return Math.min(seconds * 1000, MAX_BACKOFF_MS)
+  }
+  const dateMs = Date.parse(raw)
+  if (!Number.isNaN(dateMs)) {
+    const delta = dateMs - Date.now()
+    return Math.min(Math.max(delta, 0), MAX_BACKOFF_MS)
+  }
+  return null
+}
+
+function jittered(intervalMs: number): number {
+  const delta = (Math.random() * 2 - 1) * POLL_JITTER_MS
+  return Math.max(1_000, Math.round(intervalMs + delta))
+}
+
+async function pollOnce(): Promise<void> {
+  if (!state) return
+  const controller = new AbortController()
+  const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS)
+  try {
+    const metrics = await scrapeFireworksMetrics({
+      apiKey: state.options.apiKey,
+      accountId: state.options.accountId,
+      fetch: state.options.fetch,
+      signal: controller.signal,
+    })
+    state.metrics = metrics
+    state.lastError = null
+    state.backoffUntil = 0
+  } catch (error) {
+    const message = error instanceof Error ? error.message : String(error)
+    state.lastError = message
+    if (error instanceof FireworksScrapeError && error.status === 429) {
+      const backoffMs = error.retryAfterMs ?? DEFAULT_429_BACKOFF_MS
+      state.backoffUntil = Date.now() + backoffMs
+      logger.warn(
+        { status: 429, backoffMs },
+        '[FireworksMonitor] Rate limited, backing off',
+      )
+    } else {
+      logger.warn({ error: message }, '[FireworksMonitor] Scrape failed')
+    }
+  } finally {
+    clearTimeout(timeout)
+  }
+}
+
+function scheduleNext() {
+  if (!state || state.stopped) return
+  const intervalMs = state.options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS
+  const base = jittered(intervalMs)
+  const untilBackoff = Math.max(0, state.backoffUntil - Date.now())
+  const delayMs = Math.max(base, untilBackoff)
+  const timer = setTimeout(runTick, delayMs)
+  if (typeof timer.unref === 'function') timer.unref()
+  state.timer = timer
+}
+
+function runTick() {
+  if (!state || state.stopped || state.inFlight) {
+    scheduleNext()
+    return
+  }
+  state.inFlight = pollOnce().finally(() => {
+    if (!state) return
+    state.inFlight = null
+    scheduleNext()
+  })
+}
+
+export function startFireworksMonitor(options: Partial<MonitorOptions> = {}): boolean {
+  if (state) return true
+
+  const apiKey = options.apiKey ?? env.FIREWORKS_API_KEY
+  if (!apiKey) {
+    logger.warn({}, '[FireworksMonitor] FIREWORKS_API_KEY not set — monitor not started')
+    return false
+  }
+
+  const accountId = options.accountId ?? FIREWORKS_ACCOUNT_ID
+  const deployments =
+    options.deployments ?? Object.values(FIREWORKS_DEPLOYMENT_MAP)
+  const pollIntervalMs = options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS
+  const thresholds = options.thresholds ?? DEFAULT_HEALTH_THRESHOLDS
+
+  state = {
+    options: {
+      apiKey,
+      accountId,
+      deployments,
+      pollIntervalMs,
+      thresholds,
+      fetch: options.fetch,
+    },
+    metrics: null,
+    lastError: null,
+    backoffUntil: 0,
+    timer: null,
+    inFlight: null,
+    stopped: false,
+  }
+
+  // First scrape runs immediately; subsequent scrapes are self-scheduled via
+  // scheduleNext() with jitter so N pods don't synchronise.
+ runTick() + + logger.info( + { + accountId, + deployments, + pollIntervalMs, + }, + '[FireworksMonitor] Started', + ) + return true +} + +export function stopFireworksMonitor(): void { + if (!state) return + state.stopped = true + if (state.timer) clearTimeout(state.timer) + state = null +} + +export function getFireworksHealthSnapshot(now: number = Date.now()): FireworksHealthSnapshot { + if (!state) { + return { + scrapedAt: null, + ageMs: null, + overall: 'unknown', + deployments: {}, + lastError: 'monitor not started', + } + } + return computeSnapshot({ + metrics: state.metrics, + deployments: state.options.deployments, + thresholds: state.options.thresholds, + now, + lastError: state.lastError, + }) +} + +/** + * Gate free-session admission: ONLY returns true when the latest snapshot is + * 'healthy'. Any other status — 'degraded', 'unhealthy', 'unknown' — fails + * closed so the waiting room catches requests during incidents, cold starts, + * or monitor failures. + * + * Pass `deploymentId` to gate on a specific deployment instead of the overall + * worst-case. + */ +export function isFireworksAdmissible(deploymentId?: string): boolean { + const snapshot = getFireworksHealthSnapshot() + if (deploymentId) { + const match = Object.values(snapshot.deployments).find( + (d) => d.deploymentId === deploymentId || d.deployment === deploymentId, + ) + return match?.status === 'healthy' + } + return snapshot.overall === 'healthy' +} + +/** Force an immediate scrape (for tests / admin endpoints). Resolves when done. 
*/ +export async function refreshFireworksHealthNow(): Promise { + if (!state) return + await pollOnce() +} + +export function __resetFireworksMonitorForTests(): void { + stopFireworksMonitor() +} diff --git a/web/src/server/fireworks-monitor/parse-prometheus.ts b/web/src/server/fireworks-monitor/parse-prometheus.ts new file mode 100644 index 0000000000..1518fa4e41 --- /dev/null +++ b/web/src/server/fireworks-monitor/parse-prometheus.ts @@ -0,0 +1,147 @@ +import type { PromMetrics, PromSample } from './types' + +const LINE_RE = /^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{([^}]*)\})?\s+(.+)$/ + +export function parsePrometheusText(text: string, now: number = Date.now()): PromMetrics { + const samples: PromSample[] = [] + + for (const rawLine of text.split('\n')) { + const line = rawLine.trim() + if (line === '' || line.startsWith('#')) continue + + const match = LINE_RE.exec(line) + if (!match) continue + + const name = match[1] + const labelBlob = match[3] ?? '' + const valueStr = match[4].trim() + + const value = parsePromValue(valueStr) + if (value === null) continue + + samples.push({ + name, + labels: parseLabels(labelBlob), + value, + }) + } + + return { samples, scrapedAt: now } +} + +function parsePromValue(raw: string): number | null { + const trimmed = raw.split(/\s+/)[0] + if (trimmed === 'NaN') return NaN + if (trimmed === '+Inf') return Number.POSITIVE_INFINITY + if (trimmed === '-Inf') return Number.NEGATIVE_INFINITY + const n = Number(trimmed) + return Number.isFinite(n) || Number.isNaN(n) ? 
n : null +} + +function parseLabels(blob: string): Record { + const labels: Record = {} + if (blob === '') return labels + + let i = 0 + while (i < blob.length) { + while (i < blob.length && (blob[i] === ' ' || blob[i] === ',')) i++ + if (i >= blob.length) break + + const eq = blob.indexOf('=', i) + if (eq === -1) break + const key = blob.slice(i, eq).trim() + + let j = eq + 1 + if (blob[j] !== '"') break + j++ + let value = '' + while (j < blob.length && blob[j] !== '"') { + if (blob[j] === '\\' && j + 1 < blob.length) { + const next = blob[j + 1] + value += next === 'n' ? '\n' : next === 't' ? '\t' : next + j += 2 + } else { + value += blob[j] + j++ + } + } + labels[key] = value + i = j + 1 + } + + return labels +} + +export function findSamples( + metrics: PromMetrics, + name: string, + labelFilter: Record = {}, +): PromSample[] { + return metrics.samples.filter((s) => { + if (s.name !== name) return false + for (const [k, v] of Object.entries(labelFilter)) { + if (s.labels[k] !== v) return false + } + return true + }) +} + +export function sumSamples(samples: PromSample[]): number { + let sum = 0 + for (const s of samples) { + if (Number.isFinite(s.value)) sum += s.value + } + return sum +} + +export function avgSamples(samples: PromSample[]): number | null { + if (samples.length === 0) return null + const finite = samples.filter((s) => Number.isFinite(s.value)) + if (finite.length === 0) return null + return sumSamples(finite) / finite.length +} + +export function estimateHistogramPercentile( + buckets: PromSample[], + percentile: number, +): number | null { + if (buckets.length === 0) return null + + const sorted = [...buckets] + .map((b) => { + const leRaw = b.labels.le + const le = leRaw === '+Inf' ? 
Number.POSITIVE_INFINITY : Number(leRaw) + return { le, count: b.value } + }) + .filter((b) => !Number.isNaN(b.le)) + .sort((a, b) => a.le - b.le) + + if (sorted.length === 0) return null + const total = sorted[sorted.length - 1].count + if (!Number.isFinite(total) || total <= 0) return null + + const target = total * percentile + for (let idx = 0; idx < sorted.length; idx++) { + if (sorted[idx].count >= target) { + if (sorted[idx].le === Number.POSITIVE_INFINITY) { + return idx > 0 ? sorted[idx - 1].le : null + } + return sorted[idx].le + } + } + return null +} + +export function groupBucketsByLabels( + samples: PromSample[], + groupKeys: string[], +): Map { + const groups = new Map() + for (const s of samples) { + const key = groupKeys.map((k) => `${k}=${s.labels[k] ?? ''}`).join('|') + const arr = groups.get(key) ?? [] + arr.push(s) + groups.set(key, arr) + } + return groups +} diff --git a/web/src/server/fireworks-monitor/types.ts b/web/src/server/fireworks-monitor/types.ts new file mode 100644 index 0000000000..51f45ed8a5 --- /dev/null +++ b/web/src/server/fireworks-monitor/types.ts @@ -0,0 +1,38 @@ +export interface PromSample { + name: string + labels: Record + value: number +} + +export interface PromMetrics { + samples: PromSample[] + scrapedAt: number +} + +export type DeploymentHealthStatus = 'healthy' | 'degraded' | 'unhealthy' | 'unknown' + +export interface DeploymentHealth { + deploymentId: string + deployment: string + baseModel: string | null + status: DeploymentHealthStatus + reasons: string[] + metrics: { + requestRate: number + errorRate: number + errorFraction: number + concurrentRequests: number + kvBlocksFraction: number + kvSlotsFraction: number + p50GenerationQueueMs: number | null + p50TimeToFirstTokenMs: number | null + } +} + +export interface FireworksHealthSnapshot { + scrapedAt: number | null + ageMs: number | null + overall: DeploymentHealthStatus + deployments: Record + lastError: string | null +}