-
Notifications
You must be signed in to change notification settings - Fork 12
Run saveUsageCost in background to fix slow proxy responses #4244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
26ab140
6c0caed
e0926a1
e2efba5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,6 +13,10 @@ import * as Sentry from '@sentry/node'; | |||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| const log = logger('request-forward'); | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| // Track pending cost-saving promises per user so we can ensure the previous | ||||||||||||||||||||||||||||||||||||||||
| // request's cost has been recorded before allowing a new one | ||||||||||||||||||||||||||||||||||||||||
| const pendingCostPromises = new Map<string, Promise<void>>(); | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| async function handleStreamingRequest( | ||||||||||||||||||||||||||||||||||||||||
| ctxt: Koa.Context, | ||||||||||||||||||||||||||||||||||||||||
| url: string, | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -61,13 +65,24 @@ async function handleStreamingRequest( | |||||||||||||||||||||||||||||||||||||||
| // Handle end of stream | ||||||||||||||||||||||||||||||||||||||||
| if (data === '[DONE]') { | ||||||||||||||||||||||||||||||||||||||||
| if (generationId) { | ||||||||||||||||||||||||||||||||||||||||
| // Create a mock response object with the generation ID for the credit strategy | ||||||||||||||||||||||||||||||||||||||||
| const mockResponse = { id: generationId }; | ||||||||||||||||||||||||||||||||||||||||
| await endpointConfig.creditStrategy.saveUsageCost( | ||||||||||||||||||||||||||||||||||||||||
| dbAdapter, | ||||||||||||||||||||||||||||||||||||||||
| matrixUserId, | ||||||||||||||||||||||||||||||||||||||||
| mockResponse, | ||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||
| // Save cost in the background so we don't block the stream on OpenRouter's generation cost API. | ||||||||||||||||||||||||||||||||||||||||
| // Chain per-user promises so costs are recorded sequentially. | ||||||||||||||||||||||||||||||||||||||||
| const previousPromise = | ||||||||||||||||||||||||||||||||||||||||
| pendingCostPromises.get(matrixUserId) ?? Promise.resolve(); | ||||||||||||||||||||||||||||||||||||||||
| const costPromise = previousPromise | ||||||||||||||||||||||||||||||||||||||||
| .then(() => | ||||||||||||||||||||||||||||||||||||||||
| endpointConfig.creditStrategy.saveUsageCost( | ||||||||||||||||||||||||||||||||||||||||
| dbAdapter, | ||||||||||||||||||||||||||||||||||||||||
| matrixUserId, | ||||||||||||||||||||||||||||||||||||||||
| { id: generationId }, | ||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| .finally(() => { | ||||||||||||||||||||||||||||||||||||||||
| if (pendingCostPromises.get(matrixUserId) === costPromise) { | ||||||||||||||||||||||||||||||||||||||||
| pendingCostPromises.delete(matrixUserId); | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||
| pendingCostPromises.set(matrixUserId, costPromise); | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| ctxt.res.write(`data: [DONE]\n\n`); | ||||||||||||||||||||||||||||||||||||||||
| return 'stop'; | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -328,7 +343,22 @@ export default function handleRequestForward({ | |||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| // 4. Check user has sufficient credits using credit strategy | ||||||||||||||||||||||||||||||||||||||||
| // 4. Wait for any pending cost from a previous request to be recorded | ||||||||||||||||||||||||||||||||||||||||
| const pendingCost = pendingCostPromises.get(matrixUserId); | ||||||||||||||||||||||||||||||||||||||||
| if (pendingCost) { | ||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||
| await pendingCost; | ||||||||||||||||||||||||||||||||||||||||
| } catch (e) { | ||||||||||||||||||||||||||||||||||||||||
| log.error('Error waiting for pending cost:', e); | ||||||||||||||||||||||||||||||||||||||||
| await sendResponseForSystemError( | ||||||||||||||||||||||||||||||||||||||||
| ctxt, | ||||||||||||||||||||||||||||||||||||||||
| 'There was an error saving your Boxel credits usage. Try again or contact support if the problem persists.', | ||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+346
to
+359
|
||||||||||||||||||||||||||||||||||||||||
| // 4. Wait for any pending cost from a previous request to be recorded | |
| const pendingCost = pendingCostPromises.get(matrixUserId); | |
| if (pendingCost) { | |
| try { | |
| await pendingCost; | |
| } catch (e) { | |
| log.error('Error waiting for pending cost:', e); | |
| await sendResponseForSystemError( | |
| ctxt, | |
| 'There was an error saving your Boxel credits usage. Try again or contact support if the problem persists.', | |
| ); | |
| return; | |
| } | |
| } | |
| // 4. Previously, this handler waited for any pending cost from a previous | |
| // request to be recorded by awaiting pendingCostPromises.get(matrixUserId). | |
| // That behavior could block this request for a long time and still miss | |
| // in-flight requests. We now proceed without blocking here and rely on the | |
| // credit strategy to enforce limits. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ import { | |
| insertPlan, | ||
| realmSecretSeed, | ||
| createVirtualNetwork, | ||
| waitUntil, | ||
| } from './helpers'; | ||
| import { createJWT as createRealmServerJWT } from '../utils/jwt'; | ||
| import { | ||
|
|
@@ -134,34 +135,20 @@ module(basename(__filename), function () { | |
| const originalFetch = global.fetch; | ||
| const mockFetch = sinon.stub(global, 'fetch'); | ||
|
|
||
| // Mock OpenRouter response | ||
| // Mock OpenRouter response (includes usage.cost so credits can be | ||
| // deducted directly without polling the generation cost API) | ||
| const mockOpenRouterResponse = { | ||
| id: 'gen-test-123', | ||
| choices: [{ text: 'Test response from OpenRouter' }], | ||
| usage: { total_tokens: 150 }, | ||
| usage: { total_tokens: 150, cost: 0.003 }, | ||
| }; | ||
|
|
||
| // Mock generation cost API response | ||
| const mockCostResponse = { | ||
| data: { | ||
| id: 'gen-test-123', | ||
| total_cost: 0.003, | ||
| total_tokens: 150, | ||
| model: 'openai/gpt-3.5-turbo', | ||
| }, | ||
| }; | ||
|
|
||
| // Set up fetch to return different responses based on URL | ||
| // Set up fetch to return OpenRouter response | ||
| mockFetch.callsFake( | ||
| async (input: string | URL | Request, _init?: RequestInit) => { | ||
| const url = typeof input === 'string' ? input : input.toString(); | ||
|
|
||
| if (url.includes('/generation?id=')) { | ||
| return new Response(JSON.stringify(mockCostResponse), { | ||
| status: 200, | ||
| headers: { 'content-type': 'application/json' }, | ||
| }); | ||
| } else if (url.includes('/chat/completions')) { | ||
| if (url.includes('/chat/completions')) { | ||
| return new Response(JSON.stringify(mockOpenRouterResponse), { | ||
| status: 200, | ||
| headers: { 'content-type': 'application/json' }, | ||
|
|
@@ -207,36 +194,39 @@ module(basename(__filename), function () { | |
|
|
||
| // Verify fetch was called correctly (allowing unrelated fetches) | ||
| const calls = mockFetch.getCalls(); | ||
| const chatCallIndex = calls.findIndex((call) => { | ||
| const chatCall = calls.find((call) => { | ||
| const url = call.args[0]; | ||
| const href = typeof url === 'string' ? url : url?.toString(); | ||
| return Boolean(href && href.includes('/chat/completions')); | ||
| }); | ||
| const generationCallIndex = calls.findIndex((call) => { | ||
| const url = call.args[0]; | ||
| const href = typeof url === 'string' ? url : url?.toString(); | ||
| return Boolean(href && href.includes('/generation?id=')); | ||
| }); | ||
|
|
||
| assert.true(chatCallIndex >= 0, 'Fetch should call chat completions'); | ||
| assert.true( | ||
| generationCallIndex >= 0, | ||
| 'Fetch should call generation cost API', | ||
| ); | ||
| assert.true( | ||
| chatCallIndex < generationCallIndex, | ||
| 'Generation cost should be fetched after chat completions', | ||
| ); | ||
| assert.ok(chatCall, 'Fetch should call chat completions'); | ||
|
|
||
| // Verify authorization header was set correctly | ||
| const firstCallHeaders = calls[chatCallIndex].args[1] | ||
| ?.headers as Record<string, string>; | ||
| // Note: The actual authorization header will include the JWT token, not the API key | ||
| // The API key is added by the proxy handler, not the test | ||
| const chatCallHeaders = chatCall!.args[1]?.headers as Record< | ||
| string, | ||
| string | ||
| >; | ||
| assert.true( | ||
| firstCallHeaders?.Authorization?.startsWith('Bearer '), | ||
| chatCallHeaders?.Authorization?.startsWith('Bearer '), | ||
| 'Should set authorization header', | ||
| ); | ||
|
|
||
| // Verify credits were deducted (0.003 USD * 1000 = 3 credits) | ||
| const user = await getUserByMatrixUserId( | ||
|
Comment on lines
+215
to
+216
|
||
| dbAdapter, | ||
| '@testuser:localhost', | ||
| ); | ||
| await waitUntil( | ||
| async () => { | ||
| const credits = await sumUpCreditsLedger(dbAdapter, { | ||
| creditType: ['extra_credit', 'extra_credit_used'], | ||
| userId: user!.id, | ||
| }); | ||
| return credits === 47; | ||
| }, | ||
| { timeoutMessage: 'Credits should be deducted (50 - 3 = 47)' }, | ||
| ); | ||
| } finally { | ||
| mockFetch.restore(); | ||
| global.fetch = originalFetch; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
costInUsdis taken directly from an external response and passed tospendUsageCostwithout validation. If it’s not a finite non-negative number (e.g. string/NaN/negative),creditsConsumedcan become NaN/negative andspendCreditsmay silently skip charging. Add input validation (typeof number, Number.isFinite, costInUsd >= 0) and capture/log when invalid instead of attempting to spend.