From 665419ee64e50c84c724b9f7502b379ffad05b44 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Fri, 8 May 2026 15:26:12 +0200 Subject: [PATCH 1/3] Refactor body handling to use ReadableStream across converters and middleware Co-authored-by: Copilot --- .../src/core/routing/cacheInterceptor.ts | 2 +- .../open-next/src/core/routing/middleware.ts | 6 ++- packages/open-next/src/core/routing/util.ts | 9 +---- packages/open-next/src/http/request.ts | 38 +++++++++++++------ .../src/overrides/converters/aws-apigw-v1.ts | 5 +-- .../src/overrides/converters/aws-apigw-v2.ts | 14 ++++--- .../overrides/converters/aws-cloudfront.ts | 4 +- .../src/overrides/converters/edge.ts | 8 ++-- .../src/overrides/converters/node.ts | 12 ++---- .../overrides/proxyExternalRequest/node.ts | 8 +++- packages/open-next/src/types/open-next.ts | 2 +- .../tests/converters/aws-apigw-v1.test.ts | 14 ++++--- .../tests/converters/aws-apigw-v2.test.ts | 10 +++-- .../tests/converters/aws-cloudfront.test.ts | 8 ++-- .../tests-unit/tests/converters/node.test.ts | 21 +++++----- .../core/routing/cacheInterceptor.test.ts | 4 +- .../tests/core/routing/i18n.test.ts | 3 +- .../tests/core/routing/matcher.test.ts | 3 +- .../tests/core/routing/middleware.test.ts | 2 +- .../tests/core/routing/util.test.ts | 8 +++- 20 files changed, 106 insertions(+), 75 deletions(-) diff --git a/packages/open-next/src/core/routing/cacheInterceptor.ts b/packages/open-next/src/core/routing/cacheInterceptor.ts index 271b123f..53f6edb4 100644 --- a/packages/open-next/src/core/routing/cacheInterceptor.ts +++ b/packages/open-next/src/core/routing/cacheInterceptor.ts @@ -144,7 +144,7 @@ function createPprPartialResult( "next-resume": "1", }, rawPath: localizedPath, - body: Buffer.from(cachedValue.meta?.postponed || "", "utf-8"), + body: toReadableStream(cachedValue.meta?.postponed || ""), }, result: { type: "core", diff --git a/packages/open-next/src/core/routing/middleware.ts b/packages/open-next/src/core/routing/middleware.ts index 78638c95..7c730877 100644 --- a/packages/open-next/src/core/routing/middleware.ts +++ b/packages/open-next/src/core/routing/middleware.ts @@ -65,6 +65,8 @@ export async function handleMiddleware( const middleware = await middlewareLoader(); + const [bodyForMiddleware, bodyForForward] = internalEvent.body?.tee() ?? [undefined, undefined]; + const result: Response = await middleware.default({ // `geo` is pre Next 15. geo: { @@ -84,7 +86,7 @@ export async function handleMiddleware( trailingSlash: NextConfig.trailingSlash, }, url, - body: convertBodyToReadableStream(internalEvent.method, internalEvent.body), + body: convertBodyToReadableStream(internalEvent.method, bodyForMiddleware), } as unknown as Request); const statusCode = result.status; @@ -175,7 +177,7 @@ export async function handleMiddleware( rawPath: new URL(newUrl).pathname, type: internalEvent.type, headers: { ...internalEvent.headers, ...reqHeaders }, - body: internalEvent.body, + body: bodyForForward, method: internalEvent.method, query: middlewareQuery, cookies: internalEvent.cookies, diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index 0d4ead50..13e4f034 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -203,15 +203,10 @@ export function unescapeRegex(str: string) { /** * @__PURE__ */ -export function convertBodyToReadableStream(method: string, body?: string | Buffer) { +export function convertBodyToReadableStream(method: string, body?: ReadableStream) { if (method === "GET" || method === "HEAD") return undefined; if (!body) return undefined; - return new ReadableStream({ - start(controller) { - controller.enqueue(body); - controller.close(); - }, - }); + return body; } enum CommonHeaders { diff --git a/packages/open-next/src/http/request.ts b/packages/open-next/src/http/request.ts index a70eaa9f..39b73e87 100644 --- a/packages/open-next/src/http/request.ts +++ b/packages/open-next/src/http/request.ts @@ -4,6 +4,7 @@ // @ts-nocheck import http from "node:http"; +import type { ReadableStream } from "node:stream/web"; export class IncomingMessage extends http.IncomingMessage { constructor({ @@ -16,7 +17,7 @@ export class IncomingMessage extends http.IncomingMessage { method: string; url: string; headers: Record; - body?: Buffer; + body?: ReadableStream; remoteAddress?: string; }) { super({ @@ -28,12 +29,6 @@ export class IncomingMessage extends http.IncomingMessage { destroy: Function.prototype, }); - // Set the content length when there is a body. - // See https://httpwg.org/specs/rfc9110.html#field.content-length - if (body) { - headers["content-length"] ??= String(Buffer.byteLength(body)); - } - Object.assign(this, { ip: remoteAddress, complete: true, @@ -46,9 +41,30 @@ export class IncomingMessage extends http.IncomingMessage { url, }); - this._read = () => { - this.push(body); - this.push(null); - }; + this._read = (() => { + if (!body) { + return () => { + this.push(null); + }; + } + let started = false; + const reader = body.getReader(); + const pump = () => { + reader.read().then(({ done, value }) => { + if (done) { + this.push(null); + } else { + this.push(value); + pump(); + } + }); + }; + return () => { + if (!started) { + started = true; + pump(); + } + }; + })(); } } diff --git a/packages/open-next/src/overrides/converters/aws-apigw-v1.ts b/packages/open-next/src/overrides/converters/aws-apigw-v1.ts index fdc83a07..784326f5 100644 --- a/packages/open-next/src/overrides/converters/aws-apigw-v1.ts +++ b/packages/open-next/src/overrides/converters/aws-apigw-v1.ts @@ -2,14 +2,13 @@ import type { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda"; import type { InternalEvent, InternalResult } from "@/types/open-next"; import type { Converter } from "@/types/overrides"; -import { fromReadableStream } from "@/utils/stream"; +import { fromReadableStream, toReadableStream } from "@/utils/stream"; import { debug } from "../../adapters/logger"; import { extractHostFromHeaders, removeUndefinedFromQuery } from "./utils"; function normalizeAPIGatewayProxyEventHeaders(event: APIGatewayProxyEvent): Record { - event.multiValueHeaders; const headers: Record = {}; for (const [key, values] of Object.entries(event.multiValueHeaders || {})) { @@ -73,7 +72,7 @@ async function convertFromAPIGatewayProxyEvent(event: APIGatewayProxyEvent): Pro method: httpMethod, rawPath: path, url: `https://${extractHostFromHeaders(headers)}${path}${normalizeAPIGatewayProxyEventQueryParams(event)}`, - body: Buffer.from(body ?? "", isBase64Encoded ? "base64" : "utf8"), + body: body ? toReadableStream(body, isBase64Encoded) : undefined, headers, remoteAddress: requestContext.identity.sourceIp, query: removeUndefinedFromQuery(normalizeAPIGatewayProxyEventMultiValueQueryStringParameters(event)), diff --git a/packages/open-next/src/overrides/converters/aws-apigw-v2.ts b/packages/open-next/src/overrides/converters/aws-apigw-v2.ts index 87c53b84..9760fc7d 100644 --- a/packages/open-next/src/overrides/converters/aws-apigw-v2.ts +++ b/packages/open-next/src/overrides/converters/aws-apigw-v2.ts @@ -1,9 +1,11 @@ +import type { ReadableStream } from "node:stream/web"; + import type { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda"; import { parseSetCookieHeader } from "@/http/util"; import type { InternalEvent, InternalResult } from "@/types/open-next"; import type { Converter } from "@/types/overrides"; -import { fromReadableStream } from "@/utils/stream"; +import { fromReadableStream, toReadableStream } from "@/utils/stream"; import { debug } from "../../adapters/logger"; import { convertToQuery } from "../../core/routing/util"; @@ -36,18 +38,18 @@ const CloudFrontBlacklistedHeaders = [ "via", ]; -function normalizeAPIGatewayProxyEventV2Body(event: APIGatewayProxyEventV2): Buffer { +function normalizeAPIGatewayProxyEventV2Body(event: APIGatewayProxyEventV2): ReadableStream | undefined { const { body, isBase64Encoded } = event; if (Buffer.isBuffer(body)) { - return body; + return toReadableStream(body); } if (typeof body === "string") { - return Buffer.from(body, isBase64Encoded ? "base64" : "utf8"); + return toReadableStream(body, isBase64Encoded); } if (typeof body === "object") { - return Buffer.from(JSON.stringify(body)); + return toReadableStream(JSON.stringify(body)); } - return Buffer.from("", "utf8"); + return undefined; } function normalizeAPIGatewayProxyEventV2Headers(event: APIGatewayProxyEventV2): Record { diff --git a/packages/open-next/src/overrides/converters/aws-cloudfront.ts b/packages/open-next/src/overrides/converters/aws-cloudfront.ts index 678b114e..1689c8ea 100644 --- a/packages/open-next/src/overrides/converters/aws-cloudfront.ts +++ b/packages/open-next/src/overrides/converters/aws-cloudfront.ts @@ -11,7 +11,7 @@ import type { import { parseSetCookieHeader } from "@/http/util"; import type { InternalEvent, InternalResult, MiddlewareResult } from "@/types/open-next"; import type { Converter } from "@/types/overrides"; -import { fromReadableStream } from "@/utils/stream"; +import { fromReadableStream, toReadableStream } from "@/utils/stream"; import { debug } from "../../adapters/logger"; import { convertToQuery, convertToQueryString } from "../../core/routing/util"; @@ -85,7 +85,7 @@ async function convertFromCloudFrontRequestEvent(event: CloudFrontRequestEvent): method, rawPath: uri, url: `https://${extractHostFromHeaders(headers)}${uri}${querystring ? `?${querystring}` : ""}`, - body: Buffer.from(body?.data ?? "", body?.encoding === "base64" ? "base64" : "utf8"), + body: body?.data ? toReadableStream(body.data, body.encoding === "base64") : undefined, headers, remoteAddress: clientIp, query: convertToQuery(querystring), diff --git a/packages/open-next/src/overrides/converters/edge.ts b/packages/open-next/src/overrides/converters/edge.ts index 5261283d..2f4cd2ae 100644 --- a/packages/open-next/src/overrides/converters/edge.ts +++ b/packages/open-next/src/overrides/converters/edge.ts @@ -1,4 +1,4 @@ -import { Buffer } from "node:buffer"; +import type { ReadableStream } from "node:stream/web"; import cookieParser from "cookie"; @@ -32,7 +32,7 @@ const converter: Converter = { const shouldHaveBody = method !== "GET" && method !== "HEAD"; // Only read body for methods that should have one - const body = shouldHaveBody ? Buffer.from(await request.arrayBuffer()) : undefined; + const body = shouldHaveBody ? ((request.body as ReadableStream | undefined) ?? undefined) : undefined; const cookieHeader = request.headers.get("cookie"); const cookies = cookieHeader ? (cookieParser.parse(cookieHeader) as Record) : {}; @@ -106,7 +106,9 @@ const converter: Converter = { } // We should not return a body for statusCode's that doesn't allow bodies - const body = NULL_BODY_STATUSES.has(result.statusCode) ? null : (result.body as ReadableStream); + const body = NULL_BODY_STATUSES.has(result.statusCode) + ? null + : (result.body as unknown as globalThis.ReadableStream); return new Response(body, { status: result.statusCode, diff --git a/packages/open-next/src/overrides/converters/node.ts b/packages/open-next/src/overrides/converters/node.ts index 60c5fffa..bdffaef3 100644 --- a/packages/open-next/src/overrides/converters/node.ts +++ b/packages/open-next/src/overrides/converters/node.ts @@ -1,4 +1,6 @@ import type { IncomingMessage } from "node:http"; +import { Readable } from "node:stream"; +import type { ReadableStream } from "node:stream/web"; import cookieParser from "cookie"; @@ -10,15 +12,7 @@ import { extractHostFromHeaders, getQueryFromSearchParams } from "./utils.js"; const converter: Converter = { convertFrom: async (event: unknown) => { const req = event as IncomingMessage & { protocol?: string }; - const body = await new Promise((resolve) => { - const chunks: Uint8Array[] = []; - req.on("data", (chunk) => { - chunks.push(chunk); - }); - req.on("end", () => { - resolve(Buffer.concat(chunks)); - }); - }); + const body: ReadableStream = Readable.toWeb(req); const headers = Object.fromEntries( Object.entries(req.headers ?? {}) diff --git a/packages/open-next/src/overrides/proxyExternalRequest/node.ts b/packages/open-next/src/overrides/proxyExternalRequest/node.ts index 97fdef0a..a42d56e7 100644 --- a/packages/open-next/src/overrides/proxyExternalRequest/node.ts +++ b/packages/open-next/src/overrides/proxyExternalRequest/node.ts @@ -1,5 +1,6 @@ import { request } from "node:https"; import { Readable } from "node:stream"; +import type { ReadableStream } from "node:stream/web"; import type { InternalEvent, InternalResult } from "@/types/open-next"; import type { ProxyExternalRequest } from "@/types/overrides"; @@ -73,9 +74,12 @@ const nodeProxy: ProxyExternalRequest = { ); if (body && method !== "GET" && method !== "HEAD") { - req.write(body); + Readable.fromWeb(body as ReadableStream) + .on("error", reject) + .pipe(req); + } else { + req.end(); } - req.end(); }); }, }; diff --git a/packages/open-next/src/types/open-next.ts b/packages/open-next/src/types/open-next.ts index 8216e491..207d500b 100644 --- a/packages/open-next/src/types/open-next.ts +++ b/packages/open-next/src/types/open-next.ts @@ -26,7 +26,7 @@ export type InternalEvent = { readonly rawPath: string; // Full URL - starts with "https://on/" when the host is not available readonly url: string; - readonly body?: Buffer; + readonly body?: ReadableStream; //TODO: change the type of headers to Record readonly headers: Record; readonly query: Record; diff --git a/packages/tests-unit/tests/converters/aws-apigw-v1.test.ts b/packages/tests-unit/tests/converters/aws-apigw-v1.test.ts index ca7870ef..d191b015 100644 --- a/packages/tests-unit/tests/converters/aws-apigw-v1.test.ts +++ b/packages/tests-unit/tests/converters/aws-apigw-v1.test.ts @@ -1,6 +1,7 @@ import { Readable } from "node:stream"; import converter from "@opennextjs/aws/overrides/converters/aws-apigw-v1.js"; +import { fromReadableStream } from "@opennextjs/aws/utils/stream.js"; import type { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda"; describe("convertTo", () => { @@ -88,7 +89,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -127,7 +128,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { test: "test1,test2", }, @@ -168,7 +169,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/?test=test", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: {}, remoteAddress: "::1", query: { @@ -209,7 +210,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/?test=testA&test=testB", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: {}, remoteAddress: "::1", query: { @@ -250,7 +251,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", cookie: "test1=1,test2=2", @@ -293,7 +294,7 @@ describe("convertFrom", () => { method: "GET", rawPath: "/", url: "https://on/", - body: Buffer.from("Hello, world!"), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -301,5 +302,6 @@ describe("convertFrom", () => { query: {}, cookies: {}, }); + expect(await fromReadableStream(response.body!)).toEqual("Hello, world!"); }); }); diff --git a/packages/tests-unit/tests/converters/aws-apigw-v2.test.ts b/packages/tests-unit/tests/converters/aws-apigw-v2.test.ts index 3137b000..b05e4960 100644 --- a/packages/tests-unit/tests/converters/aws-apigw-v2.test.ts +++ b/packages/tests-unit/tests/converters/aws-apigw-v2.test.ts @@ -1,6 +1,7 @@ import { Readable } from "node:stream"; import converter from "@opennextjs/aws/overrides/converters/aws-apigw-v2.js"; +import { fromReadableStream } from "@opennextjs/aws/utils/stream.js"; import type { APIGatewayProxyEventV2 } from "aws-lambda"; import { vi } from "vitest"; @@ -128,7 +129,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -165,7 +166,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", cookie: "foo=bar; hello=world", @@ -206,7 +207,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/?hello=world&foo=1&foo=2", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -246,7 +247,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -254,5 +255,6 @@ describe("convertFrom", () => { query: {}, remoteAddress: "::1", }); + expect(await fromReadableStream(response.body!)).toEqual('{"message":"Hello, world!"}'); }); }); diff --git a/packages/tests-unit/tests/converters/aws-cloudfront.test.ts b/packages/tests-unit/tests/converters/aws-cloudfront.test.ts index 6c18e956..bfdff37f 100644 --- a/packages/tests-unit/tests/converters/aws-cloudfront.test.ts +++ b/packages/tests-unit/tests/converters/aws-cloudfront.test.ts @@ -1,6 +1,7 @@ import { Readable } from "node:stream"; import converter from "@opennextjs/aws/overrides/converters/aws-cloudfront.js"; +import { fromReadableStream } from "@opennextjs/aws/utils/stream.js"; import type { CloudFrontRequestEvent, CloudFrontRequestResult } from "aws-lambda"; import { vi } from "vitest"; @@ -226,7 +227,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -266,7 +267,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/?hello=world&foo=1&foo=2", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -309,7 +310,7 @@ describe("convertFrom", () => { method: "POST", rawPath: "/", url: "https://on/", - body: Buffer.from('{"message":"Hello, world!"}'), + body: expect.any(ReadableStream), headers: { "content-type": "application/json", }, @@ -317,5 +318,6 @@ describe("convertFrom", () => { query: {}, cookies: {}, }); + expect(await fromReadableStream(response.body!)).toEqual('{"message":"Hello, world!"}'); }); }); diff --git a/packages/tests-unit/tests/converters/node.test.ts b/packages/tests-unit/tests/converters/node.test.ts index c1705ef3..66106caa 100644 --- a/packages/tests-unit/tests/converters/node.test.ts +++ b/packages/tests-unit/tests/converters/node.test.ts @@ -1,5 +1,6 @@ import { IncomingMessage } from "@opennextjs/aws/http/request.js"; import converter from "@opennextjs/aws/overrides/converters/node.js"; +import { fromReadableStream, toReadableStream } from "@opennextjs/aws/utils/stream.js"; describe("convertFrom", () => { it("should convert GET request", async () => { @@ -23,7 +24,7 @@ describe("convertFrom", () => { "content-length": "0", }, remoteAddress: "::1", - body: Buffer.from(""), + body: expect.any(ReadableStream), cookies: {}, query: {}, }); @@ -52,7 +53,7 @@ describe("convertFrom", () => { host: "localhost", }, remoteAddress: "127.0.0.1", - body: Buffer.from(""), + body: expect.any(ReadableStream), cookies: {}, query: {}, }); @@ -78,7 +79,7 @@ describe("convertFrom", () => { "content-length": "0", }, remoteAddress: "::1", - body: Buffer.from(""), + body: expect.any(ReadableStream), cookies: {}, query: {}, }); @@ -107,7 +108,7 @@ describe("convertFrom", () => { "x-forwarded-for": "127.0.0.2", }, remoteAddress: "127.0.0.2", - body: Buffer.from(""), + body: expect.any(ReadableStream), cookies: {}, query: {}, }); @@ -136,7 +137,7 @@ describe("convertFrom", () => { host: "localhost", }, remoteAddress: "::1", - body: Buffer.from(""), + body: expect.any(ReadableStream), cookies: {}, query: { search: "1", @@ -155,7 +156,7 @@ describe("convertFrom", () => { cookie: "foo=bar", }, remoteAddress: "::1", - body: Buffer.from("{}"), + body: toReadableStream("{}"), }) ); @@ -170,12 +171,13 @@ describe("convertFrom", () => { cookie: "foo=bar", }, remoteAddress: "::1", - body: Buffer.from("{}"), + body: expect.any(ReadableStream), cookies: { foo: "bar", }, query: {}, }); + expect(await fromReadableStream(result.body!)).toEqual("{}"); }); it("should convert PUT request with multiple cookie headers", async () => { @@ -189,7 +191,7 @@ describe("convertFrom", () => { cookie: "foo=bar; hello=world", }, remoteAddress: "::1", - body: Buffer.from("{}"), + body: toReadableStream("{}"), }) ); @@ -204,12 +206,13 @@ describe("convertFrom", () => { cookie: "foo=bar; hello=world", }, remoteAddress: "::1", - body: Buffer.from("{}"), + body: expect.any(ReadableStream), cookies: { foo: "bar", hello: "world", }, query: {}, }); + expect(await fromReadableStream(result.body!)).toEqual("{}"); }); }); diff --git a/packages/tests-unit/tests/core/routing/cacheInterceptor.test.ts b/packages/tests-unit/tests/core/routing/cacheInterceptor.test.ts index 2b79831c..10b3a345 100644 --- a/packages/tests-unit/tests/core/routing/cacheInterceptor.test.ts +++ b/packages/tests-unit/tests/core/routing/cacheInterceptor.test.ts @@ -2,7 +2,7 @@ import { cacheInterceptor } from "@opennextjs/aws/core/routing/cacheInterceptor. import { convertFromQueryString } from "@opennextjs/aws/core/routing/util.js"; import type { MiddlewareEvent } from "@opennextjs/aws/types/open-next.js"; import type { Queue } from "@opennextjs/aws/types/overrides.js"; -import { fromReadableStream } from "@opennextjs/aws/utils/stream.js"; +import { fromReadableStream, toReadableStream } from "@opennextjs/aws/utils/stream.js"; import { vi } from "vitest"; vi.mock("@opennextjs/aws/adapters/config/index.js", () => ({ @@ -37,7 +37,7 @@ function createEvent(event: PartialEvent): MiddlewareEvent { method: event.method ?? "GET", rawPath, url: event.url ?? "/", - body: Buffer.from(event.body ?? ""), + body: event.body !== undefined ? toReadableStream(event.body) : undefined, headers: event.headers ?? {}, query: convertFromQueryString(qs ?? ""), cookies: event.cookies ?? {}, diff --git a/packages/tests-unit/tests/core/routing/i18n.test.ts b/packages/tests-unit/tests/core/routing/i18n.test.ts index d9b93083..da17510a 100644 --- a/packages/tests-unit/tests/core/routing/i18n.test.ts +++ b/packages/tests-unit/tests/core/routing/i18n.test.ts @@ -2,6 +2,7 @@ import { NextConfig } from "@opennextjs/aws/adapters/config/index.js"; import { handleLocaleRedirect, localizePath } from "@opennextjs/aws/core/routing/i18n/index.js"; import { convertFromQueryString } from "@opennextjs/aws/core/routing/util.js"; import type { InternalEvent } from "@opennextjs/aws/types/open-next.js"; +import { toReadableStream } from "@opennextjs/aws/utils/stream.js"; import { expect, vi } from "vitest"; vi.mock("@opennextjs/aws/adapters/config/index.js", () => { @@ -27,7 +28,7 @@ function createEvent(event: PartialEvent): InternalEvent { method: event.method ?? "GET", rawPath, url: event.url ?? "/", - body: Buffer.from(event.body ?? ""), + body: event.body !== undefined ? toReadableStream(event.body) : undefined, headers: event.headers ?? {}, query: convertFromQueryString(qs), cookies: event.cookies ?? {}, diff --git a/packages/tests-unit/tests/core/routing/matcher.test.ts b/packages/tests-unit/tests/core/routing/matcher.test.ts index dce38517..87f5c689 100644 --- a/packages/tests-unit/tests/core/routing/matcher.test.ts +++ b/packages/tests-unit/tests/core/routing/matcher.test.ts @@ -7,6 +7,7 @@ import { } from "@opennextjs/aws/core/routing/matcher.js"; import { convertFromQueryString } from "@opennextjs/aws/core/routing/util.js"; import type { InternalEvent } from "@opennextjs/aws/types/open-next.js"; +import { toReadableStream } from "@opennextjs/aws/utils/stream.js"; import { vi } from "vitest"; vi.mock("@opennextjs/aws/adapters/config/index.js", () => ({ @@ -97,7 +98,7 @@ function createEvent(event: PartialEvent): InternalEvent { method: event.method ?? "GET", rawPath: pathname, url: event.url ?? "/", - body: Buffer.from(event.body ?? ""), + body: event.body !== undefined ? toReadableStream(event.body) : undefined, headers: event.headers ?? {}, query: convertFromQueryString(search.slice(1)), cookies: event.cookies ?? {}, diff --git a/packages/tests-unit/tests/core/routing/middleware.test.ts b/packages/tests-unit/tests/core/routing/middleware.test.ts index d389d5bd..8b6ba50e 100644 --- a/packages/tests-unit/tests/core/routing/middleware.test.ts +++ b/packages/tests-unit/tests/core/routing/middleware.test.ts @@ -54,7 +54,7 @@ function createEvent(event: PartialEvent): InternalEvent { method: event.method ?? "GET", rawPath: pathname, url, - body: Buffer.from(event.body ?? ""), + body: event.body !== undefined ? toReadableStream(event.body) : undefined, headers: event.headers ?? {}, query: convertFromQueryString(search.slice(1)), cookies: event.cookies ?? {}, diff --git a/packages/tests-unit/tests/core/routing/util.test.ts b/packages/tests-unit/tests/core/routing/util.test.ts index 910e6a11..f1e3e785 100644 --- a/packages/tests-unit/tests/core/routing/util.test.ts +++ b/packages/tests-unit/tests/core/routing/util.test.ts @@ -451,7 +451,13 @@ describe("convertBodyToReadableStream", () => { }); it("returns readable stream for when body is provided", async () => { - const result = convertBodyToReadableStream("PUT", Buffer.from("body")); + const body = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode("body")); + controller.close(); + }, + }); + const result = convertBodyToReadableStream("PUT", body); expect(await fromReadableStream(result as any)).toEqual("body"); }); }); From 4a00e1319059d3b4a25287a3aaa1f7bc4dca18bf Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sat, 9 May 2026 12:42:22 +0200 Subject: [PATCH 2/3] review fix Co-authored-by: Copilot --- packages/open-next/src/http/request.ts | 45 ++++++++++++++----- .../src/overrides/converters/node.ts | 3 +- .../overrides/proxyExternalRequest/node.ts | 17 ++++++- 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/packages/open-next/src/http/request.ts b/packages/open-next/src/http/request.ts index 39b73e87..e91a3a46 100644 --- a/packages/open-next/src/http/request.ts +++ b/packages/open-next/src/http/request.ts @@ -47,21 +47,44 @@ export class IncomingMessage extends http.IncomingMessage { this.push(null); }; } - let started = false; const reader = body.getReader(); + let reading = false; + let streamDone = false; + + this.once("close", () => { + if (!streamDone) { + streamDone = true; + reader.cancel().catch(() => {}); + } + }); + const pump = () => { - reader.read().then(({ done, value }) => { - if (done) { - this.push(null); - } else { - this.push(value); - pump(); - } - }); + reading = true; + reader + .read() + .then(({ done, value }) => { + if (done) { + streamDone = true; + reader.releaseLock(); + this.push(null); + } else { + const canContinue = this.push(value); + if (canContinue) { + pump(); + } else { + reading = false; + } + } + }) + .catch((err) => { + streamDone = true; + reader.cancel().catch(() => {}); + this.destroy(err); + }); }; + return () => { - if (!started) { - started = true; + if (!reading) { pump(); } }; diff --git a/packages/open-next/src/overrides/converters/node.ts b/packages/open-next/src/overrides/converters/node.ts index bdffaef3..efe5ffb4 100644 --- a/packages/open-next/src/overrides/converters/node.ts +++ b/packages/open-next/src/overrides/converters/node.ts @@ -12,7 +12,8 @@ import { extractHostFromHeaders, getQueryFromSearchParams } from "./utils.js"; const converter: Converter = { convertFrom: async (event: unknown) => { const req = event as IncomingMessage & { protocol?: string }; - const body: ReadableStream = Readable.toWeb(req); + const shouldHaveBody = req.method !== "GET" && req.method !== "HEAD"; + const body: ReadableStream | undefined = shouldHaveBody ? Readable.toWeb(req) : undefined; const headers = Object.fromEntries( Object.entries(req.headers ?? {}) diff --git a/packages/open-next/src/overrides/proxyExternalRequest/node.ts b/packages/open-next/src/overrides/proxyExternalRequest/node.ts index a42d56e7..3ab57d93 100644 --- a/packages/open-next/src/overrides/proxyExternalRequest/node.ts +++ b/packages/open-next/src/overrides/proxyExternalRequest/node.ts @@ -36,6 +36,15 @@ const nodeProxy: ProxyExternalRequest = { const { url, headers, method, body } = internalEvent; debug("proxyRequest", url); return new Promise((resolve, reject) => { + let hasRejected = false; + const rejectOnce = (e: Error) => { + if (hasRejected) { + return; + } + + hasRejected = true; + reject(e); + }; const filteredHeaders = filterHeadersForProxy(headers); debug("filteredHeaders", filteredHeaders); const req = request( @@ -68,14 +77,18 @@ const nodeProxy: ProxyExternalRequest = { _res.on("error", (e) => { error("proxyRequest error", e); - reject(e); + rejectOnce(e); }); } ); + req.on("error", (e) => { + error("proxyRequest error", e); + rejectOnce(e); + }); if (body && method !== "GET" && method !== "HEAD") { Readable.fromWeb(body as ReadableStream) - .on("error", reject) + .on("error", rejectOnce) .pipe(req); } else { req.end(); From bd0627896b4b16ac8bc9032c61922812aecb89c5 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sat, 9 May 2026 12:50:48 +0200 Subject: [PATCH 3/3] Update tests to expect undefined for body instead of ReadableStream --- packages/tests-unit/tests/converters/node.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/tests-unit/tests/converters/node.test.ts b/packages/tests-unit/tests/converters/node.test.ts index 66106caa..497d2e17 100644 --- a/packages/tests-unit/tests/converters/node.test.ts +++ b/packages/tests-unit/tests/converters/node.test.ts @@ -24,7 +24,7 @@ describe("convertFrom", () => { "content-length": "0", }, remoteAddress: "::1", - body: expect.any(ReadableStream), + body: undefined, cookies: {}, query: {}, }); @@ -53,7 +53,7 @@ describe("convertFrom", () => { host: "localhost", }, remoteAddress: "127.0.0.1", - body: expect.any(ReadableStream), + body: undefined, cookies: {}, query: {}, }); @@ -79,7 +79,7 @@ describe("convertFrom", () => { "content-length": "0", }, remoteAddress: "::1", - body: expect.any(ReadableStream), + body: undefined, cookies: {}, query: {}, }); @@ -108,7 +108,7 @@ describe("convertFrom", () => { "x-forwarded-for": "127.0.0.2", }, remoteAddress: "127.0.0.2", - body: expect.any(ReadableStream), + body: undefined, cookies: {}, query: {}, }); @@ -137,7 +137,7 @@ describe("convertFrom", () => { host: "localhost", }, remoteAddress: "::1", - body: expect.any(ReadableStream), + body: undefined, cookies: {}, query: { search: "1",