diff --git a/Cargo.lock b/Cargo.lock index 84e63204dd..6d7e68136d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2478,6 +2478,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" + [[package]] name = "httparse" version = "1.10.1" @@ -6066,13 +6072,16 @@ version = "2.3.0-rc.12" dependencies = [ "anyhow", "async-trait", + "axum 0.8.4", "base64 0.22.1", + "bytes", "ciborium", "fs_extra", "futures", "getrandom 0.2.16", "http 1.3.1", "include_dir", + "http-body-util", "js-sys", "nix 0.30.1", "parking_lot", @@ -6097,7 +6106,9 @@ dependencies = [ "subtle", "tempfile", "tokio", + "tokio-stream", "tokio-util", + "tower-http", "tracing", "tracing-subscriber", "url", @@ -7837,10 +7848,19 @@ checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "bitflags 2.10.0", "bytes", + "futures-core", "futures-util", "http 1.3.1", "http-body 1.0.1", + "http-body-util", + "http-range-header", + "httpdate", + "mime", + "mime_guess", + "percent-encoding", "pin-project-lite", + "tokio", + "tokio-util", "tower 0.5.2", "tower-layer", "tower-service", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3446c6ad79..3beb6588ad 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3039,12 +3039,6 @@ importers: rivetkit-typescript/packages/rivetkit: dependencies: - '@hono/node-server': - specifier: ^1.18.2 - version: 1.19.9(hono@4.11.9) - '@hono/node-ws': - specifier: ^1.1.1 - version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.9))(hono@4.11.9) '@hono/zod-openapi': specifier: ^1.1.5 version: 1.1.5(hono@4.11.9)(zod@4.1.13) @@ -3087,9 +3081,6 @@ importers: drizzle-orm: specifier: ^0.44.2 version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.15)(pg@8.17.2)(sql.js@1.13.0) - get-port: - specifier: ^7.1.0 - version: 7.1.0 hono: specifier: ^4.7.0 version: 4.11.9 @@ -3118,6 +3109,12 @@ importers: '@copilotkit/llmock': specifier: ^1.6.0 version: 1.7.1 + '@hono/node-server': + specifier: ^1.18.2 + version: 1.19.9(hono@4.11.9) + '@hono/node-ws': + specifier: ^1.1.1 + version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.9))(hono@4.11.9) '@rivet-dev/agent-os-common': specifier: '*' version: 0.0.260331072558 @@ -3136,6 +3133,9 @@ importers: eventsource: specifier: ^4.0.0 version: 4.0.0 + get-port: + specifier: ^7.1.0 + version: 7.1.0 tsup: specifier: ^8.4.0 version: 8.5.1(@microsoft/api-extractor@7.53.2(@types/node@22.19.10))(@swc/core@1.15.11(@swc/helpers@0.5.17))(jiti@2.6.1)(postcss@8.5.6)(tsx@4.21.0)(typescript@5.9.3)(yaml@2.9.0) diff --git a/rivetkit-rust/packages/rivetkit-core/Cargo.toml b/rivetkit-rust/packages/rivetkit-core/Cargo.toml index b11d713ea3..b67b5c055a 100644 --- a/rivetkit-rust/packages/rivetkit-core/Cargo.toml +++ b/rivetkit-rust/packages/rivetkit-core/Cargo.toml @@ -15,6 +15,11 @@ default = ["native-runtime"] native-runtime = [ "dep:nix", "dep:reqwest", + "dep:axum", + "dep:bytes", + "dep:http-body-util", + "dep:tokio-stream", + "dep:tower-http", "rivet-envoy-client/native-transport", ] wasm-runtime = ["rivet-envoy-client/wasm-transport"] @@ -29,11 +34,14 @@ fs_extra = { workspace = true } [dependencies] anyhow.workspace = true async-trait = { workspace = true, optional = true } +axum = { workspace = true, optional = true } base64.workspace = true +bytes = { workspace = true, optional = true } ciborium.workspace = true futures.workspace = true http.workspace = true include_dir = { workspace = true } +http-body-util = { workspace = true, optional = true } nix = { workspace = true, optional = true } parking_lot.workspace = true rand.workspace = true @@ -54,7 +62,9 @@ serde_bare.workspace = true serde_bytes.workspace = true sha2.workspace = true subtle.workspace = true +tokio-stream = { workspace = true, optional = true } tokio-util.workspace = true +tower-http = { workspace = true, optional = true, features = ["fs"] } tracing.workspace = true url.workspace = true vbare.workspace = true diff --git a/rivetkit-rust/packages/rivetkit-core/src/lib.rs b/rivetkit-rust/packages/rivetkit-core/src/lib.rs index 6e8b6a8d24..9ec56b9714 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/lib.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/lib.rs @@ -12,6 +12,8 @@ pub mod metrics_endpoint; pub mod registry; pub mod runtime; pub mod serverless; +#[cfg(feature = "native-runtime")] +pub mod serverless_http; pub(crate) mod time { use std::fmt; use std::future::Future; diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index d7b76a0c21..52624ef87e 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -102,6 +102,9 @@ impl CoreEnvoyHandle { } } + /// Engine-reported drain threshold in milliseconds. `None` until the + /// envoy has completed its first protocol-metadata exchange with the + /// engine. pub async fn actor_stop_threshold_ms(&self) -> Option { self.handle .get_protocol_metadata() diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs index 8b90c7f300..75d939fda5 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs @@ -136,10 +136,9 @@ struct NamespaceMismatch { "message", "incoming_too_long", "Incoming message too long.", - "Incoming message too long. Received {size} bytes, limit is {limit} bytes." + "Incoming message too long. Exceeded limit of {limit} bytes." )] struct IncomingMessageTooLong { - size: usize, limit: usize, } @@ -252,6 +251,29 @@ impl CoreServerlessRuntime { CoreEnvoyHandle::new(handle).actor_stop_threshold_ms().await } + /// Listener-side body cap; reuses the `/start` payload limit. + pub fn max_request_body_bytes(&self) -> usize { + self.settings.max_start_payload_bytes + } + + /// Canonical 413 response built through the `RivetError` system. + pub fn incoming_too_long_response(&self) -> ServerlessResponse { + let error = IncomingMessageTooLong { + limit: self.settings.max_start_payload_bytes, + } + .build(); + error_response(error) + } + + /// Canonical 400 response for malformed requests. + pub fn invalid_request_response(&self, reason: impl Into) -> ServerlessResponse { + let error = InvalidRequest { + reason: reason.into(), + } + .build(); + error_response(error) + } + pub async fn handle_request(&self, req: ServerlessRequest) -> ServerlessResponse { let cors = cors_headers(&req); match self.handle_request_inner(req).await { @@ -334,7 +356,6 @@ impl CoreServerlessRuntime { ); if req.body.len() > self.settings.max_start_payload_bytes { return Err(IncomingMessageTooLong { - size: req.body.len(), limit: self.settings.max_start_payload_bytes, } .build()); diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs new file mode 100644 index 0000000000..8ec129a64c --- /dev/null +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs @@ -0,0 +1,209 @@ +use std::collections::HashMap; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context as TaskContext, Poll}; + +use anyhow::{Context, Result}; +use axum::Router; +use axum::body::{Body, Bytes}; +use axum::extract::{Request, State}; +use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; +use axum::response::IntoResponse; +use axum::routing::any; +use futures::Stream; +use futures::StreamExt; +use http_body_util::LengthLimitError; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_util::sync::CancellationToken; +use tower_http::services::ServeDir; + +use crate::serverless::{CoreServerlessRuntime, ServerlessRequest, ServerlessResponse}; + +#[derive(Debug, Clone)] +pub struct ListenerConfig { + /// Host to bind; accepts numeric IPs or DNS names. Defaults to `0.0.0.0`. + pub host: Option, + pub port: u16, + pub public_dir: Option, +} + +#[derive(Clone)] +struct AppState { + runtime: CoreServerlessRuntime, + shutdown_token: CancellationToken, +} + +/// Bind a TCP listener and serve `runtime` over HTTP until `shutdown` fires. +pub async fn serve( + runtime: CoreServerlessRuntime, + listener: ListenerConfig, + shutdown: CancellationToken, +) -> Result<()> { + let host = listener.host.as_deref().unwrap_or("0.0.0.0"); + let port = listener.port; + + let state = AppState { + runtime, + shutdown_token: shutdown.clone(), + }; + + let forward_service = any(forward_request).with_state(state); + + let router = match listener.public_dir.as_ref() { + Some(dir) => Router::new().fallback_service( + ServeDir::new(dir) + .call_fallback_on_method_not_allowed(true) + .fallback(forward_service), + ), + None => Router::new().fallback_service(forward_service), + }; + + let tcp = tokio::net::TcpListener::bind((host, port)) + .await + .with_context(|| format!("bind tcp listener on {host}:{port}"))?; + let bound = tcp + .local_addr() + .context("read local address of bound listener")?; + tracing::info!(host = %bound.ip(), port = bound.port(), "rivetkit server listening"); + + let shutdown_fut = { + let shutdown = shutdown.clone(); + async move { shutdown.cancelled().await } + }; + + axum::serve(tcp, router.into_make_service()) + .with_graceful_shutdown(shutdown_fut) + .await + .context("axum::serve returned an error")?; + + Ok(()) +} + +async fn forward_request( + State(state): State, + request: Request, +) -> axum::response::Response { + let (parts, body) = request.into_parts(); + let body_limit = state.runtime.max_request_body_bytes(); + let request_token = state.shutdown_token.child_token(); + let body_bytes = match axum::body::to_bytes(body, body_limit).await { + Ok(bytes) => bytes, + Err(error) if is_length_limit_error(&error) => { + tracing::warn!(body_limit, "request body exceeded limit"); + return into_axum_response(state.runtime.incoming_too_long_response(), request_token); + } + Err(error) => { + tracing::warn!(?error, "failed to read request body"); + return into_axum_response( + state + .runtime + .invalid_request_response("failed to read request body"), + request_token, + ); + } + }; + + let path_and_query = parts + .uri + .path_and_query() + .map(|pq| pq.as_str()) + .unwrap_or("/"); + let url = format!("http://internal{path_and_query}"); + + // Repeated header names get comma-joined per RFC 9110 ยง5.3. + let mut headers: HashMap = HashMap::new(); + for (name, value) in parts.headers.iter() { + let Ok(value_str) = value.to_str() else { + continue; + }; + let key = name.as_str().to_ascii_lowercase(); + headers + .entry(key) + .and_modify(|existing| { + existing.push_str(", "); + existing.push_str(value_str); + }) + .or_insert_with(|| value_str.to_owned()); + } + + let req = ServerlessRequest { + method: parts.method.as_str().to_owned(), + url, + headers, + body: body_bytes.to_vec(), + cancel_token: request_token.clone(), + }; + + into_axum_response(state.runtime.handle_request(req).await, request_token) +} + +fn into_axum_response( + response: ServerlessResponse, + request_token: CancellationToken, +) -> axum::response::Response { + let status = StatusCode::from_u16(response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let mut header_map = HeaderMap::with_capacity(response.headers.len()); + for (name, value) in response.headers { + if let (Ok(name), Ok(value)) = ( + HeaderName::try_from(name.as_str()), + HeaderValue::from_str(&value), + ) { + header_map.append(name, value); + } + } + + let stream = UnboundedReceiverStream::new(response.body).map(|chunk| match chunk { + Ok(bytes) => Ok::(Bytes::from(bytes)), + Err(error) => { + tracing::warn!(?error, "serverless stream error"); + Err(std::io::Error::other(format!( + "{}.{}: {}", + error.group, error.code, error.message + ))) + } + }); + + // Cancel the runtime task when the response body is dropped. + let guarded = CancelOnDropStream { + inner: stream, + _guard: CancelOnDrop { + token: request_token, + }, + }; + + (status, header_map, Body::from_stream(guarded)).into_response() +} + +fn is_length_limit_error(error: &axum::Error) -> bool { + let mut source: Option<&dyn std::error::Error> = Some(error); + while let Some(err) = source { + if err.is::() { + return true; + } + source = err.source(); + } + false +} + +struct CancelOnDrop { + token: CancellationToken, +} + +impl Drop for CancelOnDrop { + fn drop(&mut self) { + self.token.cancel(); + } +} + +struct CancelOnDropStream { + inner: S, + _guard: CancelOnDrop, +} + +impl Stream for CancelOnDropStream { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index abd33164cb..6d99414903 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -187,6 +187,16 @@ export interface JsServeConfig { serverlessValidateEndpoint: boolean serverlessMaxStartPayloadBytes: number } +export interface JsListenerConfig { + /** Host to bind. Defaults to `0.0.0.0` when not provided. */ + host?: string + port: number + /** + * Optional static file root mounted as a fallback below the framework + * routes. + */ + publicDir?: string +} export interface JsServerlessRequest { method: string url: string @@ -344,6 +354,7 @@ export declare class CoreRegistry { health(): Promise metadata(): JsRegistryRouteResponse metrics(): JsRegistryRouteResponse + serveListener(listener: JsListenerConfig, config: JsServeConfig): Promise handleServerlessRequest(req: JsServerlessRequest, onStreamEvent: (...args: any[]) => any, cancelToken: CancellationToken, config: JsServeConfig): Promise } export declare class Schedule { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs index f9ad0a9b9c..7bcddea0e2 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs @@ -10,6 +10,7 @@ use parking_lot::Mutex as ParkingMutex; use rivetkit_core::{ CoreRegistry as NativeCoreRegistry, CoreServerlessRuntime, EngineSpawnMode, ServeConfig, ServerlessRequest, registry::CoreEnvoyHandle, serverless::ServerlessStreamError, + serverless_http::{self, ListenerConfig}, }; use tokio::sync::{Mutex as TokioMutex, Notify}; use tokio_util::sync::CancellationToken as CoreCancellationToken; @@ -38,6 +39,16 @@ pub struct JsServeConfig { pub serverless_max_start_payload_bytes: u32, } +#[napi(object)] +pub struct JsListenerConfig { + /// Host to bind. Defaults to `0.0.0.0` when not provided. + pub host: Option, + pub port: u32, + /// Optional static file root mounted as a fallback below the framework + /// routes. + pub public_dir: Option, +} + #[napi(object)] pub struct JsServerlessRequest { pub method: String, @@ -350,6 +361,27 @@ impl CoreRegistry { }) } + #[napi] + pub async fn serve_listener( + &self, + listener: JsListenerConfig, + config: JsServeConfig, + ) -> napi::Result<()> { + let port: u16 = listener + .port + .try_into() + .map_err(|_| napi_anyhow_error(anyhow::anyhow!("port out of range")))?; + let listener_config = ListenerConfig { + host: listener.host, + port, + public_dir: listener.public_dir.map(PathBuf::from), + }; + let runtime = self.ensure_serverless_runtime(config).await?; + serverless_http::serve(runtime, listener_config, self.shutdown_token.clone()) + .await + .map_err(napi_anyhow_error) + } + #[napi(ts_return_type = "Promise")] pub fn handle_serverless_request( &self, diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index 4b864ab335..5a0beee2a6 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -188,8 +188,6 @@ "actor-config-schema-gen": "tsx scripts/actor-config-schema-gen.ts" }, "dependencies": { - "@hono/node-server": "^1.18.2", - "@hono/node-ws": "^1.1.1", "@hono/zod-openapi": "^1.1.5", "@rivet-dev/agent-os-core": "^0.1.1", "@rivetkit/bare-ts": "^0.6.2", @@ -203,7 +201,6 @@ "@rivetkit/workflow-engine": "workspace:*", "cbor-x": "^1.6.0", "drizzle-orm": "^0.44.2", - "get-port": "^7.1.0", "hono": "^4.7.0", "invariant": "^2.2.4", "p-retry": "^6.2.1", @@ -215,12 +212,15 @@ "devDependencies": { "@biomejs/biome": "^2.3", "@copilotkit/llmock": "^1.6.0", + "@hono/node-server": "^1.18.2", + "@hono/node-ws": "^1.1.1", "@rivet-dev/agent-os-common": "*", "@rivet-dev/agent-os-pi": "^0.1.1", "@standard-schema/spec": "^1.0.0", "@types/invariant": "^2", "@types/node": "^22.13.1", "eventsource": "^4.0.0", + "get-port": "^7.1.0", "tsup": "^8.4.0", "tsx": "^4.19.4", "typescript": "^5.7.3", diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index ab81ed3c77..8a3b993082 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -1,8 +1,11 @@ -import { Hono } from "hono"; import { isLocalEngineEndpoint } from "@/common/engine"; import { configureServerlessPool } from "@/serverless/configure"; -import { detectRuntime, VERSION } from "@/utils"; -import { crossPlatformServe, loadRuntimeServeStatic } from "@/utils/serve"; +import { VERSION } from "@/utils"; +import { + getRivetkitPublicDir, + getRivetkitRuntimeMode, + parsePortEnv, +} from "@/utils/env-vars"; import { type RegistryActors, type RegistryConfig, @@ -89,6 +92,12 @@ export class Registry { }; } + /** + * Fires `configureServerlessPool` once per process when the registry + * config opts into it. Cached on the instance so repeated calls (from + * `handler()` and `listen()`) only run the upsert once. The retry loop + * inside `configureServerlessPool` tolerates the engine still warming up. + */ #ensureServerlessPoolConfigured( config: RegistryConfig, ): Promise | undefined { @@ -322,13 +331,15 @@ export class Registry { } /** - * Starts an HTTP server that dispatches every request through the - * serverless handler. Uses `crossPlatformServe` to pick the right - * runtime (Node, Bun, Deno). + * Bind an HTTP listener provided by the native (Rust) runtime and serve + * the registry's serverless endpoints over it. Resolves only after the + * registry is shut down (SIGINT/SIGTERM or `nativeRegistry.shutdown()`). * - * @param opts.port Port to listen on. Defaults to 3000. + * @param opts.port Port to listen on. Defaults to `process.env.RIVET_PORT` + * if set, otherwise 3000. + * @param opts.host Address to bind. Defaults to `0.0.0.0`. * @param opts.publicDir If set, serves static files from this directory - * before falling through to the registry handler. + * as a fallback below the framework routes. * * @example * ```ts @@ -337,18 +348,34 @@ export class Registry { * ``` */ public async listen( - opts: { port?: number; publicDir?: string } = {}, + opts: { port?: number; host?: string; publicDir?: string } = {}, ): Promise { - const port = opts.port ?? 3000; + const port = opts.port ?? parsePortEnv(process.env.RIVET_PORT) ?? 3000; + const publicDir = opts.publicDir ?? getRivetkitPublicDir(); const config = this.parseConfig(); - const runtime = detectRuntime(); - const app = new Hono(); - if (opts.publicDir) { - const serveStatic = await loadRuntimeServeStatic(runtime); - app.use("*", serveStatic({ root: opts.publicDir })); - } - app.all("*", (c) => this.handler(c.req.raw)); - await crossPlatformServe(config, port, app, runtime); + + // Cache on both promise fields so the shutdown drain sees Mode A and B. + const configuredRegistryPromise = buildConfiguredRegistry(config); + this.#runtimeServeConfiguredPromise = configuredRegistryPromise; + this.#runtimeServerlessPromise = configuredRegistryPromise; + this.#installSignalHandlers(config); + + this.#printWelcome(config, "serverless", { + port, + host: opts.host, + publicDir, + }); + + // Background fire; the retry loop tolerates engine warm-up. + this.#ensureServerlessPoolConfigured(config); + + const { runtime, registry, serveConfig } = + await configuredRegistryPromise; + await runtime.serveListener( + registry, + { port, host: opts.host, publicDir }, + serveConfig, + ); } /** @@ -666,6 +693,10 @@ export class Registry { /** * Starts the actor envoy for standalone server deployments. * + * Set `RIVETKIT_RUNTIME_MODE=serverless` to instead bind an HTTP listener + * via `listen()` (Mode B). Mode A (envoy) and Mode B (listener) are + * mutually exclusive per registry instance. + * * @example * ```ts * const registry = setup({ use: { counter } }); @@ -673,6 +704,21 @@ export class Registry { * ``` */ public start() { + if (getRivetkitRuntimeMode() === "serverless") { + // start() defaults publicDir to "/public" unless overridden by env. + const publicDir = getRivetkitPublicDir() ?? "/public"; + // Detached listener; bind failures are fatal so exit hard. + this.listen({ publicDir }).catch((error) => { + logger().error({ error }, "auto-listen failed; exiting"); + if ( + typeof process !== "undefined" && + typeof process.exit === "function" + ) { + process.exit(1); + } + }); + return; + } const config = this.parseConfig(); this.#startEnvoy(config, true); } @@ -680,6 +726,7 @@ export class Registry { #printWelcome( config: RegistryConfig, kind: "serverless" | "serverful", + listener?: { port: number; host?: string; publicDir?: string }, ): void { if (config.noWelcome || this.#welcomePrinted) return; this.#welcomePrinted = true; @@ -711,6 +758,15 @@ export class Registry { } logLine("Actors", Object.keys(config.use).length.toString()); + + if (listener) { + const host = listener.host ?? "0.0.0.0"; + logLine("Listening", `http://${host}:${listener.port}`); + if (listener.publicDir) { + logLine("Public Dir", listener.publicDir); + } + } + console.log(); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index 2165a420a2..50bb1dec0b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -18,6 +18,7 @@ import type { RuntimeHttpRequest, RuntimeKvEntry, RuntimeKvListOptions, + RuntimeListenerConfig, RuntimeQueueEnqueueAndWaitOptions, RuntimeQueueMessage, RuntimeQueueNextBatchOptions, @@ -259,6 +260,21 @@ export class NapiCoreRuntime implements CoreRuntime { ); } + async serveListener( + registry: RegistryHandle, + listener: RuntimeListenerConfig, + config: RuntimeServeConfig, + ): Promise { + await asNativeRegistry(registry).serveListener( + { + port: listener.port, + host: listener.host, + publicDir: listener.publicDir, + }, + config, + ); + } + createActorFactory( callbacks: object, config?: RuntimeActorConfig | undefined | null, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index 35f8e4748d..0727dfc05e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -263,6 +263,12 @@ export interface RuntimeServeConfig { serverlessMaxStartPayloadBytes: number; } +export interface RuntimeListenerConfig { + port: number; + host?: string; + publicDir?: string; +} + export interface RuntimeServerlessRequest { method: string; url: string; @@ -338,6 +344,11 @@ export interface CoreRuntime { cancelToken: CancellationTokenHandle, config: RuntimeServeConfig, ): Promise; + serveListener( + registry: RegistryHandle, + listener: RuntimeListenerConfig, + config: RuntimeServeConfig, + ): Promise; registryHealth?( registry: RegistryHandle, ): Promise; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index a5c34b3eb8..2c829495db 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -26,6 +26,7 @@ import type { RuntimeInspectorSnapshot, RuntimeKvEntry, RuntimeKvListOptions, + RuntimeListenerConfig, RuntimeQueueEnqueueAndWaitOptions, RuntimeQueueInspectMessage, RuntimeQueueMessage, @@ -302,6 +303,16 @@ export class WasmCoreRuntime implements CoreRuntime { ); } + async serveListener( + _registry: RegistryHandle, + _listener: RuntimeListenerConfig, + _config: RuntimeServeConfig, + ): Promise { + throw new Error( + "registry.listen() is not supported on the wasm runtime; use registry.serve() and mount the handler in your platform's HTTP server instead", + ); + } + createActorFactory( callbacks: object, config?: RuntimeActorConfig | undefined | null, diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts index 9f3a269b66..8a41419c36 100644 --- a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts +++ b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts @@ -50,6 +50,33 @@ export const getRivetkitStoragePath = (): string | undefined => getEnvUniversal("RIVETKIT_STORAGE_PATH"); export const getRivetkitRuntime = (): string | undefined => getEnvUniversal("RIVETKIT_RUNTIME"); +export type RuntimeMode = "envoy" | "serverless"; + +export const getRivetkitRuntimeMode = (): RuntimeMode => + getEnvUniversal("RIVETKIT_RUNTIME_MODE") === "serverless" + ? "serverless" + : "envoy"; + +export const getRivetkitPublicDir = (): string | undefined => { + const value = getEnvUniversal("RIVETKIT_PUBLIC_DIR"); + return value === undefined || value === "" ? undefined : value; +}; + +export function parsePortEnv(raw: string | undefined): number | undefined { + if (raw === undefined || raw === "") return undefined; + const parsed = Number.parseInt(raw, 10); + if ( + !Number.isFinite(parsed) || + parsed < 1 || + parsed > 65535 || + String(parsed) !== raw.trim() + ) { + throw new Error( + `RIVET_PORT env var must be an integer between 1 and 65535; got "${raw}"`, + ); + } + return parsed; +} // Logging configuration // DEPRECATED: LOG_LEVEL will be removed in a future version diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/serve.ts b/rivetkit-typescript/packages/rivetkit/src/utils/serve.ts deleted file mode 100644 index a735716966..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/utils/serve.ts +++ /dev/null @@ -1,216 +0,0 @@ -// TODO: Go back to dynamic import for this -import getPort from "get-port"; -import type { Hono } from "hono"; -import type { RegistryConfig } from "@/registry/config"; -import { logger } from "@/registry/log"; -import { detectRuntime, type Runtime, stringifyError } from "../utils"; - -const DEFAULT_PORT = 6421; -export type ServeStatic = - typeof import("@hono/node-server/serve-static").serveStatic; -const serveStaticLoaderPromises: Partial< - Record> -> = {}; - -/** - * Finds a free port starting from the given port. - * - * Tries ports incrementally until a free one is found. - */ -export async function findFreePort( - startPort: number = DEFAULT_PORT, -): Promise { - // TODO: Fix this - // const getPortModule = "get-port"; - // const { default: getPort } = await import(/* webpackIgnore: true */ getPortModule); - - // Create an iterable of ports starting from startPort - function* portRange(start: number, count: number = 100): Iterable { - for (let i = 0; i < count; i++) { - yield start + i; - } - } - - return getPort({ port: portRange(startPort) }); -} - -export async function crossPlatformServe( - config: RegistryConfig, - httpPort: number, - app: Hono, - runtime: Runtime = detectRuntime(), -): Promise<{ upgradeWebSocket: any; closeServer?: () => void }> { - logger().debug({ msg: "detected runtime for serve", runtime }); - - switch (runtime) { - case "deno": - return serveDeno(config, httpPort, app); - case "bun": - return serveBun(config, httpPort, app); - case "node": - return serveNode(config, httpPort, app); - default: - return serveNode(config, httpPort, app); - } -} - -export async function loadRuntimeServeStatic( - runtime: Runtime, -): Promise { - if (!serveStaticLoaderPromises[runtime]) { - if (runtime === "node") { - const nodeServeStaticModule = "@hono/node-server/serve-static"; - serveStaticLoaderPromises[runtime] = import( - /* webpackIgnore: true */ - nodeServeStaticModule - ).then((x) => x.serveStatic); - } else if (runtime === "bun") { - const bunModule = "hono/bun"; - serveStaticLoaderPromises[runtime] = import( - /* webpackIgnore: true */ - bunModule - ).then((x) => x.serveStatic as ServeStatic); - } else if (runtime === "deno") { - const denoModule = "hono/deno"; - serveStaticLoaderPromises[runtime] = import( - /* webpackIgnore: true */ - denoModule - ).then((x) => x.serveStatic as ServeStatic); - } else { - throw new Error(`unsupported runtime: ${runtime}`); - } - } - - return await serveStaticLoaderPromises[runtime]!; -} - -async function serveNode( - config: RegistryConfig, - httpPort: number, - app: Hono, -): Promise<{ upgradeWebSocket: any; closeServer: () => void }> { - // Import @hono/node-server using string variable to prevent static analysis - const nodeServerModule = "@hono/node-server"; - let serve: any; - try { - const dep = await import( - /* webpackIgnore: true */ - nodeServerModule - ); - serve = dep.serve; - } catch (err) { - logger().error({ - msg: "failed to import @hono/node-server. please run 'npm install @hono/node-server @hono/node-ws'", - error: stringifyError(err), - }); - process.exit(1); - } - - // Import @hono/node-ws using string variable to prevent static analysis - const nodeWsModule = "@hono/node-ws"; - let createNodeWebSocket: any; - try { - const dep = await import( - /* webpackIgnore: true */ - nodeWsModule - ); - createNodeWebSocket = dep.createNodeWebSocket; - } catch (err) { - logger().error({ - msg: "failed to import @hono/node-ws. please run 'npm install @hono/node-server @hono/node-ws'", - error: stringifyError(err), - }); - process.exit(1); - } - - // Inject WS - const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ - app: app, - }); - - // Start server - const port = httpPort; - const hostname = config.httpHost; - const server = serve({ fetch: app.fetch, port, hostname }, () => - logger().info({ msg: "server listening", port, hostname }), - ); - injectWebSocket(server); - - const closeServer = () => { - server.close(); - }; - - return { upgradeWebSocket, closeServer }; -} - -async function serveDeno( - config: RegistryConfig, - httpPort: number, - app: Hono, -): Promise<{ upgradeWebSocket: any }> { - // Import hono/deno using string variable to prevent static analysis - const honoDenoModule = "hono/deno"; - let upgradeWebSocket: any; - try { - const dep = await import( - /* webpackIgnore: true */ - honoDenoModule - ); - upgradeWebSocket = dep.upgradeWebSocket; - } catch (err) { - logger().error({ - msg: "failed to import hono/deno", - error: stringifyError(err), - }); - process.exit(1); - } - - const port = httpPort; - const hostname = config.httpHost; - - // Use Deno.serve - Deno.serve({ port, hostname }, app.fetch); - logger().info({ msg: "server listening", port, hostname }); - - return { upgradeWebSocket }; -} - -async function serveBun( - config: RegistryConfig, - httpPort: number, - app: Hono, -): Promise<{ upgradeWebSocket: any }> { - // Import hono/bun using string variable to prevent static analysis - const honoBunModule = "hono/bun"; - let createBunWebSocket: any; - try { - const dep = await import( - /* webpackIgnore: true */ - honoBunModule - ); - createBunWebSocket = dep.createBunWebSocket; - } catch (err) { - logger().error({ - msg: "failed to import hono/bun", - error: stringifyError(err), - }); - process.exit(1); - } - - const { websocket, upgradeWebSocket } = createBunWebSocket(); - - const port = httpPort; - const hostname = config.httpHost; - - // Use Bun.serve - // @ts-expect-error - Bun global - Bun.serve({ - fetch: app.fetch, - port, - hostname, - websocket, - }); - logger().info({ msg: "server listening", port, hostname }); - - return { upgradeWebSocket }; -} diff --git a/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts b/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts new file mode 100644 index 0000000000..521cd18b4b --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts @@ -0,0 +1,183 @@ +import getPort from "get-port"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { actor, setup } from "@/mod"; +import type { Registry } from "@/registry"; +import { getRivetkitRuntimeMode, parsePortEnv } from "@/utils/env-vars"; + +describe("getRivetkitRuntimeMode", () => { + let snapshot: string | undefined; + + beforeEach(() => { + snapshot = process.env.RIVETKIT_RUNTIME_MODE; + delete process.env.RIVETKIT_RUNTIME_MODE; + }); + afterEach(() => { + if (snapshot === undefined) delete process.env.RIVETKIT_RUNTIME_MODE; + else process.env.RIVETKIT_RUNTIME_MODE = snapshot; + }); + + test("default (unset) is envoy", () => { + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("explicit serverless", () => { + process.env.RIVETKIT_RUNTIME_MODE = "serverless"; + expect(getRivetkitRuntimeMode()).toBe("serverless"); + }); + + test("explicit envoy", () => { + process.env.RIVETKIT_RUNTIME_MODE = "envoy"; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("empty string is envoy", () => { + process.env.RIVETKIT_RUNTIME_MODE = ""; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("unrecognized value is envoy", () => { + process.env.RIVETKIT_RUNTIME_MODE = "potato"; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); +}); + +describe("parsePortEnv", () => { + test("undefined input returns undefined", () => { + expect(parsePortEnv(undefined)).toBeUndefined(); + }); + + test("empty string returns undefined", () => { + expect(parsePortEnv("")).toBeUndefined(); + }); + + test("valid integer string parses", () => { + expect(parsePortEnv("8080")).toBe(8080); + }); + + test("port 1 is accepted (lower bound)", () => { + expect(parsePortEnv("1")).toBe(1); + }); + + test("port 65535 is accepted (upper bound)", () => { + expect(parsePortEnv("65535")).toBe(65535); + }); + + test("port 0 is rejected", () => { + expect(() => parsePortEnv("0")).toThrow(/RIVET_PORT env var must be/); + }); + + test("port 65536 is rejected", () => { + expect(() => parsePortEnv("65536")).toThrow( + /RIVET_PORT env var must be/, + ); + }); + + test("non-numeric input is rejected", () => { + expect(() => parsePortEnv("notaport")).toThrow( + /RIVET_PORT env var must be/, + ); + }); + + test("partial numeric input is rejected (parseInt would silently succeed)", () => { + expect(() => parsePortEnv("8080abc")).toThrow( + /RIVET_PORT env var must be/, + ); + }); + + test("negative input is rejected", () => { + expect(() => parsePortEnv("-1")).toThrow(/RIVET_PORT env var must be/); + }); +}); + +const testActor = actor({ + state: {}, + actions: {}, +}); + +describe("registry.listen() end-to-end", () => { + let registry: Registry | undefined; + let listenPromise: Promise | undefined; + + afterEach(async () => { + if (registry) { + await registry.shutdown(); + registry = undefined; + } + if (listenPromise) { + await listenPromise.catch(() => undefined); + listenPromise = undefined; + } + }, 30_000); + + test("binds the requested port and serves /api/rivet/metadata", async () => { + const port = await getPort({ host: "127.0.0.1" }); + registry = setup({ + use: { test: testActor }, + startEngine: false, + endpoint: "http://127.0.0.1:65535", + token: "dev", + namespace: "default", + noWelcome: true, + shutdown: { disableSignalHandlers: true }, + }) as Registry; + + listenPromise = registry.listen({ port, host: "127.0.0.1" }); + + const baseUrl = `http://127.0.0.1:${port}`; + const response = await waitForResponse( + `${baseUrl}/api/rivet/metadata`, + 15_000, + ); + expect(response.status).toBe(200); + const body = await response.json(); + expect(body.runtime).toBe("rivetkit"); + expect(body.actorNames).toBeDefined(); + expect(body.actorNames).toHaveProperty("test"); + }, 30_000); + + test("/api/rivet/health returns ok", async () => { + const port = await getPort({ host: "127.0.0.1" }); + registry = setup({ + use: { test: testActor }, + startEngine: false, + endpoint: "http://127.0.0.1:65535", + token: "dev", + namespace: "default", + noWelcome: true, + shutdown: { disableSignalHandlers: true }, + }) as Registry; + + listenPromise = registry.listen({ port, host: "127.0.0.1" }); + + const response = await waitForResponse( + `http://127.0.0.1:${port}/api/rivet/health`, + 15_000, + ); + expect(response.status).toBe(200); + const body = await response.json(); + expect(body.runtime).toBe("rivetkit"); + expect(body.status).toBeDefined(); + }, 30_000); +}); + +/** + * Poll the URL until it responds (the listener takes a moment to bind and + * build the serverless runtime on first request). + */ +async function waitForResponse( + url: string, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + let lastError: unknown; + while (Date.now() < deadline) { + try { + const response = await fetch(url); + return response; + } catch (error) { + lastError = error; + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } + throw new Error(`timed out waiting for ${url}: ${String(lastError)}`); +} diff --git a/website/src/content/docs/general/environment-variables.mdx b/website/src/content/docs/general/environment-variables.mdx index 737f587e19..b0fe3a72b6 100644 --- a/website/src/content/docs/general/environment-variables.mdx +++ b/website/src/content/docs/general/environment-variables.mdx @@ -64,6 +64,14 @@ These variables configure how clients connect to your actors. | `RIVETKIT_RUNTIME` | Runtime binding to use for RivetKit core: `auto`, `native`, or `wasm`. Defaults to `auto`. | | `RIVETKIT_STORAGE_PATH` | Overrides the default file-system storage path used by RivetKit when using the default driver. | +## Lifecycle + +| Environment Variable | Description | +|---------------------|-------------| +| `RIVETKIT_RUNTIME_MODE` | Controls how `registry.start()` runs. Defaults to `envoy`: opens a long-lived WebSocket to the engine (Mode A). Set to `serverless` to bind an HTTP listener via `registry.listen()` (Mode B). | +| `RIVETKIT_PUBLIC_DIR` | Directory of static assets to serve alongside the framework routes when calling `registry.listen()`. Used as a fallback when `opts.publicDir` is not passed. On auto-listen via `registry.start()`, defaults to `/public` when this env var is unset. | +| `RIVET_PORT` | Port the listener binds when calling `registry.listen()` without an explicit `opts.port`. Must be an integer between 1 and 65535. Defaults to `3000`. | + ## Logging | Environment Variable | Description |