Skip to content

Commit b22d244

Browse files
authored
Add chat completion concurrency logging (#733)
1 parent a2884a2 commit b22d244

4 files changed

Lines changed: 253 additions & 1 deletion

File tree

web/src/app/api/v1/chat/completions/__tests__/completions.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1779,6 +1779,7 @@ describe('/api/v1/chat/completions POST endpoint', () => {
17791779
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
17801780
expect(response.headers.get('Cache-Control')).toBe('no-cache')
17811781
expect(response.headers.get('Connection')).toBe('keep-alive')
1782+
expect(await response.text()).toContain(' stream')
17821783
},
17831784
FETCH_PATH_TEST_TIMEOUT_MS,
17841785
)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { describe, expect, it, mock } from 'bun:test'
2+
3+
import {
4+
beginChatCompletionRequestMetrics,
5+
getActiveChatCompletionRequestCount,
6+
} from '../request-metrics'
7+
8+
import type { Logger } from '@codebuff/common/types/contracts/logger'
9+
10+
const createLogger = (): Logger => ({
11+
debug: mock(() => {}),
12+
error: mock(() => {}),
13+
info: mock(() => {}),
14+
warn: mock(() => {}),
15+
})
16+
17+
const baseParams = (logger: Logger) => ({
18+
logger,
19+
userId: 'user-1',
20+
agentId: 'agent-1',
21+
runId: 'run-1',
22+
model: 'provider/model',
23+
streaming: true,
24+
costMode: 'normal',
25+
logSampleRate: 1,
26+
})
27+
28+
const drainStream = async (stream: ReadableStream<Uint8Array>) => {
29+
const reader = stream.getReader()
30+
while (true) {
31+
const { done } = await reader.read()
32+
if (done) return
33+
}
34+
}
35+
36+
describe('chat completion request metrics', () => {
37+
it('increments and decrements when manually ended', () => {
38+
const logger = createLogger()
39+
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
40+
41+
expect(getActiveChatCompletionRequestCount()).toBe(1)
42+
43+
metrics.end('completed')
44+
metrics.end('completed')
45+
46+
expect(getActiveChatCompletionRequestCount()).toBe(0)
47+
expect(logger.info).toHaveBeenCalledTimes(2)
48+
})
49+
50+
it('tracks requests without logging when sampling skips the request', () => {
51+
const logger = createLogger()
52+
const metrics = beginChatCompletionRequestMetrics({
53+
...baseParams(logger),
54+
logSampleRate: 0,
55+
})
56+
57+
expect(getActiveChatCompletionRequestCount()).toBe(1)
58+
59+
metrics.end('completed')
60+
61+
expect(getActiveChatCompletionRequestCount()).toBe(0)
62+
expect(logger.info).toHaveBeenCalledTimes(0)
63+
})
64+
65+
it('decrements when a wrapped stream completes', async () => {
66+
const logger = createLogger()
67+
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
68+
const stream = new ReadableStream<Uint8Array>({
69+
start(controller) {
70+
controller.enqueue(new TextEncoder().encode('data: test\n\n'))
71+
controller.close()
72+
},
73+
})
74+
75+
await drainStream(metrics.wrapStream(stream))
76+
77+
expect(getActiveChatCompletionRequestCount()).toBe(0)
78+
expect(logger.info).toHaveBeenCalledTimes(2)
79+
})
80+
81+
it('decrements when a wrapped stream is cancelled', async () => {
82+
const logger = createLogger()
83+
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
84+
const stream = new ReadableStream<Uint8Array>({
85+
start(controller) {
86+
controller.enqueue(new TextEncoder().encode('data: test\n\n'))
87+
},
88+
})
89+
90+
const reader = metrics.wrapStream(stream).getReader()
91+
await reader.cancel('client disconnected')
92+
93+
expect(getActiveChatCompletionRequestCount()).toBe(0)
94+
expect(logger.info).toHaveBeenCalledTimes(2)
95+
})
96+
97+
it('decrements when a wrapped stream errors', async () => {
98+
const logger = createLogger()
99+
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
100+
const stream = new ReadableStream<Uint8Array>({
101+
pull() {
102+
throw new Error('provider stream failed')
103+
},
104+
})
105+
106+
await expect(drainStream(metrics.wrapStream(stream))).rejects.toThrow(
107+
'provider stream failed',
108+
)
109+
110+
expect(getActiveChatCompletionRequestCount()).toBe(0)
111+
expect(logger.info).toHaveBeenCalledTimes(2)
112+
})
113+
})

web/src/app/api/v1/chat/completions/_post.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ import type {
116116
import { extractApiKeyFromHeader } from '@/util/auth'
117117
import { withDefaultProperties } from '@codebuff/common/analytics'
118118
import { checkFreeModeRateLimit as defaultCheckFreeModeRateLimit } from './free-mode-rate-limiter'
119+
import { beginChatCompletionRequestMetrics } from './request-metrics'
119120

120121
export const formatQuotaResetCountdown = (
121122
nextQuotaReset: string | null | undefined,
@@ -794,6 +795,16 @@ export async function postChatCompletions(params: {
794795
insertChatCompletionTraceBigquery,
795796
})
796797

798+
const requestMetrics = beginChatCompletionRequestMetrics({
799+
logger,
800+
userId,
801+
agentId,
802+
runId: runIdFromBody,
803+
model: typedBody.model,
804+
streaming: bodyStream,
805+
costMode,
806+
})
807+
797808
// Handle streaming vs non-streaming
798809
try {
799810
if (bodyStream) {
@@ -859,7 +870,7 @@ export async function postChatCompletions(params: {
859870
logger,
860871
})
861872

862-
return new NextResponse(stream, {
873+
return new NextResponse(requestMetrics.wrapStream(stream), {
863874
headers: {
864875
'Content-Type': 'text/event-stream',
865876
'Cache-Control': 'no-cache',
@@ -934,9 +945,11 @@ export async function postChatCompletions(params: {
934945
logger,
935946
})
936947

948+
requestMetrics.end('completed')
937949
return NextResponse.json(result)
938950
}
939951
} catch (error) {
952+
requestMetrics.end('error', { error: getErrorObject(error) })
940953
let openrouterError: OpenRouterError | undefined
941954
if (error instanceof OpenRouterError) {
942955
openrouterError = error
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import os from 'os'
2+
3+
import { getErrorObject } from '@codebuff/common/util/error'
4+
5+
import type { Logger } from '@codebuff/common/types/contracts/logger'
6+
7+
const HOSTNAME = os.hostname()
8+
const DEFAULT_LOG_SAMPLE_RATE = 0.05
9+
10+
let activeChatCompletionRequests = 0
11+
let nextRequestSequence = 0
12+
13+
type RequestMetricsParams = {
14+
logger: Logger
15+
userId: string
16+
agentId: string
17+
runId: string
18+
model: string
19+
streaming: boolean
20+
costMode: string | undefined
21+
logSampleRate?: number
22+
}
23+
24+
type EndReason = 'completed' | 'cancelled' | 'error'
25+
26+
export function beginChatCompletionRequestMetrics({
27+
logger,
28+
userId,
29+
agentId,
30+
runId,
31+
model,
32+
streaming,
33+
costMode,
34+
logSampleRate = DEFAULT_LOG_SAMPLE_RATE,
35+
}: RequestMetricsParams) {
36+
const requestSequence = ++nextRequestSequence
37+
const startedAt = Date.now()
38+
activeChatCompletionRequests += 1
39+
const activeRequestsAtStart = activeChatCompletionRequests
40+
const normalizedLogSampleRate = Math.max(0, Math.min(1, logSampleRate))
41+
const shouldLog = Math.random() < normalizedLogSampleRate
42+
43+
const baseFields = {
44+
metric: 'chat_completion_concurrency',
45+
host: HOSTNAME,
46+
pid: process.pid,
47+
requestSequence,
48+
userId,
49+
agentId,
50+
runId,
51+
model,
52+
streaming,
53+
costMode,
54+
logSampleRate: normalizedLogSampleRate,
55+
}
56+
57+
if (shouldLog) {
58+
logger.info(
59+
{
60+
...baseFields,
61+
event: 'start',
62+
activeChatCompletionRequests: activeRequestsAtStart,
63+
},
64+
'Chat completion request started',
65+
)
66+
}
67+
68+
let ended = false
69+
70+
const end = (reason: EndReason, extra?: Record<string, unknown>) => {
71+
if (ended) return
72+
ended = true
73+
activeChatCompletionRequests = Math.max(0, activeChatCompletionRequests - 1)
74+
75+
if (!shouldLog) return
76+
77+
logger.info(
78+
{
79+
...baseFields,
80+
...extra,
81+
event: 'finish',
82+
endReason: reason,
83+
durationMs: Date.now() - startedAt,
84+
activeRequestsAtStart,
85+
activeChatCompletionRequests,
86+
},
87+
'Chat completion request finished',
88+
)
89+
}
90+
91+
return {
92+
end,
93+
wrapStream(stream: ReadableStream<Uint8Array>) {
94+
const reader = stream.getReader()
95+
96+
return new ReadableStream<Uint8Array>({
97+
async pull(controller) {
98+
try {
99+
const { done, value } = await reader.read()
100+
if (done) {
101+
end('completed')
102+
controller.close()
103+
return
104+
}
105+
controller.enqueue(value)
106+
} catch (error) {
107+
end('error', { error: getErrorObject(error) })
108+
controller.error(error)
109+
}
110+
},
111+
async cancel(reason) {
112+
end('cancelled', {
113+
cancelReason:
114+
typeof reason === 'string' ? reason : getErrorObject(reason),
115+
})
116+
await reader.cancel(reason)
117+
},
118+
})
119+
},
120+
}
121+
}
122+
123+
export function getActiveChatCompletionRequestCount() {
124+
return activeChatCompletionRequests
125+
}

0 commit comments

Comments
 (0)