From 40d8a3612c179a93d5367a710434a4514ed62cb7 Mon Sep 17 00:00:00 2001 From: ABCxFF <79597906+abcxff@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:03:44 +0000 Subject: [PATCH 1/2] feat(rivetkit): move HTTP listener to Rust, auto-listen on prod start() --- Cargo.lock | 20 ++ pnpm-lock.yaml | 18 +- .../packages/rivetkit-core/Cargo.toml | 10 + .../packages/rivetkit-core/src/lib.rs | 2 + .../src/registry/envoy_callbacks.rs | 1 + .../rivetkit-core/src/registry/mod.rs | 9 +- .../src/registry/runner_config.rs | 14 +- .../packages/rivetkit-core/src/serverless.rs | 27 ++- .../rivetkit-core/src/serverless_http.rs | 209 +++++++++++++++++ .../rivetkit-core/tests/serverless.rs | 1 + .../packages/rivetkit-napi/index.d.ts | 12 + .../packages/rivetkit-napi/src/registry.rs | 36 +++ .../packages/rivetkit/package.json | 6 +- .../packages/rivetkit/src/registry/index.ts | 97 ++++++-- .../rivetkit/src/registry/napi-runtime.ts | 16 ++ .../packages/rivetkit/src/registry/runtime.ts | 16 ++ .../rivetkit/src/registry/wasm-runtime.ts | 11 + .../packages/rivetkit/src/utils/env-vars.ts | 31 +++ .../packages/rivetkit/src/utils/serve.ts | 216 ------------------ .../packages/rivetkit/tests/listener.test.ts | 210 +++++++++++++++++ .../docs/general/environment-variables.mdx | 8 + 21 files changed, 713 insertions(+), 257 deletions(-) create mode 100644 rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs delete mode 100644 rivetkit-typescript/packages/rivetkit/src/utils/serve.ts create mode 100644 rivetkit-typescript/packages/rivetkit/tests/listener.test.ts diff --git a/Cargo.lock b/Cargo.lock index 84e63204dd..6d7e68136d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2478,6 +2478,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" + [[package]] name = "httparse" version = "1.10.1" @@ -6066,13 +6072,16 @@ version = "2.3.0-rc.12" dependencies = [ "anyhow", "async-trait", + "axum 0.8.4", "base64 0.22.1", + "bytes", "ciborium", "fs_extra", "futures", "getrandom 0.2.16", "http 1.3.1", "include_dir", + "http-body-util", "js-sys", "nix 0.30.1", "parking_lot", @@ -6097,7 +6106,9 @@ dependencies = [ "subtle", "tempfile", "tokio", + "tokio-stream", "tokio-util", + "tower-http", "tracing", "tracing-subscriber", "url", @@ -7837,10 +7848,19 @@ checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ "bitflags 2.10.0", "bytes", + "futures-core", "futures-util", "http 1.3.1", "http-body 1.0.1", + "http-body-util", + "http-range-header", + "httpdate", + "mime", + "mime_guess", + "percent-encoding", "pin-project-lite", + "tokio", + "tokio-util", "tower 0.5.2", "tower-layer", "tower-service", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3446c6ad79..3beb6588ad 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3039,12 +3039,6 @@ importers: rivetkit-typescript/packages/rivetkit: dependencies: - '@hono/node-server': - specifier: ^1.18.2 - version: 1.19.9(hono@4.11.9) - '@hono/node-ws': - specifier: ^1.1.1 - version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.9))(hono@4.11.9) '@hono/zod-openapi': specifier: ^1.1.5 version: 1.1.5(hono@4.11.9)(zod@4.1.13) @@ -3087,9 +3081,6 @@ importers: drizzle-orm: specifier: ^0.44.2 version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.15)(pg@8.17.2)(sql.js@1.13.0) - get-port: - specifier: ^7.1.0 - version: 7.1.0 hono: specifier: ^4.7.0 version: 4.11.9 @@ -3118,6 +3109,12 @@ importers: '@copilotkit/llmock': specifier: ^1.6.0 version: 1.7.1 + '@hono/node-server': + specifier: ^1.18.2 + version: 1.19.9(hono@4.11.9) + '@hono/node-ws': + specifier: ^1.1.1 + version: 1.3.0(@hono/node-server@1.19.9(hono@4.11.9))(hono@4.11.9) '@rivet-dev/agent-os-common': specifier: '*' version: 0.0.260331072558 @@ -3136,6 +3133,9 @@ importers: eventsource: specifier: ^4.0.0 version: 4.0.0 + get-port: + specifier: ^7.1.0 + version: 7.1.0 tsup: specifier: ^8.4.0 version: 8.5.1(@microsoft/api-extractor@7.53.2(@types/node@22.19.10))(@swc/core@1.15.11(@swc/helpers@0.5.17))(jiti@2.6.1)(postcss@8.5.6)(tsx@4.21.0)(typescript@5.9.3)(yaml@2.9.0) diff --git a/rivetkit-rust/packages/rivetkit-core/Cargo.toml b/rivetkit-rust/packages/rivetkit-core/Cargo.toml index b11d713ea3..b67b5c055a 100644 --- a/rivetkit-rust/packages/rivetkit-core/Cargo.toml +++ b/rivetkit-rust/packages/rivetkit-core/Cargo.toml @@ -15,6 +15,11 @@ default = ["native-runtime"] native-runtime = [ "dep:nix", "dep:reqwest", + "dep:axum", + "dep:bytes", + "dep:http-body-util", + "dep:tokio-stream", + "dep:tower-http", "rivet-envoy-client/native-transport", ] wasm-runtime = ["rivet-envoy-client/wasm-transport"] @@ -29,11 +34,14 @@ fs_extra = { workspace = true } [dependencies] anyhow.workspace = true async-trait = { workspace = true, optional = true } +axum = { workspace = true, optional = true } base64.workspace = true +bytes = { workspace = true, optional = true } ciborium.workspace = true futures.workspace = true http.workspace = true include_dir = { workspace = true } +http-body-util = { workspace = true, optional = true } nix = { workspace = true, optional = true } parking_lot.workspace = true rand.workspace = true @@ -54,7 +62,9 @@ serde_bare.workspace = true serde_bytes.workspace = true sha2.workspace = true subtle.workspace = true +tokio-stream = { workspace = true, optional = true } tokio-util.workspace = true +tower-http = { workspace = true, optional = true, features = ["fs"] } tracing.workspace = true url.workspace = true vbare.workspace = true diff --git a/rivetkit-rust/packages/rivetkit-core/src/lib.rs b/rivetkit-rust/packages/rivetkit-core/src/lib.rs index 6e8b6a8d24..9ec56b9714 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/lib.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/lib.rs @@ -12,6 +12,8 @@ pub mod metrics_endpoint; pub mod registry; pub mod runtime; pub mod serverless; +#[cfg(feature = "native-runtime")] +pub mod serverless_http; pub(crate) mod time { use std::fmt; use std::future::Future; diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs index 77bababf2d..0526eb431f 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs @@ -227,6 +227,7 @@ impl ServeConfig { serverless_validate_endpoint: settings.serverless_validate_endpoint, serverless_max_start_payload_bytes: settings.serverless_max_start_payload_bytes, serverless_cache_envoy: true, + force_normal_runner_config_upsert: false, } } } diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index d7b76a0c21..8dddbd8507 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -102,6 +102,9 @@ impl CoreEnvoyHandle { } } + /// Engine-reported drain threshold in milliseconds. `None` until the + /// envoy has completed its first protocol-metadata exchange with the + /// engine. pub async fn actor_stop_threshold_ms(&self) -> Option { self.handle .get_protocol_metadata() @@ -239,6 +242,10 @@ pub struct ServeConfig { pub serverless_validate_endpoint: bool, pub serverless_max_start_payload_bytes: usize, pub serverless_cache_envoy: bool, + /// When true, upsert a normal runner config to the engine on startup + /// even if the endpoint is not a local engine. Set by the TS Registry + /// when `RIVETKIT_RUNTIME_MODE` resolves to `envoy`. + pub force_normal_runner_config_upsert: bool, } #[derive(Debug, Default, Deserialize)] @@ -594,7 +601,7 @@ impl CoreRegistry { } #[cfg(feature = "native-runtime")] - runner_config::ensure_local_normal_runner_config(&config).await?; + runner_config::ensure_normal_runner_config(&config).await?; let callbacks = Arc::new(RegistryCallbacks { dispatcher: dispatcher.clone(), }); diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs index 88b1480b55..340a5545db 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs @@ -17,8 +17,8 @@ struct Datacenter { name: String, } -pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> Result<()> { - if !is_local_engine_endpoint(&config.endpoint) { +pub(super) async fn ensure_normal_runner_config(config: &ServeConfig) -> Result<()> { + if !is_local_engine_endpoint(&config.endpoint) && !config.force_normal_runner_config_upsert { return Ok(()); } @@ -51,7 +51,7 @@ pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> R .json(&body) .send() .await - .context("upsert local runner config")?; + .context("upsert normal runner config")?; let status = response.status(); if !status.is_success() { let response_body = response @@ -59,7 +59,7 @@ pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> R .await .context("read failed runner config response body")?; anyhow::bail!( - "failed to upsert local runner config `{}`: {} {}", + "failed to upsert normal runner config `{}`: {} {}", config.pool_name, status, response_body @@ -69,7 +69,7 @@ pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> R tracing::debug!( namespace = %config.namespace, pool_name = %config.pool_name, - "ensured local normal runner config" + "ensured normal runner config" ); Ok(()) @@ -80,7 +80,7 @@ async fn get_datacenters(client: &Client, config: &ServeConfig) -> Result Result usize { + self.settings.max_start_payload_bytes + } + + /// Canonical 413 response built through the `RivetError` system. + pub fn incoming_too_long_response(&self) -> ServerlessResponse { + let error = IncomingMessageTooLong { + limit: self.settings.max_start_payload_bytes, + } + .build(); + error_response(error) + } + + /// Canonical 400 response for malformed requests. + pub fn invalid_request_response(&self, reason: impl Into) -> ServerlessResponse { + let error = InvalidRequest { + reason: reason.into(), + } + .build(); + error_response(error) + } + pub async fn handle_request(&self, req: ServerlessRequest) -> ServerlessResponse { let cors = cors_headers(&req); match self.handle_request_inner(req).await { @@ -334,7 +356,6 @@ impl CoreServerlessRuntime { ); if req.body.len() > self.settings.max_start_payload_bytes { return Err(IncomingMessageTooLong { - size: req.body.len(), limit: self.settings.max_start_payload_bytes, } .build()); diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs new file mode 100644 index 0000000000..8ec129a64c --- /dev/null +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless_http.rs @@ -0,0 +1,209 @@ +use std::collections::HashMap; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context as TaskContext, Poll}; + +use anyhow::{Context, Result}; +use axum::Router; +use axum::body::{Body, Bytes}; +use axum::extract::{Request, State}; +use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; +use axum::response::IntoResponse; +use axum::routing::any; +use futures::Stream; +use futures::StreamExt; +use http_body_util::LengthLimitError; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_util::sync::CancellationToken; +use tower_http::services::ServeDir; + +use crate::serverless::{CoreServerlessRuntime, ServerlessRequest, ServerlessResponse}; + +#[derive(Debug, Clone)] +pub struct ListenerConfig { + /// Host to bind; accepts numeric IPs or DNS names. Defaults to `0.0.0.0`. + pub host: Option, + pub port: u16, + pub public_dir: Option, +} + +#[derive(Clone)] +struct AppState { + runtime: CoreServerlessRuntime, + shutdown_token: CancellationToken, +} + +/// Bind a TCP listener and serve `runtime` over HTTP until `shutdown` fires. +pub async fn serve( + runtime: CoreServerlessRuntime, + listener: ListenerConfig, + shutdown: CancellationToken, +) -> Result<()> { + let host = listener.host.as_deref().unwrap_or("0.0.0.0"); + let port = listener.port; + + let state = AppState { + runtime, + shutdown_token: shutdown.clone(), + }; + + let forward_service = any(forward_request).with_state(state); + + let router = match listener.public_dir.as_ref() { + Some(dir) => Router::new().fallback_service( + ServeDir::new(dir) + .call_fallback_on_method_not_allowed(true) + .fallback(forward_service), + ), + None => Router::new().fallback_service(forward_service), + }; + + let tcp = tokio::net::TcpListener::bind((host, port)) + .await + .with_context(|| format!("bind tcp listener on {host}:{port}"))?; + let bound = tcp + .local_addr() + .context("read local address of bound listener")?; + tracing::info!(host = %bound.ip(), port = bound.port(), "rivetkit server listening"); + + let shutdown_fut = { + let shutdown = shutdown.clone(); + async move { shutdown.cancelled().await } + }; + + axum::serve(tcp, router.into_make_service()) + .with_graceful_shutdown(shutdown_fut) + .await + .context("axum::serve returned an error")?; + + Ok(()) +} + +async fn forward_request( + State(state): State, + request: Request, +) -> axum::response::Response { + let (parts, body) = request.into_parts(); + let body_limit = state.runtime.max_request_body_bytes(); + let request_token = state.shutdown_token.child_token(); + let body_bytes = match axum::body::to_bytes(body, body_limit).await { + Ok(bytes) => bytes, + Err(error) if is_length_limit_error(&error) => { + tracing::warn!(body_limit, "request body exceeded limit"); + return into_axum_response(state.runtime.incoming_too_long_response(), request_token); + } + Err(error) => { + tracing::warn!(?error, "failed to read request body"); + return into_axum_response( + state + .runtime + .invalid_request_response("failed to read request body"), + request_token, + ); + } + }; + + let path_and_query = parts + .uri + .path_and_query() + .map(|pq| pq.as_str()) + .unwrap_or("/"); + let url = format!("http://internal{path_and_query}"); + + // Repeated header names get comma-joined per RFC 9110 §5.3. + let mut headers: HashMap = HashMap::new(); + for (name, value) in parts.headers.iter() { + let Ok(value_str) = value.to_str() else { + continue; + }; + let key = name.as_str().to_ascii_lowercase(); + headers + .entry(key) + .and_modify(|existing| { + existing.push_str(", "); + existing.push_str(value_str); + }) + .or_insert_with(|| value_str.to_owned()); + } + + let req = ServerlessRequest { + method: parts.method.as_str().to_owned(), + url, + headers, + body: body_bytes.to_vec(), + cancel_token: request_token.clone(), + }; + + into_axum_response(state.runtime.handle_request(req).await, request_token) +} + +fn into_axum_response( + response: ServerlessResponse, + request_token: CancellationToken, +) -> axum::response::Response { + let status = StatusCode::from_u16(response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let mut header_map = HeaderMap::with_capacity(response.headers.len()); + for (name, value) in response.headers { + if let (Ok(name), Ok(value)) = ( + HeaderName::try_from(name.as_str()), + HeaderValue::from_str(&value), + ) { + header_map.append(name, value); + } + } + + let stream = UnboundedReceiverStream::new(response.body).map(|chunk| match chunk { + Ok(bytes) => Ok::(Bytes::from(bytes)), + Err(error) => { + tracing::warn!(?error, "serverless stream error"); + Err(std::io::Error::other(format!( + "{}.{}: {}", + error.group, error.code, error.message + ))) + } + }); + + // Cancel the runtime task when the response body is dropped. + let guarded = CancelOnDropStream { + inner: stream, + _guard: CancelOnDrop { + token: request_token, + }, + }; + + (status, header_map, Body::from_stream(guarded)).into_response() +} + +fn is_length_limit_error(error: &axum::Error) -> bool { + let mut source: Option<&dyn std::error::Error> = Some(error); + while let Some(err) = source { + if err.is::() { + return true; + } + source = err.source(); + } + false +} + +struct CancelOnDrop { + token: CancellationToken, +} + +impl Drop for CancelOnDrop { + fn drop(&mut self) { + self.token.cancel(); + } +} + +struct CancelOnDropStream { + inner: S, + _guard: CancelOnDrop, +} + +impl Stream for CancelOnDropStream { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} diff --git a/rivetkit-rust/packages/rivetkit-core/tests/serverless.rs b/rivetkit-rust/packages/rivetkit-core/tests/serverless.rs index 20975d225d..41bad6aa19 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/serverless.rs @@ -193,6 +193,7 @@ mod moved_tests { serverless_validate_endpoint: true, serverless_max_start_payload_bytes: 1_048_576, serverless_cache_envoy: true, + force_normal_runner_config_upsert: false, } } diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index abd33164cb..46a0f7c747 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -186,6 +186,17 @@ export interface JsServeConfig { serverlessClientToken?: string serverlessValidateEndpoint: boolean serverlessMaxStartPayloadBytes: number + forceNormalRunnerConfigUpsert?: boolean +} +export interface JsListenerConfig { + /** Host to bind. Defaults to `0.0.0.0` when not provided. */ + host?: string + port: number + /** + * Optional static file root mounted as a fallback below the framework + * routes. + */ + publicDir?: string } export interface JsServerlessRequest { method: string @@ -344,6 +355,7 @@ export declare class CoreRegistry { health(): Promise metadata(): JsRegistryRouteResponse metrics(): JsRegistryRouteResponse + serveListener(listener: JsListenerConfig, config: JsServeConfig): Promise handleServerlessRequest(req: JsServerlessRequest, onStreamEvent: (...args: any[]) => any, cancelToken: CancellationToken, config: JsServeConfig): Promise } export declare class Schedule { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs index f9ad0a9b9c..8817d6ab7c 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs @@ -10,6 +10,7 @@ use parking_lot::Mutex as ParkingMutex; use rivetkit_core::{ CoreRegistry as NativeCoreRegistry, CoreServerlessRuntime, EngineSpawnMode, ServeConfig, ServerlessRequest, registry::CoreEnvoyHandle, serverless::ServerlessStreamError, + serverless_http::{self, ListenerConfig}, }; use tokio::sync::{Mutex as TokioMutex, Notify}; use tokio_util::sync::CancellationToken as CoreCancellationToken; @@ -36,6 +37,17 @@ pub struct JsServeConfig { pub serverless_client_token: Option, pub serverless_validate_endpoint: bool, pub serverless_max_start_payload_bytes: u32, + pub force_normal_runner_config_upsert: Option, +} + +#[napi(object)] +pub struct JsListenerConfig { + /// Host to bind. Defaults to `0.0.0.0` when not provided. + pub host: Option, + pub port: u32, + /// Optional static file root mounted as a fallback below the framework + /// routes. + pub public_dir: Option, } #[napi(object)] @@ -350,6 +362,27 @@ impl CoreRegistry { }) } + #[napi] + pub async fn serve_listener( + &self, + listener: JsListenerConfig, + config: JsServeConfig, + ) -> napi::Result<()> { + let port: u16 = listener + .port + .try_into() + .map_err(|_| napi_anyhow_error(anyhow::anyhow!("port out of range")))?; + let listener_config = ListenerConfig { + host: listener.host, + port, + public_dir: listener.public_dir.map(PathBuf::from), + }; + let runtime = self.ensure_serverless_runtime(config).await?; + serverless_http::serve(runtime, listener_config, self.shutdown_token.clone()) + .await + .map_err(napi_anyhow_error) + } + #[napi(ts_return_type = "Promise")] pub fn handle_serverless_request( &self, @@ -629,6 +662,9 @@ fn serve_config_from_js( serverless_validate_endpoint: config.serverless_validate_endpoint, serverless_max_start_payload_bytes: config.serverless_max_start_payload_bytes as usize, serverless_cache_envoy, + force_normal_runner_config_upsert: config + .force_normal_runner_config_upsert + .unwrap_or(false), } } diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index 4b864ab335..5a0beee2a6 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -188,8 +188,6 @@ "actor-config-schema-gen": "tsx scripts/actor-config-schema-gen.ts" }, "dependencies": { - "@hono/node-server": "^1.18.2", - "@hono/node-ws": "^1.1.1", "@hono/zod-openapi": "^1.1.5", "@rivet-dev/agent-os-core": "^0.1.1", "@rivetkit/bare-ts": "^0.6.2", @@ -203,7 +201,6 @@ "@rivetkit/workflow-engine": "workspace:*", "cbor-x": "^1.6.0", "drizzle-orm": "^0.44.2", - "get-port": "^7.1.0", "hono": "^4.7.0", "invariant": "^2.2.4", "p-retry": "^6.2.1", @@ -215,12 +212,15 @@ "devDependencies": { "@biomejs/biome": "^2.3", "@copilotkit/llmock": "^1.6.0", + "@hono/node-server": "^1.18.2", + "@hono/node-ws": "^1.1.1", "@rivet-dev/agent-os-common": "*", "@rivet-dev/agent-os-pi": "^0.1.1", "@standard-schema/spec": "^1.0.0", "@types/invariant": "^2", "@types/node": "^22.13.1", "eventsource": "^4.0.0", + "get-port": "^7.1.0", "tsup": "^8.4.0", "tsx": "^4.19.4", "typescript": "^5.7.3", diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index ab81ed3c77..c14d3c4d4d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -1,8 +1,11 @@ -import { Hono } from "hono"; import { isLocalEngineEndpoint } from "@/common/engine"; import { configureServerlessPool } from "@/serverless/configure"; -import { detectRuntime, VERSION } from "@/utils"; -import { crossPlatformServe, loadRuntimeServeStatic } from "@/utils/serve"; +import { VERSION } from "@/utils"; +import { + getRivetkitPublicDir, + getRivetkitRuntimeMode, + parsePortEnv, +} from "@/utils/env-vars"; import { type RegistryActors, type RegistryConfig, @@ -89,6 +92,12 @@ export class Registry { }; } + /** + * Fires `configureServerlessPool` once per process when the registry + * config opts into it. Cached on the instance so repeated calls (from + * `handler()` and `listen()`) only run the upsert once. The retry loop + * inside `configureServerlessPool` tolerates the engine still warming up. + */ #ensureServerlessPoolConfigured( config: RegistryConfig, ): Promise | undefined { @@ -322,13 +331,15 @@ export class Registry { } /** - * Starts an HTTP server that dispatches every request through the - * serverless handler. Uses `crossPlatformServe` to pick the right - * runtime (Node, Bun, Deno). + * Bind an HTTP listener provided by the native (Rust) runtime and serve + * the registry's serverless endpoints over it. Resolves only after the + * registry is shut down (SIGINT/SIGTERM or `nativeRegistry.shutdown()`). * - * @param opts.port Port to listen on. Defaults to 3000. + * @param opts.port Port to listen on. Defaults to `process.env.PORT` + * if set, otherwise 3000. + * @param opts.host Address to bind. Defaults to `0.0.0.0`. * @param opts.publicDir If set, serves static files from this directory - * before falling through to the registry handler. + * as a fallback below the framework routes. * * @example * ```ts @@ -337,18 +348,34 @@ export class Registry { * ``` */ public async listen( - opts: { port?: number; publicDir?: string } = {}, + opts: { port?: number; host?: string; publicDir?: string } = {}, ): Promise { - const port = opts.port ?? 3000; + const port = opts.port ?? parsePortEnv(process.env.PORT) ?? 3000; + const publicDir = opts.publicDir ?? getRivetkitPublicDir(); const config = this.parseConfig(); - const runtime = detectRuntime(); - const app = new Hono(); - if (opts.publicDir) { - const serveStatic = await loadRuntimeServeStatic(runtime); - app.use("*", serveStatic({ root: opts.publicDir })); - } - app.all("*", (c) => this.handler(c.req.raw)); - await crossPlatformServe(config, port, app, runtime); + + // Cache on both promise fields so the shutdown drain sees Mode A and B. + const configuredRegistryPromise = buildConfiguredRegistry(config); + this.#runtimeServeConfiguredPromise = configuredRegistryPromise; + this.#runtimeServerlessPromise = configuredRegistryPromise; + this.#installSignalHandlers(config); + + this.#printWelcome(config, "serverless", { + port, + host: opts.host, + publicDir, + }); + + // Background fire; the retry loop tolerates engine warm-up. + this.#ensureServerlessPoolConfigured(config); + + const { runtime, registry, serveConfig } = + await configuredRegistryPromise; + await runtime.serveListener( + registry, + { port, host: opts.host, publicDir }, + serveConfig, + ); } /** @@ -666,6 +693,15 @@ export class Registry { /** * Starts the actor envoy for standalone server deployments. * + * Auto-promotes to `listen()` when `NODE_ENV === "production"` so the + * same `start()` call boots an HTTP listener in deployed containers + * while keeping the persistent-envoy WS behavior in local development. + * The `RIVETKIT_AUTO_LISTEN` env var overrides the heuristic: `1` + * forces auto-listen on, `0` forces it off. + * + * Mode A (envoy) and Mode B (listener) are mutually exclusive per + * registry instance. + * * @example * ```ts * const registry = setup({ use: { counter } }); @@ -673,6 +709,21 @@ export class Registry { * ``` */ public start() { + if (getRivetkitRuntimeMode() === "serverless") { + // start() defaults publicDir to "/public" unless overridden by env. + const publicDir = getRivetkitPublicDir() ?? "/public"; + // Detached listener; bind failures are fatal so exit hard. + this.listen({ publicDir }).catch((error) => { + logger().error({ error }, "auto-listen failed; exiting"); + if ( + typeof process !== "undefined" && + typeof process.exit === "function" + ) { + process.exit(1); + } + }); + return; + } const config = this.parseConfig(); this.#startEnvoy(config, true); } @@ -680,6 +731,7 @@ export class Registry { #printWelcome( config: RegistryConfig, kind: "serverless" | "serverful", + listener?: { port: number; host?: string; publicDir?: string }, ): void { if (config.noWelcome || this.#welcomePrinted) return; this.#welcomePrinted = true; @@ -711,6 +763,15 @@ export class Registry { } logLine("Actors", Object.keys(config.use).length.toString()); + + if (listener) { + const host = listener.host ?? "0.0.0.0"; + logLine("Listening", `http://${host}:${listener.port}`); + if (listener.publicDir) { + logLine("Public Dir", listener.publicDir); + } + } + console.log(); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index 2165a420a2..50bb1dec0b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -18,6 +18,7 @@ import type { RuntimeHttpRequest, RuntimeKvEntry, RuntimeKvListOptions, + RuntimeListenerConfig, RuntimeQueueEnqueueAndWaitOptions, RuntimeQueueMessage, RuntimeQueueNextBatchOptions, @@ -259,6 +260,21 @@ export class NapiCoreRuntime implements CoreRuntime { ); } + async serveListener( + registry: RegistryHandle, + listener: RuntimeListenerConfig, + config: RuntimeServeConfig, + ): Promise { + await asNativeRegistry(registry).serveListener( + { + port: listener.port, + host: listener.host, + publicDir: listener.publicDir, + }, + config, + ); + } + createActorFactory( callbacks: object, config?: RuntimeActorConfig | undefined | null, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index 35f8e4748d..c356e626af 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -1,4 +1,5 @@ import type { SqliteNativeMetrics } from "@/common/database/config"; +import { getRivetkitRuntimeMode } from "@/utils/env-vars"; import type { RegistryConfig } from "./config"; declare const handleBrand: unique symbol; @@ -261,6 +262,13 @@ export interface RuntimeServeConfig { serverlessClientToken?: string; serverlessValidateEndpoint: boolean; serverlessMaxStartPayloadBytes: number; + forceNormalRunnerConfigUpsert?: boolean; +} + +export interface RuntimeListenerConfig { + port: number; + host?: string; + publicDir?: string; } export interface RuntimeServerlessRequest { @@ -338,6 +346,11 @@ export interface CoreRuntime { cancelToken: CancellationTokenHandle, config: RuntimeServeConfig, ): Promise; + serveListener( + registry: RegistryHandle, + listener: RuntimeListenerConfig, + config: RuntimeServeConfig, + ): Promise; registryHealth?( registry: RegistryHandle, ): Promise; @@ -608,6 +621,9 @@ export async function buildServeConfig( serveConfig.inspectorTestToken = process.env._RIVET_TEST_INSPECTOR_TOKEN ?? "token"; } + if (getRivetkitRuntimeMode() === "envoy") { + serveConfig.forceNormalRunnerConfigUpsert = true; + } return serveConfig; } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index a5c34b3eb8..2c829495db 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -26,6 +26,7 @@ import type { RuntimeInspectorSnapshot, RuntimeKvEntry, RuntimeKvListOptions, + RuntimeListenerConfig, RuntimeQueueEnqueueAndWaitOptions, RuntimeQueueInspectMessage, RuntimeQueueMessage, @@ -302,6 +303,16 @@ export class WasmCoreRuntime implements CoreRuntime { ); } + async serveListener( + _registry: RegistryHandle, + _listener: RuntimeListenerConfig, + _config: RuntimeServeConfig, + ): Promise { + throw new Error( + "registry.listen() is not supported on the wasm runtime; use registry.serve() and mount the handler in your platform's HTTP server instead", + ); + } + createActorFactory( callbacks: object, config?: RuntimeActorConfig | undefined | null, diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts index 9f3a269b66..df014e3eb6 100644 --- a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts +++ b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts @@ -50,6 +50,37 @@ export const getRivetkitStoragePath = (): string | undefined => getEnvUniversal("RIVETKIT_STORAGE_PATH"); export const getRivetkitRuntime = (): string | undefined => getEnvUniversal("RIVETKIT_RUNTIME"); +export type RuntimeMode = "envoy" | "serverless"; + +export const getRivetkitRuntimeMode = (): RuntimeMode => { + const explicit = getEnvUniversal("RIVETKIT_RUNTIME_MODE"); + if (explicit === "envoy" || explicit === "serverless") return explicit; + const railwayId = getEnvUniversal("RAILWAY_DEPLOYMENT_ID"); + if (railwayId !== undefined && railwayId !== "") return "envoy"; + if (getNodeEnv() === "production") return "serverless"; + return "envoy"; +}; + +export const getRivetkitPublicDir = (): string | undefined => { + const value = getEnvUniversal("RIVETKIT_PUBLIC_DIR"); + return value === undefined || value === "" ? undefined : value; +}; + +export function parsePortEnv(raw: string | undefined): number | undefined { + if (raw === undefined || raw === "") return undefined; + const parsed = Number.parseInt(raw, 10); + if ( + !Number.isFinite(parsed) || + parsed < 1 || + parsed > 65535 || + String(parsed) !== raw.trim() + ) { + throw new Error( + `PORT env var must be an integer between 1 and 65535; got "${raw}"`, + ); + } + return parsed; +} // Logging configuration // DEPRECATED: LOG_LEVEL will be removed in a future version diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/serve.ts b/rivetkit-typescript/packages/rivetkit/src/utils/serve.ts deleted file mode 100644 index a735716966..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/utils/serve.ts +++ /dev/null @@ -1,216 +0,0 @@ -// TODO: Go back to dynamic import for this -import getPort from "get-port"; -import type { Hono } from "hono"; -import type { RegistryConfig } from "@/registry/config"; -import { logger } from "@/registry/log"; -import { detectRuntime, type Runtime, stringifyError } from "../utils"; - -const DEFAULT_PORT = 6421; -export type ServeStatic = - typeof import("@hono/node-server/serve-static").serveStatic; -const serveStaticLoaderPromises: Partial< - Record> -> = {}; - -/** - * Finds a free port starting from the given port. - * - * Tries ports incrementally until a free one is found. - */ -export async function findFreePort( - startPort: number = DEFAULT_PORT, -): Promise { - // TODO: Fix this - // const getPortModule = "get-port"; - // const { default: getPort } = await import(/* webpackIgnore: true */ getPortModule); - - // Create an iterable of ports starting from startPort - function* portRange(start: number, count: number = 100): Iterable { - for (let i = 0; i < count; i++) { - yield start + i; - } - } - - return getPort({ port: portRange(startPort) }); -} - -export async function crossPlatformServe( - config: RegistryConfig, - httpPort: number, - app: Hono, - runtime: Runtime = detectRuntime(), -): Promise<{ upgradeWebSocket: any; closeServer?: () => void }> { - logger().debug({ msg: "detected runtime for serve", runtime }); - - switch (runtime) { - case "deno": - return serveDeno(config, httpPort, app); - case "bun": - return serveBun(config, httpPort, app); - case "node": - return serveNode(config, httpPort, app); - default: - return serveNode(config, httpPort, app); - } -} - -export async function loadRuntimeServeStatic( - runtime: Runtime, -): Promise { - if (!serveStaticLoaderPromises[runtime]) { - if (runtime === "node") { - const nodeServeStaticModule = "@hono/node-server/serve-static"; - serveStaticLoaderPromises[runtime] = import( - /* webpackIgnore: true */ - nodeServeStaticModule - ).then((x) => x.serveStatic); - } else if (runtime === "bun") { - const bunModule = "hono/bun"; - serveStaticLoaderPromises[runtime] = import( - /* webpackIgnore: true */ - bunModule - ).then((x) => x.serveStatic as ServeStatic); - } else if (runtime === "deno") { - const denoModule = "hono/deno"; - serveStaticLoaderPromises[runtime] = import( - /* webpackIgnore: true */ - denoModule - ).then((x) => x.serveStatic as ServeStatic); - } else { - throw new Error(`unsupported runtime: ${runtime}`); - } - } - - return await serveStaticLoaderPromises[runtime]!; -} - -async function serveNode( - config: RegistryConfig, - httpPort: number, - app: Hono, -): Promise<{ upgradeWebSocket: any; closeServer: () => void }> { - // Import @hono/node-server using string variable to prevent static analysis - const nodeServerModule = "@hono/node-server"; - let serve: any; - try { - const dep = await import( - /* webpackIgnore: true */ - nodeServerModule - ); - serve = dep.serve; - } catch (err) { - logger().error({ - msg: "failed to import @hono/node-server. please run 'npm install @hono/node-server @hono/node-ws'", - error: stringifyError(err), - }); - process.exit(1); - } - - // Import @hono/node-ws using string variable to prevent static analysis - const nodeWsModule = "@hono/node-ws"; - let createNodeWebSocket: any; - try { - const dep = await import( - /* webpackIgnore: true */ - nodeWsModule - ); - createNodeWebSocket = dep.createNodeWebSocket; - } catch (err) { - logger().error({ - msg: "failed to import @hono/node-ws. please run 'npm install @hono/node-server @hono/node-ws'", - error: stringifyError(err), - }); - process.exit(1); - } - - // Inject WS - const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ - app: app, - }); - - // Start server - const port = httpPort; - const hostname = config.httpHost; - const server = serve({ fetch: app.fetch, port, hostname }, () => - logger().info({ msg: "server listening", port, hostname }), - ); - injectWebSocket(server); - - const closeServer = () => { - server.close(); - }; - - return { upgradeWebSocket, closeServer }; -} - -async function serveDeno( - config: RegistryConfig, - httpPort: number, - app: Hono, -): Promise<{ upgradeWebSocket: any }> { - // Import hono/deno using string variable to prevent static analysis - const honoDenoModule = "hono/deno"; - let upgradeWebSocket: any; - try { - const dep = await import( - /* webpackIgnore: true */ - honoDenoModule - ); - upgradeWebSocket = dep.upgradeWebSocket; - } catch (err) { - logger().error({ - msg: "failed to import hono/deno", - error: stringifyError(err), - }); - process.exit(1); - } - - const port = httpPort; - const hostname = config.httpHost; - - // Use Deno.serve - Deno.serve({ port, hostname }, app.fetch); - logger().info({ msg: "server listening", port, hostname }); - - return { upgradeWebSocket }; -} - -async function serveBun( - config: RegistryConfig, - httpPort: number, - app: Hono, -): Promise<{ upgradeWebSocket: any }> { - // Import hono/bun using string variable to prevent static analysis - const honoBunModule = "hono/bun"; - let createBunWebSocket: any; - try { - const dep = await import( - /* webpackIgnore: true */ - honoBunModule - ); - createBunWebSocket = dep.createBunWebSocket; - } catch (err) { - logger().error({ - msg: "failed to import hono/bun", - error: stringifyError(err), - }); - process.exit(1); - } - - const { websocket, upgradeWebSocket } = createBunWebSocket(); - - const port = httpPort; - const hostname = config.httpHost; - - // Use Bun.serve - // @ts-expect-error - Bun global - Bun.serve({ - fetch: app.fetch, - port, - hostname, - websocket, - }); - logger().info({ msg: "server listening", port, hostname }); - - return { upgradeWebSocket }; -} diff --git a/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts b/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts new file mode 100644 index 0000000000..aeb40155bb --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts @@ -0,0 +1,210 @@ +import getPort from "get-port"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { actor, setup } from "@/mod"; +import type { Registry } from "@/registry"; +import { getRivetkitRuntimeMode, parsePortEnv } from "@/utils/env-vars"; + +describe("getRivetkitRuntimeMode", () => { + const KEYS = [ + "RIVETKIT_RUNTIME_MODE", + "RAILWAY_DEPLOYMENT_ID", + "NODE_ENV", + ] as const; + let snapshot: Record; + + beforeEach(() => { + snapshot = Object.fromEntries(KEYS.map((k) => [k, process.env[k]])); + for (const k of KEYS) delete process.env[k]; + }); + afterEach(() => { + for (const k of KEYS) { + if (snapshot[k] === undefined) delete process.env[k]; + else process.env[k] = snapshot[k]; + } + }); + + test("dev default (no env) is envoy", () => { + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("NODE_ENV=production defaults to serverless", () => { + process.env.NODE_ENV = "production"; + expect(getRivetkitRuntimeMode()).toBe("serverless"); + }); + + test("RAILWAY_DEPLOYMENT_ID set defaults to envoy", () => { + process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("Railway overrides NODE_ENV=production", () => { + process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; + process.env.NODE_ENV = "production"; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("explicit envoy wins over NODE_ENV=production", () => { + process.env.RIVETKIT_RUNTIME_MODE = "envoy"; + process.env.NODE_ENV = "production"; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("explicit serverless wins over Railway", () => { + process.env.RIVETKIT_RUNTIME_MODE = "serverless"; + process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; + expect(getRivetkitRuntimeMode()).toBe("serverless"); + }); + + test("empty string falls through to next rule", () => { + process.env.RIVETKIT_RUNTIME_MODE = ""; + process.env.NODE_ENV = "production"; + expect(getRivetkitRuntimeMode()).toBe("serverless"); + }); + + test("unrecognized value falls through to next rule", () => { + process.env.RIVETKIT_RUNTIME_MODE = "potato"; + process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; + expect(getRivetkitRuntimeMode()).toBe("envoy"); + }); + + test("RAILWAY_DEPLOYMENT_ID empty is treated as unset; falls through to NODE_ENV", () => { + process.env.RAILWAY_DEPLOYMENT_ID = ""; + process.env.NODE_ENV = "production"; + expect(getRivetkitRuntimeMode()).toBe("serverless"); + }); +}); + +describe("parsePortEnv", () => { + test("undefined input returns undefined", () => { + expect(parsePortEnv(undefined)).toBeUndefined(); + }); + + test("empty string returns undefined", () => { + expect(parsePortEnv("")).toBeUndefined(); + }); + + test("valid integer string parses", () => { + expect(parsePortEnv("8080")).toBe(8080); + }); + + test("port 1 is accepted (lower bound)", () => { + expect(parsePortEnv("1")).toBe(1); + }); + + test("port 65535 is accepted (upper bound)", () => { + expect(parsePortEnv("65535")).toBe(65535); + }); + + test("port 0 is rejected", () => { + expect(() => parsePortEnv("0")).toThrow(/PORT env var must be/); + }); + + test("port 65536 is rejected", () => { + expect(() => parsePortEnv("65536")).toThrow(/PORT env var must be/); + }); + + test("non-numeric input is rejected", () => { + expect(() => parsePortEnv("notaport")).toThrow(/PORT env var must be/); + }); + + test("partial numeric input is rejected (parseInt would silently succeed)", () => { + expect(() => parsePortEnv("8080abc")).toThrow(/PORT env var must be/); + }); + + test("negative input is rejected", () => { + expect(() => parsePortEnv("-1")).toThrow(/PORT env var must be/); + }); +}); + +const testActor = actor({ + state: {}, + actions: {}, +}); + +describe("registry.listen() end-to-end", () => { + let registry: Registry | undefined; + let listenPromise: Promise | undefined; + + afterEach(async () => { + if (registry) { + await registry.shutdown(); + registry = undefined; + } + if (listenPromise) { + await listenPromise.catch(() => undefined); + listenPromise = undefined; + } + }, 30_000); + + test("binds the requested port and serves /api/rivet/metadata", async () => { + const port = await getPort({ host: "127.0.0.1" }); + registry = setup({ + use: { test: testActor }, + startEngine: false, + endpoint: "http://127.0.0.1:65535", + token: "dev", + namespace: "default", + noWelcome: true, + shutdown: { disableSignalHandlers: true }, + }) as Registry; + + listenPromise = registry.listen({ port, host: "127.0.0.1" }); + + const baseUrl = `http://127.0.0.1:${port}`; + const response = await waitForResponse( + `${baseUrl}/api/rivet/metadata`, + 15_000, + ); + expect(response.status).toBe(200); + const body = await response.json(); + expect(body.runtime).toBe("rivetkit"); + expect(body.actorNames).toBeDefined(); + expect(body.actorNames).toHaveProperty("test"); + }, 30_000); + + test("/api/rivet/health returns ok", async () => { + const port = await getPort({ host: "127.0.0.1" }); + registry = setup({ + use: { test: testActor }, + startEngine: false, + endpoint: "http://127.0.0.1:65535", + token: "dev", + namespace: "default", + noWelcome: true, + shutdown: { disableSignalHandlers: true }, + }) as Registry; + + listenPromise = registry.listen({ port, host: "127.0.0.1" }); + + const response = await waitForResponse( + `http://127.0.0.1:${port}/api/rivet/health`, + 15_000, + ); + expect(response.status).toBe(200); + const body = await response.json(); + expect(body.runtime).toBe("rivetkit"); + expect(body.status).toBeDefined(); + }, 30_000); +}); + +/** + * Poll the URL until it responds (the listener takes a moment to bind and + * build the serverless runtime on first request). + */ +async function waitForResponse( + url: string, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + let lastError: unknown; + while (Date.now() < deadline) { + try { + const response = await fetch(url); + return response; + } catch (error) { + lastError = error; + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } + throw new Error(`timed out waiting for ${url}: ${String(lastError)}`); +} diff --git a/website/src/content/docs/general/environment-variables.mdx b/website/src/content/docs/general/environment-variables.mdx index 737f587e19..d1650ad1ca 100644 --- a/website/src/content/docs/general/environment-variables.mdx +++ b/website/src/content/docs/general/environment-variables.mdx @@ -64,6 +64,14 @@ These variables configure how clients connect to your actors. | `RIVETKIT_RUNTIME` | Runtime binding to use for RivetKit core: `auto`, `native`, or `wasm`. Defaults to `auto`. | | `RIVETKIT_STORAGE_PATH` | Overrides the default file-system storage path used by RivetKit when using the default driver. | +## Lifecycle + +| Environment Variable | Description | +|---------------------|-------------| +| `RIVETKIT_RUNTIME_MODE` | Controls how `registry.start()` runs. `envoy` opens a long-lived WebSocket to the engine (Mode A) and auto-upserts a normal runner config to the engine on startup (even against remote endpoints). `serverless` binds an HTTP listener via `registry.listen()` (Mode B) and leaves runner-config management to the caller. Resolution: explicit value > `RAILWAY_DEPLOYMENT_ID` set → `envoy` > `NODE_ENV=production` → `serverless` > dev default `envoy`. | +| `RAILWAY_DEPLOYMENT_ID` | Automatically set by Railway. RivetKit uses presence to default `RIVETKIT_RUNTIME_MODE=envoy` when not explicitly set. | +| `RIVETKIT_PUBLIC_DIR` | Directory of static assets to serve alongside the framework routes when calling `registry.listen()`. Used as a fallback when `opts.publicDir` is not passed. On auto-listen via `registry.start()`, defaults to `/public` when this env var is unset. | + ## Logging | Environment Variable | Description | From d3906121ba0d117913fcd7012867beab5cfa1c96 Mon Sep 17 00:00:00 2001 From: ABCxFF <79597906+abcxff@users.noreply.github.com> Date: Tue, 9 Jun 2026 15:48:54 +0000 Subject: [PATCH 2/2] feat(rivetkit): RIVETKIT_RUNTIME_MODE env var, listener hardening --- .../src/registry/envoy_callbacks.rs | 1 - .../rivetkit-core/src/registry/mod.rs | 6 +- .../src/registry/runner_config.rs | 14 ++-- .../rivetkit-core/tests/serverless.rs | 1 - .../packages/rivetkit-napi/index.d.ts | 1 - .../packages/rivetkit-napi/src/registry.rs | 4 - .../packages/rivetkit/src/registry/index.ts | 15 ++-- .../packages/rivetkit/src/registry/runtime.ts | 5 -- .../packages/rivetkit/src/utils/env-vars.ts | 14 ++-- .../packages/rivetkit/tests/listener.test.ts | 73 ++++++------------- .../docs/general/environment-variables.mdx | 4 +- 11 files changed, 43 insertions(+), 95 deletions(-) diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs index 0526eb431f..77bababf2d 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/envoy_callbacks.rs @@ -227,7 +227,6 @@ impl ServeConfig { serverless_validate_endpoint: settings.serverless_validate_endpoint, serverless_max_start_payload_bytes: settings.serverless_max_start_payload_bytes, serverless_cache_envoy: true, - force_normal_runner_config_upsert: false, } } } diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index 8dddbd8507..52624ef87e 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -242,10 +242,6 @@ pub struct ServeConfig { pub serverless_validate_endpoint: bool, pub serverless_max_start_payload_bytes: usize, pub serverless_cache_envoy: bool, - /// When true, upsert a normal runner config to the engine on startup - /// even if the endpoint is not a local engine. Set by the TS Registry - /// when `RIVETKIT_RUNTIME_MODE` resolves to `envoy`. - pub force_normal_runner_config_upsert: bool, } #[derive(Debug, Default, Deserialize)] @@ -601,7 +597,7 @@ impl CoreRegistry { } #[cfg(feature = "native-runtime")] - runner_config::ensure_normal_runner_config(&config).await?; + runner_config::ensure_local_normal_runner_config(&config).await?; let callbacks = Arc::new(RegistryCallbacks { dispatcher: dispatcher.clone(), }); diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs index 340a5545db..88b1480b55 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/runner_config.rs @@ -17,8 +17,8 @@ struct Datacenter { name: String, } -pub(super) async fn ensure_normal_runner_config(config: &ServeConfig) -> Result<()> { - if !is_local_engine_endpoint(&config.endpoint) && !config.force_normal_runner_config_upsert { +pub(super) async fn ensure_local_normal_runner_config(config: &ServeConfig) -> Result<()> { + if !is_local_engine_endpoint(&config.endpoint) { return Ok(()); } @@ -51,7 +51,7 @@ pub(super) async fn ensure_normal_runner_config(config: &ServeConfig) -> Result< .json(&body) .send() .await - .context("upsert normal runner config")?; + .context("upsert local runner config")?; let status = response.status(); if !status.is_success() { let response_body = response @@ -59,7 +59,7 @@ pub(super) async fn ensure_normal_runner_config(config: &ServeConfig) -> Result< .await .context("read failed runner config response body")?; anyhow::bail!( - "failed to upsert normal runner config `{}`: {} {}", + "failed to upsert local runner config `{}`: {} {}", config.pool_name, status, response_body @@ -69,7 +69,7 @@ pub(super) async fn ensure_normal_runner_config(config: &ServeConfig) -> Result< tracing::debug!( namespace = %config.namespace, pool_name = %config.pool_name, - "ensured normal runner config" + "ensured local normal runner config" ); Ok(()) @@ -80,7 +80,7 @@ async fn get_datacenters(client: &Client, config: &ServeConfig) -> Result Result, pub serverless_validate_endpoint: bool, pub serverless_max_start_payload_bytes: u32, - pub force_normal_runner_config_upsert: Option, } #[napi(object)] @@ -662,9 +661,6 @@ fn serve_config_from_js( serverless_validate_endpoint: config.serverless_validate_endpoint, serverless_max_start_payload_bytes: config.serverless_max_start_payload_bytes as usize, serverless_cache_envoy, - force_normal_runner_config_upsert: config - .force_normal_runner_config_upsert - .unwrap_or(false), } } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index c14d3c4d4d..8a3b993082 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -335,7 +335,7 @@ export class Registry { * the registry's serverless endpoints over it. Resolves only after the * registry is shut down (SIGINT/SIGTERM or `nativeRegistry.shutdown()`). * - * @param opts.port Port to listen on. Defaults to `process.env.PORT` + * @param opts.port Port to listen on. Defaults to `process.env.RIVET_PORT` * if set, otherwise 3000. * @param opts.host Address to bind. Defaults to `0.0.0.0`. * @param opts.publicDir If set, serves static files from this directory @@ -350,7 +350,7 @@ export class Registry { public async listen( opts: { port?: number; host?: string; publicDir?: string } = {}, ): Promise { - const port = opts.port ?? parsePortEnv(process.env.PORT) ?? 3000; + const port = opts.port ?? parsePortEnv(process.env.RIVET_PORT) ?? 3000; const publicDir = opts.publicDir ?? getRivetkitPublicDir(); const config = this.parseConfig(); @@ -693,14 +693,9 @@ export class Registry { /** * Starts the actor envoy for standalone server deployments. * - * Auto-promotes to `listen()` when `NODE_ENV === "production"` so the - * same `start()` call boots an HTTP listener in deployed containers - * while keeping the persistent-envoy WS behavior in local development. - * The `RIVETKIT_AUTO_LISTEN` env var overrides the heuristic: `1` - * forces auto-listen on, `0` forces it off. - * - * Mode A (envoy) and Mode B (listener) are mutually exclusive per - * registry instance. + * Set `RIVETKIT_RUNTIME_MODE=serverless` to instead bind an HTTP listener + * via `listen()` (Mode B). Mode A (envoy) and Mode B (listener) are + * mutually exclusive per registry instance. * * @example * ```ts diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index c356e626af..0727dfc05e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -1,5 +1,4 @@ import type { SqliteNativeMetrics } from "@/common/database/config"; -import { getRivetkitRuntimeMode } from "@/utils/env-vars"; import type { RegistryConfig } from "./config"; declare const handleBrand: unique symbol; @@ -262,7 +261,6 @@ export interface RuntimeServeConfig { serverlessClientToken?: string; serverlessValidateEndpoint: boolean; serverlessMaxStartPayloadBytes: number; - forceNormalRunnerConfigUpsert?: boolean; } export interface RuntimeListenerConfig { @@ -621,9 +619,6 @@ export async function buildServeConfig( serveConfig.inspectorTestToken = process.env._RIVET_TEST_INSPECTOR_TOKEN ?? "token"; } - if (getRivetkitRuntimeMode() === "envoy") { - serveConfig.forceNormalRunnerConfigUpsert = true; - } return serveConfig; } diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts index df014e3eb6..8a41419c36 100644 --- a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts +++ b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts @@ -52,14 +52,10 @@ export const getRivetkitRuntime = (): string | undefined => getEnvUniversal("RIVETKIT_RUNTIME"); export type RuntimeMode = "envoy" | "serverless"; -export const getRivetkitRuntimeMode = (): RuntimeMode => { - const explicit = getEnvUniversal("RIVETKIT_RUNTIME_MODE"); - if (explicit === "envoy" || explicit === "serverless") return explicit; - const railwayId = getEnvUniversal("RAILWAY_DEPLOYMENT_ID"); - if (railwayId !== undefined && railwayId !== "") return "envoy"; - if (getNodeEnv() === "production") return "serverless"; - return "envoy"; -}; +export const getRivetkitRuntimeMode = (): RuntimeMode => + getEnvUniversal("RIVETKIT_RUNTIME_MODE") === "serverless" + ? "serverless" + : "envoy"; export const getRivetkitPublicDir = (): string | undefined => { const value = getEnvUniversal("RIVETKIT_PUBLIC_DIR"); @@ -76,7 +72,7 @@ export function parsePortEnv(raw: string | undefined): number | undefined { String(parsed) !== raw.trim() ) { throw new Error( - `PORT env var must be an integer between 1 and 65535; got "${raw}"`, + `RIVET_PORT env var must be an integer between 1 and 65535; got "${raw}"`, ); } return parsed; diff --git a/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts b/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts index aeb40155bb..521cd18b4b 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/listener.test.ts @@ -5,73 +5,40 @@ import type { Registry } from "@/registry"; import { getRivetkitRuntimeMode, parsePortEnv } from "@/utils/env-vars"; describe("getRivetkitRuntimeMode", () => { - const KEYS = [ - "RIVETKIT_RUNTIME_MODE", - "RAILWAY_DEPLOYMENT_ID", - "NODE_ENV", - ] as const; - let snapshot: Record; + let snapshot: string | undefined; beforeEach(() => { - snapshot = Object.fromEntries(KEYS.map((k) => [k, process.env[k]])); - for (const k of KEYS) delete process.env[k]; + snapshot = process.env.RIVETKIT_RUNTIME_MODE; + delete process.env.RIVETKIT_RUNTIME_MODE; }); afterEach(() => { - for (const k of KEYS) { - if (snapshot[k] === undefined) delete process.env[k]; - else process.env[k] = snapshot[k]; - } + if (snapshot === undefined) delete process.env.RIVETKIT_RUNTIME_MODE; + else process.env.RIVETKIT_RUNTIME_MODE = snapshot; }); - test("dev default (no env) is envoy", () => { + test("default (unset) is envoy", () => { expect(getRivetkitRuntimeMode()).toBe("envoy"); }); - test("NODE_ENV=production defaults to serverless", () => { - process.env.NODE_ENV = "production"; + test("explicit serverless", () => { + process.env.RIVETKIT_RUNTIME_MODE = "serverless"; expect(getRivetkitRuntimeMode()).toBe("serverless"); }); - test("RAILWAY_DEPLOYMENT_ID set defaults to envoy", () => { - process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; - expect(getRivetkitRuntimeMode()).toBe("envoy"); - }); - - test("Railway overrides NODE_ENV=production", () => { - process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; - process.env.NODE_ENV = "production"; - expect(getRivetkitRuntimeMode()).toBe("envoy"); - }); - - test("explicit envoy wins over NODE_ENV=production", () => { + test("explicit envoy", () => { process.env.RIVETKIT_RUNTIME_MODE = "envoy"; - process.env.NODE_ENV = "production"; expect(getRivetkitRuntimeMode()).toBe("envoy"); }); - test("explicit serverless wins over Railway", () => { - process.env.RIVETKIT_RUNTIME_MODE = "serverless"; - process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; - expect(getRivetkitRuntimeMode()).toBe("serverless"); - }); - - test("empty string falls through to next rule", () => { + test("empty string is envoy", () => { process.env.RIVETKIT_RUNTIME_MODE = ""; - process.env.NODE_ENV = "production"; - expect(getRivetkitRuntimeMode()).toBe("serverless"); + expect(getRivetkitRuntimeMode()).toBe("envoy"); }); - test("unrecognized value falls through to next rule", () => { + test("unrecognized value is envoy", () => { process.env.RIVETKIT_RUNTIME_MODE = "potato"; - process.env.RAILWAY_DEPLOYMENT_ID = "dep_123"; expect(getRivetkitRuntimeMode()).toBe("envoy"); }); - - test("RAILWAY_DEPLOYMENT_ID empty is treated as unset; falls through to NODE_ENV", () => { - process.env.RAILWAY_DEPLOYMENT_ID = ""; - process.env.NODE_ENV = "production"; - expect(getRivetkitRuntimeMode()).toBe("serverless"); - }); }); describe("parsePortEnv", () => { @@ -96,23 +63,29 @@ describe("parsePortEnv", () => { }); test("port 0 is rejected", () => { - expect(() => parsePortEnv("0")).toThrow(/PORT env var must be/); + expect(() => parsePortEnv("0")).toThrow(/RIVET_PORT env var must be/); }); test("port 65536 is rejected", () => { - expect(() => parsePortEnv("65536")).toThrow(/PORT env var must be/); + expect(() => parsePortEnv("65536")).toThrow( + /RIVET_PORT env var must be/, + ); }); test("non-numeric input is rejected", () => { - expect(() => parsePortEnv("notaport")).toThrow(/PORT env var must be/); + expect(() => parsePortEnv("notaport")).toThrow( + /RIVET_PORT env var must be/, + ); }); test("partial numeric input is rejected (parseInt would silently succeed)", () => { - expect(() => parsePortEnv("8080abc")).toThrow(/PORT env var must be/); + expect(() => parsePortEnv("8080abc")).toThrow( + /RIVET_PORT env var must be/, + ); }); test("negative input is rejected", () => { - expect(() => parsePortEnv("-1")).toThrow(/PORT env var must be/); + expect(() => parsePortEnv("-1")).toThrow(/RIVET_PORT env var must be/); }); }); diff --git a/website/src/content/docs/general/environment-variables.mdx b/website/src/content/docs/general/environment-variables.mdx index d1650ad1ca..b0fe3a72b6 100644 --- a/website/src/content/docs/general/environment-variables.mdx +++ b/website/src/content/docs/general/environment-variables.mdx @@ -68,9 +68,9 @@ These variables configure how clients connect to your actors. | Environment Variable | Description | |---------------------|-------------| -| `RIVETKIT_RUNTIME_MODE` | Controls how `registry.start()` runs. `envoy` opens a long-lived WebSocket to the engine (Mode A) and auto-upserts a normal runner config to the engine on startup (even against remote endpoints). `serverless` binds an HTTP listener via `registry.listen()` (Mode B) and leaves runner-config management to the caller. Resolution: explicit value > `RAILWAY_DEPLOYMENT_ID` set → `envoy` > `NODE_ENV=production` → `serverless` > dev default `envoy`. | -| `RAILWAY_DEPLOYMENT_ID` | Automatically set by Railway. RivetKit uses presence to default `RIVETKIT_RUNTIME_MODE=envoy` when not explicitly set. | +| `RIVETKIT_RUNTIME_MODE` | Controls how `registry.start()` runs. Defaults to `envoy`: opens a long-lived WebSocket to the engine (Mode A). Set to `serverless` to bind an HTTP listener via `registry.listen()` (Mode B). | | `RIVETKIT_PUBLIC_DIR` | Directory of static assets to serve alongside the framework routes when calling `registry.listen()`. Used as a fallback when `opts.publicDir` is not passed. On auto-listen via `registry.start()`, defaults to `/public` when this env var is unset. | +| `RIVET_PORT` | Port the listener binds when calling `registry.listen()` without an explicit `opts.port`. Must be an integer between 1 and 65535. Defaults to `3000`. | ## Logging