Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 88 additions & 65 deletions rivetkit-typescript/packages/next-js/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ import type { Registry } from "rivetkit";
import { stringifyError } from "rivetkit/utils";
import { logger } from "./log";
Comment on lines 4 to 5

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import statements need to be sorted according to Biome linter rules. Built-in modules like 'path' should come first, followed by external packages, then local imports. Reorder to: import { existsSync, join, statSync } from 'path'; import { Registry } from 'rivetkit/registry'; import { stringifyError } from 'rivetkit/utils'; import { logger } from './log';

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.


const ROUTE_FILE = join(
process.cwd(),
".next/server/app/api/rivet/[...all]/route.js",
);
const WATCH_INTERVAL_MS = 500;

export const toNextHandler = (registry: Registry<any>) => {
// Don't run server locally since we're using the fetch handler directly
registry.config.serveManager = false;

// Set basePath to "/" since Next.js route strips the /api/rivet prefix
registry.config.serverless = { ...registry.config.serverless, basePath: "/" };
registry.config.serverless = {
...registry.config.serverless,
basePath: "/",
};

if (process.env.NODE_ENV !== "production") {
// Auto-configure serverless runner if not in prod
logger().debug(
"detected development environment, auto-starting engine and auto-configuring serverless",
);
Expand Down Expand Up @@ -42,30 +50,30 @@ export const toNextHandler = (registry: Registry<any>) => {
// Next logs this on every request
registry.config.noWelcome = true;

// Function that Next will call when handling requests
const fetchWrapper = async (
request: Request,
{ params }: { params: Promise<{ all: string[] }> },
): Promise<Response> => {
const { all } = await params;

const newUrl = new URL(request.url);
newUrl.pathname = `/${all.join("/")}`;

if (process.env.NODE_ENV === "development") {
// Special request handling for file watching
return await handleRequestWithFileWatcher(request, newUrl, registry.handler);
} else {
// Handle request
const newReq = new Request(newUrl, request);
return await registry.handler(newReq);
const targetUrl = new URL(request.url);
targetUrl.pathname = `/${all.join("/")}`;

if (process.env.NODE_ENV !== "production") {
return await handleRequestWithFileWatcher(
registry,
request,
targetUrl,
);
}

return await registry.handler(new Request(targetUrl, request));
};

return {
GET: fetchWrapper,
POST: fetchWrapper,
PUT: fetchWrapper,
DELETE: fetchWrapper,
PATCH: fetchWrapper,
HEAD: fetchWrapper,
OPTIONS: fetchWrapper,
Expand All @@ -79,69 +87,70 @@ export const toNextHandler = (registry: Registry<any>) => {
* See docs on watchRouteFile for more information.
*/
async function handleRequestWithFileWatcher(
registry: Registry<any>,
request: Request,
newUrl: URL,
fetch: (request: Request, ...args: any) => Response | Promise<Response>,
): Promise<Response> {
// Create a new abort controller that we can abort, since the signal on
// the request we cannot control
// Create a new abort controller that we can abort since we cannot control the
// signal on the Request passed in by Next.js
const mergedController = new AbortController();
const abortMerged = () => mergedController.abort();
request.signal?.addEventListener("abort", abortMerged);
request.signal.addEventListener("abort", abortMerged);

// Watch for file changes in dev
//
// We spawn one watcher per-request since there is not a clean way of
// cleaning up global watchers when hot reloading in Next
const watchIntervalId = watchRouteFile(mergedController);
const clearWatcher = () => clearInterval(watchIntervalId);

// Clear interval if request is aborted
request.signal.addEventListener("abort", () => {
logger().debug("clearing file watcher interval: request aborted");
clearInterval(watchIntervalId);
clearWatcher();
});

// Replace URL and abort signal
const newReq = new Request(newUrl, {
// Copy old request properties
method: request.method,
headers: request.headers,
body: request.body,
credentials: request.credentials,
cache: request.cache,
redirect: request.redirect,
referrer: request.referrer,
integrity: request.integrity,
// Override with new signal
signal: mergedController.signal,
// Required for streaming body
duplex: "half",
} as RequestInit);

// Handle request
const response = await fetch(newReq);

// HACK: Next.js does not provide a way to detect when a request
// finishes, so we need to tap the response stream
//
// We can't just wait for `await fetch` to finish since SSE streams run
// for longer
const newReq = cloneRequestWithSignal(
newUrl,
request,
mergedController.signal,
);

let response: Response;
try {
// Handle request with merged abort signal
response = await registry.handler(newReq);
} catch (err) {
logger().warn({
msg: "file watcher handler failed, falling back to direct handler",
err: stringifyError(err),
});
clearWatcher();
return await registry.handler(new Request(newUrl, request));
}

// HACK: Next.js does not provide a way to detect when a request finishes, so
// we need to tap the response stream.
if (response.body) {
const wrappedStream = waitForStreamFinish(response.body, () => {
logger().debug("clearing file watcher interval: stream finished");
clearInterval(watchIntervalId);
clearWatcher();
});
return new Response(wrappedStream, {
status: response.status,
statusText: response.statusText,
headers: response.headers,
});
} else {
// No response body, clear interval immediately
logger().debug("clearing file watcher interval: no response body");
clearInterval(watchIntervalId);
return response;
}

logger().debug("clearing file watcher interval: no response body");
clearWatcher();
return response;
}

function cloneRequestWithSignal(
newUrl: URL,
request: Request,
signal: AbortSignal,
): Request {
const baseReq = new Request(newUrl, request);
return new Request(baseReq, { signal });
}

/**
Expand All @@ -156,24 +165,38 @@ async function handleRequestWithFileWatcher(
function watchRouteFile(abortController: AbortController): NodeJS.Timeout {
logger().debug("starting file watcher");

const routePath = join(
process.cwd(),
".next/server/app/api/rivet/[...all]/route.js",
);
let lastMtime: number | undefined;
let missingWarningShown = false;

let lastMtime: number | null = null;
const checkFile = () => {
logger().debug({ msg: "checking for file changes", routePath });
logger().debug({
msg: "checking for file changes",
routePath: ROUTE_FILE,
});

try {
if (!existsSync(routePath)) {
if (!existsSync(ROUTE_FILE)) {
if (!missingWarningShown) {
logger().warn({
msg: "route file missing, hot reloading disabled until it recompiles",
routePath: ROUTE_FILE,
});
missingWarningShown = true;
}
lastMtime = undefined;
return;
}

const stats = statSync(routePath);
missingWarningShown = false;

const stats = statSync(ROUTE_FILE);
const mtime = stats.mtimeMs;

if (lastMtime !== null && mtime !== lastMtime) {
logger().info({ msg: "route file changed", routePath });
if (lastMtime !== undefined && mtime !== lastMtime) {
logger().info({
msg: "route file changed",
routePath: ROUTE_FILE,
});
abortController.abort();
}

Expand All @@ -188,7 +211,7 @@ function watchRouteFile(abortController: AbortController): NodeJS.Timeout {

checkFile();

return setInterval(checkFile, 1000);
return setInterval(checkFile, WATCH_INTERVAL_MS);
}

/**
Expand Down
16 changes: 16 additions & 0 deletions scripts/run/dev-env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Development environment variables for running the Rivet engine locally.
# Source this file before running the engine: source scripts/run/dev-env.sh

# Reduce backoff for runner recovery (in milliseconds)
export RIVET__PEGBOARD__RETRY_RESET_DURATION="100"
export RIVET__PEGBOARD__BASE_RETRY_TIMEOUT="100"
export RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1"

# Reduce thresholds for faster development iteration (in milliseconds)
export RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000"
export RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000"

# Reduce shutdown durations for faster development iteration (in seconds)
export RIVET__RUNTIME__WORKER_SHUTDOWN_DURATION="1"
export RIVET__RUNTIME__GUARD_SHUTDOWN_DURATION="1"
export RIVET__RUNTIME__FORCE_SHUTDOWN_DURATION="2"
Loading