Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 55 additions & 17 deletions src/routes/chat-completions/handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Context } from "hono"

import consola from "consola"
import { streamSSE, type SSEMessage } from "hono/streaming"

import { awaitApproval } from "~/lib/approval"
import { checkRateLimit } from "~/lib/rate-limit"
Expand All @@ -10,7 +9,6 @@ import { getTokenCount } from "~/lib/tokenizer"
import { isNullish } from "~/lib/utils"
import {
createChatCompletions,
type ChatCompletionResponse,
type ChatCompletionsPayload,
} from "~/services/copilot/create-chat-completions"

Expand Down Expand Up @@ -39,7 +37,23 @@ export async function handleCompletion(c: Context) {

if (state.manualApprove) await awaitApproval()

if (isNullish(payload.max_tokens)) {
// Normalize max_completion_tokens to max_tokens for upstream compatibility
const payloadAny = payload as unknown as Record<string, unknown>
if (
payloadAny.max_completion_tokens !== null
&& payloadAny.max_completion_tokens !== undefined
&& isNullish(payload.max_tokens)
) {
payload = {
...payload,
max_tokens: payloadAny.max_completion_tokens as number,
}
delete (payload as unknown as Record<string, unknown>).max_completion_tokens
consola.debug(
"Normalized max_completion_tokens to max_tokens:",
payload.max_tokens,
)
} else if (isNullish(payload.max_tokens)) {
payload = {
...payload,
max_tokens: selectedModel?.capabilities.limits.max_output_tokens,
Expand All @@ -49,20 +63,44 @@ export async function handleCompletion(c: Context) {

const response = await createChatCompletions(payload)

if (isNonStreaming(response)) {
consola.debug("Non-streaming response:", JSON.stringify(response))
return c.json(response)
}
// Streaming: response is a raw fetch Response — pipe body directly
if (response instanceof Response) {
const startTime = Date.now()
let byteCount = 0
let chunkCount = 0

const monitor = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk: Uint8Array, controller) {
chunkCount++
byteCount += chunk.byteLength
if (chunkCount === 1) {
consola.info(`TTFT: ${Date.now() - startTime}ms`)
}
controller.enqueue(chunk)
},
flush() {
consola.info(
`Stream complete: ${chunkCount} chunks, ${byteCount} bytes in ${Date.now() - startTime}ms`,
)
},
})

consola.debug("Streaming response")
return streamSSE(c, async (stream) => {
for await (const chunk of response) {
consola.debug("Streaming chunk:", JSON.stringify(chunk))
await stream.writeSSE(chunk as SSEMessage)
const body = response.body
if (!body) {
return c.text("No response body", 502)
}
})
}

const isNonStreaming = (
response: Awaited<ReturnType<typeof createChatCompletions>>,
): response is ChatCompletionResponse => Object.hasOwn(response, "choices")
return new Response(body.pipeThrough(monitor), {
status: 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
})
}

// Non-streaming: response is a parsed JSON object
consola.debug("Non-streaming response:", JSON.stringify(response))
return c.json(response)
}
86 changes: 42 additions & 44 deletions src/routes/messages/handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Context } from "hono"

import consola from "consola"
import { events } from "fetch-event-stream"
import { streamSSE } from "hono/streaming"

import { awaitApproval } from "~/lib/approval"
Expand All @@ -9,7 +10,6 @@ import { state } from "~/lib/state"
import {
createChatCompletions,
type ChatCompletionChunk,
type ChatCompletionResponse,
} from "~/services/copilot/create-chat-completions"

import {
Expand Down Expand Up @@ -40,52 +40,50 @@ export async function handleCompletion(c: Context) {

const response = await createChatCompletions(openAIPayload)

if (isNonStreaming(response)) {
consola.debug(
"Non-streaming response from Copilot:",
JSON.stringify(response).slice(-400),
)
const anthropicResponse = translateToAnthropic(response)
consola.debug(
"Translated Anthropic response:",
JSON.stringify(anthropicResponse),
)
return c.json(anthropicResponse)
}

consola.debug("Streaming response from Copilot")
return streamSSE(c, async (stream) => {
const streamState: AnthropicStreamState = {
messageStartSent: false,
contentBlockIndex: 0,
contentBlockOpen: false,
toolCalls: {},
}

for await (const rawEvent of response) {
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
if (rawEvent.data === "[DONE]") {
break
// Streaming: response is a raw fetch Response
if (response instanceof Response) {
consola.debug("Streaming response from Copilot")
return streamSSE(c, async (stream) => {
const streamState: AnthropicStreamState = {
messageStartSent: false,
contentBlockIndex: 0,
contentBlockOpen: false,
toolCalls: {},
}

if (!rawEvent.data) {
continue
}
for await (const rawEvent of events(response)) {
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
if (rawEvent.data === "[DONE]") {
break
}

if (!rawEvent.data) {
continue
}

const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
const events = translateChunkToAnthropicEvents(chunk, streamState)
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
const sseEvents = translateChunkToAnthropicEvents(chunk, streamState)

for (const event of events) {
consola.debug("Translated Anthropic event:", JSON.stringify(event))
await stream.writeSSE({
event: event.type,
data: JSON.stringify(event),
})
for (const event of sseEvents) {
consola.debug("Translated Anthropic event:", JSON.stringify(event))
await stream.writeSSE({
event: event.type,
data: JSON.stringify(event),
})
}
}
}
})
}
})
}

const isNonStreaming = (
response: Awaited<ReturnType<typeof createChatCompletions>>,
): response is ChatCompletionResponse => Object.hasOwn(response, "choices")
// Non-streaming: response is a parsed JSON object
consola.debug(
"Non-streaming response from Copilot:",
JSON.stringify(response).slice(-400),
)
const anthropicResponse = translateToAnthropic(response)
consola.debug(
"Translated Anthropic response:",
JSON.stringify(anthropicResponse),
)
return c.json(anthropicResponse)
}
3 changes: 1 addition & 2 deletions src/services/copilot/create-chat-completions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import consola from "consola"
import { events } from "fetch-event-stream"

import { copilotHeaders, copilotBaseUrl } from "~/lib/api-config"
import { HTTPError } from "~/lib/error"
Expand Down Expand Up @@ -40,7 +39,7 @@ export const createChatCompletions = async (
}

if (payload.stream) {
return events(response)
return response
}

return (await response.json()) as ChatCompletionResponse
Expand Down
5 changes: 5 additions & 0 deletions src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ export async function runServer(options: RunServerOptions): Promise<void> {
serve({
fetch: server.fetch as ServerHandler,
port: options.port,
bun: {
// Disable idle timeout to prevent Bun from closing SSE/streaming connections
// Default is 10 seconds which kills long-running LLM streams
idleTimeout: 0,
},
})
}

Expand Down