From ce43ed95ae5d671896e729c51c4a00c2a90b7abc Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Wed, 21 Jan 2026 20:25:26 -0800 Subject: [PATCH] fix(rivetkit/next-js): fix request handler --- .../packages/next-js/src/mod.ts | 153 ++++++++++-------- scripts/run/dev-env.sh | 16 ++ 2 files changed, 104 insertions(+), 65 deletions(-) create mode 100644 scripts/run/dev-env.sh diff --git a/rivetkit-typescript/packages/next-js/src/mod.ts b/rivetkit-typescript/packages/next-js/src/mod.ts index 4d84e4f748..f94cab0cbe 100644 --- a/rivetkit-typescript/packages/next-js/src/mod.ts +++ b/rivetkit-typescript/packages/next-js/src/mod.ts @@ -4,15 +4,23 @@ import type { Registry } from "rivetkit"; import { stringifyError } from "rivetkit/utils"; import { logger } from "./log"; +const ROUTE_FILE = join( + process.cwd(), + ".next/server/app/api/rivet/[...all]/route.js", +); +const WATCH_INTERVAL_MS = 500; + export const toNextHandler = (registry: Registry) => { // Don't run server locally since we're using the fetch handler directly registry.config.serveManager = false; // Set basePath to "/" since Next.js route strips the /api/rivet prefix - registry.config.serverless = { ...registry.config.serverless, basePath: "/" }; + registry.config.serverless = { + ...registry.config.serverless, + basePath: "/", + }; if (process.env.NODE_ENV !== "production") { - // Auto-configure serverless runner if not in prod logger().debug( "detected development environment, auto-starting engine and auto-configuring serverless", ); @@ -42,30 +50,30 @@ export const toNextHandler = (registry: Registry) => { // Next logs this on every request registry.config.noWelcome = true; - // Function that Next will call when handling requests const fetchWrapper = async ( request: Request, { params }: { params: Promise<{ all: string[] }> }, ): Promise => { const { all } = await params; - - const newUrl = new URL(request.url); - newUrl.pathname = `/${all.join("/")}`; - - if (process.env.NODE_ENV === "development") { - // Special request handling for file watching - return await handleRequestWithFileWatcher(request, newUrl, registry.handler); - } else { - // Handle request - const newReq = new Request(newUrl, request); - return await registry.handler(newReq); + const targetUrl = new URL(request.url); + targetUrl.pathname = `/${all.join("/")}`; + + if (process.env.NODE_ENV !== "production") { + return await handleRequestWithFileWatcher( + registry, + request, + targetUrl, + ); } + + return await registry.handler(new Request(targetUrl, request)); }; return { GET: fetchWrapper, POST: fetchWrapper, PUT: fetchWrapper, + DELETE: fetchWrapper, PATCH: fetchWrapper, HEAD: fetchWrapper, OPTIONS: fetchWrapper, @@ -79,69 +87,70 @@ export const toNextHandler = (registry: Registry) => { * See docs on watchRouteFile for more information. */ async function handleRequestWithFileWatcher( + registry: Registry, request: Request, newUrl: URL, - fetch: (request: Request, ...args: any) => Response | Promise, ): Promise { - // Create a new abort controller that we can abort, since the signal on - // the request we cannot control + // Create a new abort controller that we can abort since we cannot control the + // signal on the Request passed in by Next.js const mergedController = new AbortController(); const abortMerged = () => mergedController.abort(); - request.signal?.addEventListener("abort", abortMerged); + request.signal.addEventListener("abort", abortMerged); - // Watch for file changes in dev - // - // We spawn one watcher per-request since there is not a clean way of - // cleaning up global watchers when hot reloading in Next const watchIntervalId = watchRouteFile(mergedController); + const clearWatcher = () => clearInterval(watchIntervalId); // Clear interval if request is aborted request.signal.addEventListener("abort", () => { logger().debug("clearing file watcher interval: request aborted"); - clearInterval(watchIntervalId); + clearWatcher(); }); - // Replace URL and abort signal - const newReq = new Request(newUrl, { - // Copy old request properties - method: request.method, - headers: request.headers, - body: request.body, - credentials: request.credentials, - cache: request.cache, - redirect: request.redirect, - referrer: request.referrer, - integrity: request.integrity, - // Override with new signal - signal: mergedController.signal, - // Required for streaming body - duplex: "half", - } as RequestInit); - - // Handle request - const response = await fetch(newReq); - - // HACK: Next.js does not provide a way to detect when a request - // finishes, so we need to tap the response stream - // - // We can't just wait for `await fetch` to finish since SSE streams run - // for longer + const newReq = cloneRequestWithSignal( + newUrl, + request, + mergedController.signal, + ); + + let response: Response; + try { + // Handle request with merged abort signal + response = await registry.handler(newReq); + } catch (err) { + logger().warn({ + msg: "file watcher handler failed, falling back to direct handler", + err: stringifyError(err), + }); + clearWatcher(); + return await registry.handler(new Request(newUrl, request)); + } + + // HACK: Next.js does not provide a way to detect when a request finishes, so + // we need to tap the response stream. if (response.body) { const wrappedStream = waitForStreamFinish(response.body, () => { logger().debug("clearing file watcher interval: stream finished"); - clearInterval(watchIntervalId); + clearWatcher(); }); return new Response(wrappedStream, { status: response.status, statusText: response.statusText, headers: response.headers, }); - } else { - // No response body, clear interval immediately - logger().debug("clearing file watcher interval: no response body"); - clearInterval(watchIntervalId); - return response; } + + logger().debug("clearing file watcher interval: no response body"); + clearWatcher(); + return response; +} + +function cloneRequestWithSignal( + newUrl: URL, + request: Request, + signal: AbortSignal, +): Request { + const baseReq = new Request(newUrl, request); + return new Request(baseReq, { signal }); } /** @@ -156,24 +165,38 @@ async function handleRequestWithFileWatcher( function watchRouteFile(abortController: AbortController): NodeJS.Timeout { logger().debug("starting file watcher"); - const routePath = join( - process.cwd(), - ".next/server/app/api/rivet/[...all]/route.js", - ); + let lastMtime: number | undefined; + let missingWarningShown = false; - let lastMtime: number | null = null; const checkFile = () => { - logger().debug({ msg: "checking for file changes", routePath }); + logger().debug({ + msg: "checking for file changes", + routePath: ROUTE_FILE, + }); + try { - if (!existsSync(routePath)) { + if (!existsSync(ROUTE_FILE)) { + if (!missingWarningShown) { + logger().warn({ + msg: "route file missing, hot reloading disabled until it recompiles", + routePath: ROUTE_FILE, + }); + missingWarningShown = true; + } + lastMtime = undefined; return; } - const stats = statSync(routePath); + missingWarningShown = false; + + const stats = statSync(ROUTE_FILE); const mtime = stats.mtimeMs; - if (lastMtime !== null && mtime !== lastMtime) { - logger().info({ msg: "route file changed", routePath }); + if (lastMtime !== undefined && mtime !== lastMtime) { + logger().info({ + msg: "route file changed", + routePath: ROUTE_FILE, + }); abortController.abort(); } @@ -188,7 +211,7 @@ function watchRouteFile(abortController: AbortController): NodeJS.Timeout { checkFile(); - return setInterval(checkFile, 1000); + return setInterval(checkFile, WATCH_INTERVAL_MS); } /** diff --git a/scripts/run/dev-env.sh b/scripts/run/dev-env.sh new file mode 100644 index 0000000000..ca1eed4e7c --- /dev/null +++ b/scripts/run/dev-env.sh @@ -0,0 +1,16 @@ +# Development environment variables for running the Rivet engine locally. +# Source this file before running the engine: source scripts/run/dev-env.sh + +# Reduce backoff for runner recovery (in milliseconds) +export RIVET__PEGBOARD__RETRY_RESET_DURATION="100" +export RIVET__PEGBOARD__BASE_RETRY_TIMEOUT="100" +export RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" + +# Reduce thresholds for faster development iteration (in milliseconds) +export RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" +export RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" + +# Reduce shutdown durations for faster development iteration (in seconds) +export RIVET__RUNTIME__WORKER_SHUTDOWN_DURATION="1" +export RIVET__RUNTIME__GUARD_SHUTDOWN_DURATION="1" +export RIVET__RUNTIME__FORCE_SHUTDOWN_DURATION="2"