diff --git a/CLAUDE.md b/CLAUDE.md index 71fe123a19..96472c984f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -131,6 +131,7 @@ docker-compose up -d - Use `scripts/cargo/check-rivetkit-core-wasm.sh` as the canonical wasm gate for `rivetkit-core`; it checks the wasm build, scans native dependency leaks, and verifies native transport/runtime features fail on wasm. - The high-level `rivetkit` crate stays a thin typed wrapper over `rivetkit-core` and re-exports shared transport/config types instead of redefining them. - When `rivetkit` needs ergonomic helpers on a `rivetkit-core` type it re-exports, prefer an extension trait plus `prelude` re-export instead of wrapping and replacing the core type. +- RivetKit action and event protocol `args` must always be array-shaped before crossing the client protocol boundary. Normalize at the server/source side, not in client delivery code: named structs/objects become `[object]`, tuples/arrays stay positional, scalars become `[scalar]`, and unit/null becomes `[]`. - `engine/sdks/*/api-*` are auto-generated SDK outputs; update the source API schema and regenerate them instead of editing them by hand. ### RivetKit Test Fixtures diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs index 8bff175167..9839d79247 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs @@ -834,19 +834,13 @@ pub(super) fn decode_http_action_args( HttpResponseEncoding::Json => { let request: HttpActionRequestJson = serde_json::from_slice(body).context("decode json HTTP action request")?; - let args = match request.args { - JsonValue::Array(args) => args, - _ => Vec::new(), - }; + let args = normalize_json_args(request.args); encode_json_as_cbor(&args) } HttpResponseEncoding::Cbor => { let request: HttpActionRequestJson = ciborium::from_reader(Cursor::new(body)) .context("decode cbor HTTP action request")?; - let args = match request.args { - JsonValue::Array(args) => args, - _ => Vec::new(), - }; + let args = normalize_json_args(request.args); encode_json_as_cbor(&args) } HttpResponseEncoding::Bare => { @@ -858,6 +852,14 @@ pub(super) fn decode_http_action_args( } } +fn normalize_json_args(args: JsonValue) -> Vec { + match args { + JsonValue::Array(args) => args, + JsonValue::Null => Vec::new(), + value => vec![value], + } +} + pub(super) fn decode_http_queue_request( encoding: HttpResponseEncoding, body: &[u8], diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs index d591b5594f..448cdd09ec 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs @@ -144,8 +144,33 @@ impl RegistryDispatcher { Ok(body) => body, Err(response) => return Ok(Some(response)), }; + if !body.args.is_empty() && body.properties.is_some() { + return Ok(Some(json_http_response( + StatusCode::BAD_REQUEST, + &json!({ + "error": "use either args or properties, not both", + }), + )?)); + } + if body + .properties + .as_ref() + .is_some_and(|properties| !properties.is_object()) + { + return Ok(Some(json_http_response( + StatusCode::BAD_REQUEST, + &json!({ + "error": "properties must be an object", + }), + )?)); + } + let args = if let Some(properties) = body.properties { + vec![properties] + } else { + body.args + }; match self - .execute_inspector_action(instance, &action_name, body.args) + .execute_inspector_action(instance, &action_name, args) .await { Ok(output) => json_http_response( diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index d7b76a0c21..3f1581e787 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -251,6 +251,7 @@ struct InspectorPatchStateBody { #[serde(default)] struct InspectorActionBody { args: Vec, + properties: Option, } #[derive(Debug, Default, Deserialize)] diff --git a/rivetkit-rust/packages/rivetkit/src/action.rs b/rivetkit-rust/packages/rivetkit/src/action.rs index 96035714fa..80c5389fc9 100644 --- a/rivetkit-rust/packages/rivetkit/src/action.rs +++ b/rivetkit-rust/packages/rivetkit/src/action.rs @@ -21,12 +21,17 @@ pub trait Action: serde::Serialize + DeserializeOwned + Send + Sync + 'static { } pub fn encode_positional(value: &T) -> Result> { + encode_varargs(value, "action args") +} + +pub(crate) fn encode_varargs(value: &T, label: &str) -> Result> { let mut encoded = Vec::new(); - ciborium::into_writer(value, &mut encoded).context("encode action args as cbor")?; + ciborium::into_writer(value, &mut encoded) + .with_context(|| format!("encode {label} as cbor"))?; let value: Value = ciborium::from_reader(Cursor::new(&encoded)) - .context("decode action args into cbor value")?; + .with_context(|| format!("decode {label} into cbor value"))?; let value = positional_value(value); - encode_value(&value) + encode_value(&value, label) } pub fn decode_positional(args: &[u8]) -> Result { @@ -54,16 +59,17 @@ pub fn decode_positional(args: &[u8]) -> Result { fn positional_value(value: Value) -> Value { match value { - Value::Map(entries) => Value::Array(entries.into_iter().map(|(_, value)| value).collect()), + Value::Map(_) => Value::Array(vec![value]), Value::Array(values) => Value::Array(values), Value::Null => Value::Array(Vec::new()), value => Value::Array(vec![value]), } } -fn encode_value(value: &Value) -> Result> { +fn encode_value(value: &Value, label: &str) -> Result> { let mut encoded = Vec::new(); - ciborium::into_writer(value, &mut encoded).context("encode positional action args as cbor")?; + ciborium::into_writer(value, &mut encoded) + .with_context(|| format!("encode positional {label} as cbor"))?; Ok(encoded) } @@ -339,14 +345,17 @@ mod tests { } #[test] - fn positional_encode_has_ts_byte_parity() { + fn positional_encode_matches_ts_action_args() { assert_eq!( encode_positional(&NamedArgs { first: "a".into(), second: "b".into(), }) .expect("encode named args"), - vec![0x82, 0x61, b'a', 0x61, b'b'] + vec![ + 0x81, 0xa2, 0x65, b'f', b'i', b'r', b's', b't', 0x61, b'a', 0x66, b's', b'e', b'c', + b'o', b'n', b'd', 0x61, b'b', + ] ); assert_eq!( encode_positional(&NewtypeArg(5)).expect("encode newtype arg"), @@ -410,6 +419,13 @@ mod tests { })) .expect("decode named args from map"); assert_eq!(from_map, from_seq); + + let from_single_map_arg = decode_positional::(&cbor(&vec![NamedArgs { + first: "a".into(), + second: "b".into(), + }])) + .expect("decode named args from single object arg"); + assert_eq!(from_single_map_arg, from_seq); } #[test] @@ -425,7 +441,7 @@ mod tests { } #[test] - fn positional_encode_leaves_nested_struct_as_map() { + fn positional_encode_wraps_named_struct_as_single_arg() { let bytes = encode_positional(&WithNested { nested: Nested { value: 7, @@ -440,9 +456,11 @@ mod tests { let ciborium::Value::Array(values) = value else { panic!("top-level args should be an array"); }; - assert_eq!(values.len(), 2); - assert!(matches!(values[0], ciborium::Value::Map(_))); - assert!(matches!(values[1], ciborium::Value::Bool(true))); + assert_eq!(values.len(), 1); + let ciborium::Value::Map(fields) = &values[0] else { + panic!("named struct arg should remain a map"); + }; + assert_eq!(fields.len(), 2); } fn cbor(value: &T) -> Vec { diff --git a/rivetkit-rust/packages/rivetkit/src/context.rs b/rivetkit-rust/packages/rivetkit/src/context.rs index a94fa65f44..25e4f0e514 100644 --- a/rivetkit-rust/packages/rivetkit/src/context.rs +++ b/rivetkit-rust/packages/rivetkit/src/context.rs @@ -21,6 +21,7 @@ use rivetkit_core::{ use serde::{Serialize, de::DeserializeOwned}; use tokio_util::sync::CancellationToken; +use crate::action; use crate::actor::Actor; use crate::event::Event; use crate::queue::Queue; @@ -345,7 +346,7 @@ impl Ctx { } pub fn broadcast(&self, name: &str, event: &E) -> Result<()> { - let event_bytes = encode_cbor(event, "broadcast event")?; + let event_bytes = action::encode_varargs(event, "event args")?; self.inner.broadcast(name, &event_bytes); Ok(()) } @@ -514,7 +515,7 @@ impl ConnCtx { } pub fn send(&self, name: &str, event: &E) -> Result<()> { - let event_bytes = encode_cbor(event, "connection event")?; + let event_bytes = action::encode_varargs(event, "connection event args")?; self.inner.send(name, &event_bytes); Ok(()) } diff --git a/rivetkit-rust/packages/rivetkit/src/typed_client.rs b/rivetkit-rust/packages/rivetkit/src/typed_client.rs index eb47713f47..4c20d3dd25 100644 --- a/rivetkit-rust/packages/rivetkit/src/typed_client.rs +++ b/rivetkit-rust/packages/rivetkit/src/typed_client.rs @@ -224,8 +224,31 @@ pub(crate) fn encode_action_args(action: &M) -> Result } fn decode_event(event: &ClientEvent) -> Result { - ciborium::from_reader(Cursor::new(&event.raw_args)) - .with_context(|| format!("decode typed event '{}'", E::NAME)) + decode_event_args(&event.raw_args).with_context(|| format!("decode typed event '{}'", E::NAME)) +} + +fn decode_event_args(raw_args: &[u8]) -> Result { + let value: CborValue = + ciborium::from_reader(Cursor::new(raw_args)).context("decode typed event args as cbor")?; + match value { + CborValue::Array(values) if values.is_empty() => { + crate::event::deserialize_cbor_value(CborValue::Null) + .map_err(|error| anyhow::anyhow!(error.to_string())) + .context("decode typed event from empty args") + } + CborValue::Array(mut values) if values.len() == 1 => { + let value = values.remove(0); + crate::event::deserialize_cbor_value(value) + .map_err(|error| anyhow::anyhow!(error.to_string())) + .context("decode typed event from single arg") + } + CborValue::Array(values) => crate::event::deserialize_cbor_value(CborValue::Array(values)) + .map_err(|error| anyhow::anyhow!(error.to_string())) + .context("decode typed event from positional args"), + value => crate::event::deserialize_cbor_value(value) + .map_err(|error| anyhow::anyhow!(error.to_string())) + .context("decode typed event from legacy payload"), + } } fn cbor_to_json(value: CborValue) -> Result { diff --git a/rivetkit-rust/packages/rivetkit/tests/client.rs b/rivetkit-rust/packages/rivetkit/tests/client.rs index f75f5bc3e1..1f1d4bdac7 100644 --- a/rivetkit-rust/packages/rivetkit/tests/client.rs +++ b/rivetkit-rust/packages/rivetkit/tests/client.rs @@ -293,10 +293,10 @@ async fn typed_event_connection(mut socket: WebSocket) { socket .send(connection_message(wire::ToClientBody::Event(wire::Event { name: "notice".to_owned(), - args: cbor(&SiblingNotice { + args: cbor(&vec![SiblingNotice { message: "typed-event".to_owned(), count: 7, - }), + }]), }))) .await .unwrap(); diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index fa2dddf8a9..a0a9e2af83 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -630,6 +630,14 @@ function encodeValue(value: unknown): RuntimeBytes { return encodeCborCompat(value as JsonCompatValue); } +function normalizeArgs(value: unknown): unknown[] { + return Array.isArray(value) + ? value + : value === undefined || value === null + ? [] + : [value]; +} + function unwrapTsfnPayload(error: unknown, payload: T): T { if (error !== null && error !== undefined) { throw error; @@ -1096,11 +1104,7 @@ function wrapNativeCallback, Result>( function decodeArgs(value?: RuntimeBytes | null): unknown[] { const decoded = decodeValue(value); - return Array.isArray(decoded) - ? decoded - : decoded === undefined - ? [] - : [decoded]; + return normalizeArgs(decoded); } function buildRequest(init: { @@ -3837,14 +3841,38 @@ export function buildNativeFactory( 404, ); } - const body = (await jsRequest.json()) as { args?: unknown[] }; + const body = (await jsRequest.json()) as { + args?: unknown; + properties?: unknown; + }; + if (body.args !== undefined && body.properties !== undefined) { + return jsonResponse( + { error: "use either args or properties, not both" }, + { status: 400 }, + ); + } + if ( + body.properties !== undefined && + (body.properties === null || + typeof body.properties !== "object" || + Array.isArray(body.properties)) + ) { + return jsonResponse( + { error: "properties must be an object" }, + { status: 400 }, + ); + } + const args = + body.properties !== undefined + ? [body.properties] + : normalizeArgs(body.args); try { const output = await action( actorCtx, ...validateActionArgs( schemaConfig.actionInputSchemas, actionName, - body.args ?? [], + args, ), ); return jsonResponse({ output }); diff --git a/website/src/content/docs/actors/quickstart/rust.mdx b/website/src/content/docs/actors/quickstart/rust.mdx index 2958a6b912..ef81451a0e 100644 --- a/website/src/content/docs/actors/quickstart/rust.mdx +++ b/website/src/content/docs/actors/quickstart/rust.mdx @@ -198,16 +198,17 @@ const client = createClient("http://localhost:6420"); const counter = client.counter.getOrCreate(["my-counter"]); -const count = await counter.increment(3); +const counterConnection = counter.connect(); +counterConnection.on("newCount", (event) => { + console.log("Event count:", event.count); +}); + +const count = await counterConnection.increment(3); console.log("New count:", count); -await counter.connect().increment(1); +await counterConnection.increment(1); ``` - -Events emitted by a Rust actor with `ctx.emit(...)` are broadcast as a single serialized struct value. The TypeScript and React clients deliver event arguments positionally, so consuming a Rust struct event from JavaScript is not supported yet. Call actions across languages, and subscribe to events from Rust clients. - - See the [JavaScript client documentation](/docs/clients/javascript) for more information. @@ -229,10 +230,13 @@ function Counter() { }); const increment = async () => { - const next = await counter.connection?.increment(1); - if (typeof next === "number") setCount(next); + await counter.connection?.increment(1); }; + counter.useEvent("newCount", (event) => { + setCount(event.count); + }); + return (

Count: {count}

@@ -242,10 +246,6 @@ function Counter() { } ``` - -Events emitted by a Rust actor with `ctx.emit(...)` are broadcast as a single serialized struct value. The TypeScript and React clients deliver event arguments positionally, so `useEvent` cannot consume a Rust struct event yet. This example reads the action return value instead. - - See the [React documentation](/docs/clients/react) for more information. diff --git a/website/src/content/docs/actors/state.mdx b/website/src/content/docs/actors/state.mdx index 98661fa864..dcdd808127 100644 --- a/website/src/content/docs/actors/state.mdx +++ b/website/src/content/docs/actors/state.mdx @@ -4,7 +4,7 @@ description: "Actors store state in memory for instant reads and writes. State c skill: true --- -## Durable vs Ephemeral +## Types of State There are three ways to store data in an actor, depending on what it looks like and whether it needs to survive restarts. @@ -150,19 +150,21 @@ function createEventEmitter(): EventEmitter { import { actor } from "rivetkit"; import { Pool } from "pg"; +// One shared pool for the whole process, created once and reused by every actor +const pool = new Pool({ connectionString: process.env.DATABASE_URL }); + const userActor = actor({ state: { profile: null as Record | null }, - // Open a connection and load initial data on every start + // Load this actor's row from the shared pool on each start createVars: async (c) => { - const pool = new Pool({ connectionString: process.env.DATABASE_URL }); - const result = await pool.query("SELECT * FROM users WHERE id = $1", [c.key[0]]); - return { pool, profile: result.rows[0] }; + const { rows } = await pool.query("SELECT * FROM users WHERE id = $1", [c.key[0]]); + return { profile: rows[0] }; }, actions: { updateEmail: async (c, email: string) => { - await c.vars.pool.query("UPDATE users SET email = $1 WHERE id = $2", [email, c.key[0]]); + await pool.query("UPDATE users SET email = $1 WHERE id = $2", [email, c.key[0]]); } } }); @@ -340,24 +342,26 @@ const room = actor({ ### Loading from external sources -`createVars` can be `async`, so open a connection and load initial data on each start. The connection lives only in memory: +Create the connection pool once at module scope and share it across all actors, then use `createVars` (which can be `async`) to load this actor's data from it on each start: ```typescript @nocheck import { actor } from "rivetkit"; import { Pool } from "pg"; +// One shared pool for the whole process, not one per actor +const pool = new Pool({ connectionString: process.env.DATABASE_URL }); + const profile = actor({ state: { cachedName: "" }, createVars: async (c) => { - const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const { rows } = await pool.query("SELECT * FROM users WHERE id = $1", [c.key[0]]); - return { pool, user: rows[0] }; + return { user: rows[0] }; }, actions: { updateEmail: async (c, email: string) => { - await c.vars.pool.query("UPDATE users SET email = $1 WHERE id = $2", [email, c.key[0]]); + await pool.query("UPDATE users SET email = $1 WHERE id = $2", [email, c.key[0]]); } } }); @@ -367,15 +371,18 @@ When the actor owns its data, prefer [durable state](#durable-state) or [SQLite] ### Cleanup -`vars` is dropped when the actor stops, but external resources are not closed for you. Release them in `onSleep` and `onDestroy`: +`vars` is dropped when the actor stops, but per-actor resources like timers, subscriptions, and dedicated connections aren't cleaned up for you. Release them in `onSleep` and `onDestroy`. A shared pool stays open for the whole process, so don't close it per actor. ```typescript @nocheck -const profile = actor({ - createVars: () => ({ pool: new Pool() }), +const poller = actor({ + state: { ticks: 0 }, + + // Per-actor timer started on each wake + createVars: (c) => ({ timer: setInterval(() => c.state.ticks++, 5000) }), - // Close the connection before the actor sleeps or is destroyed - onSleep: (c) => c.vars.pool.end(), - onDestroy: (c) => c.vars.pool.end(), + // Clear it before the actor sleeps or is destroyed + onSleep: (c) => clearInterval(c.vars.timer), + onDestroy: (c) => clearInterval(c.vars.timer), actions: { /* ... */ } });