Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion web/src/app/api/_utils/backend-proxy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
withBackendAuth: withBackendAuthMock,
}))

import { apiBaseUrl, proxyJson, proxyText } from "./backend-proxy"
import { apiBaseUrl, proxyJson, proxyStream, proxyText } from "./backend-proxy"

describe("apiBaseUrl", () => {
it("uses the shared backend base URL helper", () => {
Expand Down Expand Up @@ -172,4 +172,81 @@
)
expect(res.status).toBe(200)
})

it("passes through SSE responses without buffering", async () => {
withBackendAuthMock.mockResolvedValue({
"Content-Type": "application/json",
authorization: "Bearer stream-token",
})

const fetchMock = vi.fn(async () =>
new Response("data: ping\n\n", {
status: 200,
headers: { "Content-Type": "text/event-stream" },
}),
)
global.fetch = fetchMock as typeof fetch

Check warning on line 188 in web/src/app/api/_utils/backend-proxy.test.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer `globalThis` over `global`.

See more on https://sonarcloud.io/project/issues?id=jerry609_PaperBot&issues=AZzwXydwa2oXHuy7bYkA&open=AZzwXydwa2oXHuy7bYkA&pullRequest=409

const req = new Request("https://localhost/api/studio/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt: "hello" }),
})
const res = await proxyStream(req, "https://backend/api/studio/chat", "POST", {
auth: true,
responseContentType: "text/event-stream",
})

expect(withBackendAuthMock).toHaveBeenCalledTimes(1)
const calls = fetchMock.mock.calls as unknown[][]
const init = calls[0]?.[1] as
| (RequestInit & { dispatcher?: unknown })
| undefined
expect(init).toBeDefined()
expect(init).toMatchObject({
method: "POST",
headers: {
"Content-Type": "application/json",
authorization: "Bearer stream-token",
},
body: JSON.stringify({ prompt: "hello" }),
})
expect(init?.signal).toBeUndefined()
expect(init?.dispatcher).toBeDefined()
expect(res.headers.get("content-type")).toContain("text/event-stream")
expect(await res.text()).toContain("data: ping")
})

it("returns JSON when a stream route falls back to a non-stream response", async () => {
withBackendAuthMock.mockResolvedValue({
Accept: "text/event-stream, application/json",
"Content-Type": "application/json",
authorization: "Bearer stream-token",
})

const fetchMock = vi.fn(async () =>
new Response(new TextEncoder().encode(JSON.stringify({ done: true })), { status: 200 }),
)
global.fetch = fetchMock as typeof fetch

Check warning on line 230 in web/src/app/api/_utils/backend-proxy.test.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer `globalThis` over `global`.

See more on https://sonarcloud.io/project/issues?id=jerry609_PaperBot&issues=AZzwXydwa2oXHuy7bYkB&open=AZzwXydwa2oXHuy7bYkB&pullRequest=409

const req = new Request("https://localhost/api/research/paperscool/daily", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query: "llm" }),
})
const res = await proxyStream(
req,
"https://backend/api/research/paperscool/daily",
"POST",
{
accept: "text/event-stream, application/json",
auth: true,
passthroughNonStreamResponse: true,
responseContentType: "text/event-stream",
},
)

expect(res.headers.get("content-type")).toContain("application/json")
await expect(res.json()).resolves.toEqual({ done: true })
})
})
62 changes: 61 additions & 1 deletion web/src/app/api/_utils/backend-proxy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Agent } from "undici"

import { backendBaseUrl, withBackendAuth } from "./auth-headers"

export type ProxyMethod = "DELETE" | "GET" | "HEAD" | "PATCH" | "POST" | "PUT"
Expand All @@ -22,7 +24,17 @@ type TextProxyOptions = ProxyOptions & {
responseHeaders?: HeadersInit
}

type StreamProxyOptions = ProxyOptions & {
dispatcher?: Agent
passthroughNonStreamResponse?: boolean
responseContentType?: string
}

const DEFAULT_TIMEOUT_MS = 120_000
const SSE_DISPATCHER = new Agent({
bodyTimeout: 0,
headersTimeout: 0,
})

export function apiBaseUrl(): string {
return backendBaseUrl()
Expand Down Expand Up @@ -65,11 +77,58 @@ export async function proxyText(
}
}

export async function proxyStream(
req: Request,
upstreamUrl: string,
method: ProxyMethod,
options: StreamProxyOptions = {},
): Promise<Response> {
const requestOptions = {
responseContentType: "text/event-stream",
timeoutMs: 0,
...options,
}

try {
const upstream = await fetchUpstream(req, upstreamUrl, method, requestOptions, {
dispatcher: requestOptions.dispatcher ?? SSE_DISPATCHER,
})
const upstreamContentType = upstream.headers.get("content-type") || ""

if (
requestOptions.passthroughNonStreamResponse &&
!upstreamContentType.includes("text/event-stream")
) {
const text = await upstream.text()
return buildTextResponse(text, upstream, {
responseContentType: "application/json",
responseHeaders: undefined,
})
}

const headers = new Headers()
headers.set(
"Content-Type",
upstreamContentType || requestOptions.responseContentType || "text/event-stream",
)
headers.set("Cache-Control", "no-cache")
headers.set("Connection", "keep-alive")

return new Response(upstream.body, {
status: upstream.status,
headers,
})
} catch (error) {
return handleProxyError(error, upstreamUrl, requestOptions.onError)
}
}

async function fetchUpstream(
req: Request,
upstreamUrl: string,
method: ProxyMethod,
options: ProxyOptions,
init: RequestInit & { dispatcher?: Agent } = {},
): Promise<Response> {
const controller = options.timeoutMs === 0 ? null : new AbortController()
const timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS
Expand All @@ -78,12 +137,13 @@ async function fetchUpstream(

try {
return await fetch(upstreamUrl, {
...init,
method,
headers: await resolveHeaders(req, body, options),
body,
cache: options.cache,
signal: controller?.signal,
})
} as RequestInit & { dispatcher?: Agent })
} finally {
if (timeout) {
clearTimeout(timeout)
Expand Down
118 changes: 118 additions & 0 deletions web/src/app/api/_utils/stream-route-contracts.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { beforeEach, describe, expect, it, vi } from "vitest"

const { apiBaseUrlMock, proxyStreamMock } = vi.hoisted(() => ({
apiBaseUrlMock: vi.fn(() => "http://backend.example.com"),
proxyStreamMock: vi.fn(),
}))

vi.mock("@/app/api/_utils/backend-proxy", () => ({
apiBaseUrl: apiBaseUrlMock,
proxyStream: proxyStreamMock,
}))

import { POST as genCodePost } from "@/app/api/gen-code/route"
import { POST as paperscoolAnalyzePost } from "@/app/api/research/paperscool/analyze/route"
import { POST as paperscoolDailyPost } from "@/app/api/research/paperscool/daily/route"
import { POST as studioChatPost } from "@/app/api/studio/chat/route"

type StreamRouteCase = {
expectedOptions: Record<string, unknown>
handler: (req: Request) => Promise<Response>
name: string
path: string
}

const streamRouteCases: StreamRouteCase[] = [
{
name: "studio chat",
handler: studioChatPost,
path: "/api/studio/chat",
expectedOptions: {
auth: true,
responseContentType: "text/event-stream",
},
},
{
name: "gen code",
handler: genCodePost,
path: "/api/gen-code",
expectedOptions: {
auth: true,
responseContentType: "text/event-stream",
},
},
{
name: "paperscool analyze",
handler: paperscoolAnalyzePost,
path: "/api/research/paperscool/analyze",
expectedOptions: {
accept: "text/event-stream",
responseContentType: "text/event-stream",
onError: expect.any(Function),
},
},
]

describe("stream route contracts", () => {
beforeEach(() => {
vi.resetAllMocks()
apiBaseUrlMock.mockReturnValue("http://backend.example.com")
})

it.each(streamRouteCases)("proxies $name through the shared stream helper", async ({
expectedOptions,
handler,
path,
}) => {
const req = new Request(`http://localhost${path}`, { method: "POST" })
proxyStreamMock.mockResolvedValueOnce(new Response("data: ping\n\n"))

await handler(req)

expect(proxyStreamMock).toHaveBeenCalledWith(
req,
`http://backend.example.com${path}`,
"POST",
expectedOptions,
)
})

it("preserves the daily stream/json passthrough fallback contract", async () => {
const req = new Request("http://localhost/api/research/paperscool/daily", {
method: "POST",
})
proxyStreamMock.mockResolvedValueOnce(new Response("data: ping\n\n"))

await paperscoolDailyPost(req)

expect(proxyStreamMock).toHaveBeenCalledWith(
req,
"http://backend.example.com/api/research/paperscool/daily",
"POST",
expect.objectContaining({
accept: "text/event-stream, application/json",
auth: true,
passthroughNonStreamResponse: true,
responseContentType: "text/event-stream",
onError: expect.any(Function),
}),
)

const calls = vi.mocked(proxyStreamMock).mock.calls as unknown[][]
const options = calls[0]?.[3] as
| { onError?: (context: { error: unknown; isTimeout: boolean; upstreamUrl: string }) => Response }
| undefined
const fallback = options?.onError?.({
error: new Error("offline"),
isTimeout: false,
upstreamUrl: "http://backend.example.com/api/research/paperscool/daily",
})

expect(fallback).toBeInstanceOf(Response)
expect(fallback?.status).toBe(502)
await expect(fallback?.json()).resolves.toEqual({
detail: "Upstream API unreachable",
error: "offline",
})
})
})
26 changes: 4 additions & 22 deletions web/src/app/api/gen-code/route.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,10 @@
export const runtime = "nodejs"

function apiBaseUrl() {
return process.env.PAPERBOT_API_BASE_URL || "http://127.0.0.1:8000"
}

import { withBackendAuth } from "../_utils/auth-headers"
import { apiBaseUrl, proxyStream } from "@/app/api/_utils/backend-proxy"

export async function POST(req: Request) {
const body = await req.text()
const upstream = await fetch(`${apiBaseUrl()}/api/gen-code`, {
method: "POST",
headers: await withBackendAuth(req, {
"Content-Type": req.headers.get("content-type") || "application/json",
}),
body,
})

const headers = new Headers()
headers.set("Content-Type", upstream.headers.get("content-type") || "text/event-stream")
headers.set("Cache-Control", "no-cache")
headers.set("Connection", "keep-alive")

return new Response(upstream.body, {
status: upstream.status,
headers,
return proxyStream(req, `${apiBaseUrl()}/api/gen-code`, "POST", {
auth: true,
responseContentType: "text/event-stream",
})
}
49 changes: 12 additions & 37 deletions web/src/app/api/research/paperscool/analyze/route.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,18 @@
export const runtime = "nodejs"

import { Agent } from "undici"

import { apiBaseUrl } from "../../_base"

// Analyze can stream for a long time; disable body timeout on the proxy hop.
const sseDispatcher = new Agent({
bodyTimeout: 0,
headersTimeout: 0,
})
import { apiBaseUrl, proxyStream } from "@/app/api/_utils/backend-proxy"

export async function POST(req: Request) {
const body = await req.text()
let upstream: Response
try {
upstream = await fetch(`${apiBaseUrl()}/api/research/paperscool/analyze`, {
method: "POST",
headers: {
"Content-Type": req.headers.get("content-type") || "application/json",
Accept: "text/event-stream",
},
body,
dispatcher: sseDispatcher,
} as RequestInit & { dispatcher: Agent })
} catch (error) {
const detail = error instanceof Error ? error.message : String(error)
return Response.json(
{ detail: "Upstream API unreachable", error: detail },
{ status: 502 },
)
}

const headers = new Headers()
headers.set("Content-Type", upstream.headers.get("content-type") || "text/event-stream")
headers.set("Cache-Control", "no-cache")
headers.set("Connection", "keep-alive")

return new Response(upstream.body, {
status: upstream.status,
headers,
return proxyStream(req, `${apiBaseUrl()}/api/research/paperscool/analyze`, "POST", {
accept: "text/event-stream",
responseContentType: "text/event-stream",
onError: ({ error }) =>
Response.json(
{
detail: "Upstream API unreachable",
error: error instanceof Error ? error.message : String(error),
},
{ status: 502 },
),
})
}
Loading
Loading