Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ docker-compose up -d
- Use `scripts/cargo/check-rivetkit-core-wasm.sh` as the canonical wasm gate for `rivetkit-core`; it checks the wasm build, scans native dependency leaks, and verifies native transport/runtime features fail on wasm.
- The high-level `rivetkit` crate stays a thin typed wrapper over `rivetkit-core` and re-exports shared transport/config types instead of redefining them.
- When `rivetkit` needs ergonomic helpers on a `rivetkit-core` type it re-exports, prefer an extension trait plus `prelude` re-export instead of wrapping and replacing the core type.
- RivetKit action and event protocol `args` must always be array-shaped before crossing the client protocol boundary. Normalize at the server/source side, not in client delivery code: named structs/objects become `[object]`, tuples/arrays stay positional, scalars become `[scalar]`, and unit/null becomes `[]`.
- `engine/sdks/*/api-*` are auto-generated SDK outputs; update the source API schema and regenerate them instead of editing them by hand.

### RivetKit Test Fixtures
Expand Down
18 changes: 10 additions & 8 deletions rivetkit-rust/packages/rivetkit-core/src/registry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,19 +834,13 @@ pub(super) fn decode_http_action_args(
HttpResponseEncoding::Json => {
let request: HttpActionRequestJson =
serde_json::from_slice(body).context("decode json HTTP action request")?;
let args = match request.args {
JsonValue::Array(args) => args,
_ => Vec::new(),
};
let args = normalize_json_args(request.args);
encode_json_as_cbor(&args)
}
HttpResponseEncoding::Cbor => {
let request: HttpActionRequestJson = ciborium::from_reader(Cursor::new(body))
.context("decode cbor HTTP action request")?;
let args = match request.args {
JsonValue::Array(args) => args,
_ => Vec::new(),
};
let args = normalize_json_args(request.args);
encode_json_as_cbor(&args)
}
HttpResponseEncoding::Bare => {
Expand All @@ -858,6 +852,14 @@ pub(super) fn decode_http_action_args(
}
}

fn normalize_json_args(args: JsonValue) -> Vec<JsonValue> {
match args {
JsonValue::Array(args) => args,
JsonValue::Null => Vec::new(),
value => vec![value],
}
}

pub(super) fn decode_http_queue_request(
encoding: HttpResponseEncoding,
body: &[u8],
Expand Down
27 changes: 26 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,33 @@ impl RegistryDispatcher {
Ok(body) => body,
Err(response) => return Ok(Some(response)),
};
if !body.args.is_empty() && body.properties.is_some() {
return Ok(Some(json_http_response(
StatusCode::BAD_REQUEST,
&json!({
"error": "use either args or properties, not both",
}),
)?));
}
if body
.properties
.as_ref()
.is_some_and(|properties| !properties.is_object())
{
return Ok(Some(json_http_response(
StatusCode::BAD_REQUEST,
&json!({
"error": "properties must be an object",
}),
)?));
}
let args = if let Some(properties) = body.properties {
vec![properties]
} else {
body.args
};
match self
.execute_inspector_action(instance, &action_name, body.args)
.execute_inspector_action(instance, &action_name, args)
.await
{
Ok(output) => json_http_response(
Expand Down
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ struct InspectorPatchStateBody {
#[serde(default)]
struct InspectorActionBody {
args: Vec<JsonValue>,
properties: Option<JsonValue>,
}

#[derive(Debug, Default, Deserialize)]
Expand Down
42 changes: 30 additions & 12 deletions rivetkit-rust/packages/rivetkit/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ pub trait Action: serde::Serialize + DeserializeOwned + Send + Sync + 'static {
}

pub fn encode_positional<T: Serialize>(value: &T) -> Result<Vec<u8>> {
encode_varargs(value, "action args")
}

pub(crate) fn encode_varargs<T: Serialize>(value: &T, label: &str) -> Result<Vec<u8>> {
let mut encoded = Vec::new();
ciborium::into_writer(value, &mut encoded).context("encode action args as cbor")?;
ciborium::into_writer(value, &mut encoded)
.with_context(|| format!("encode {label} as cbor"))?;
let value: Value = ciborium::from_reader(Cursor::new(&encoded))
.context("decode action args into cbor value")?;
.with_context(|| format!("decode {label} into cbor value"))?;
let value = positional_value(value);
encode_value(&value)
encode_value(&value, label)
}

pub fn decode_positional<T: DeserializeOwned>(args: &[u8]) -> Result<T> {
Expand Down Expand Up @@ -54,16 +59,17 @@ pub fn decode_positional<T: DeserializeOwned>(args: &[u8]) -> Result<T> {

fn positional_value(value: Value) -> Value {
match value {
Value::Map(entries) => Value::Array(entries.into_iter().map(|(_, value)| value).collect()),
Value::Map(_) => Value::Array(vec![value]),
Value::Array(values) => Value::Array(values),
Value::Null => Value::Array(Vec::new()),
value => Value::Array(vec![value]),
}
}

fn encode_value(value: &Value) -> Result<Vec<u8>> {
fn encode_value(value: &Value, label: &str) -> Result<Vec<u8>> {
let mut encoded = Vec::new();
ciborium::into_writer(value, &mut encoded).context("encode positional action args as cbor")?;
ciborium::into_writer(value, &mut encoded)
.with_context(|| format!("encode positional {label} as cbor"))?;
Ok(encoded)
}

Expand Down Expand Up @@ -339,14 +345,17 @@ mod tests {
}

#[test]
fn positional_encode_has_ts_byte_parity() {
fn positional_encode_matches_ts_action_args() {
assert_eq!(
encode_positional(&NamedArgs {
first: "a".into(),
second: "b".into(),
})
.expect("encode named args"),
vec![0x82, 0x61, b'a', 0x61, b'b']
vec![
0x81, 0xa2, 0x65, b'f', b'i', b'r', b's', b't', 0x61, b'a', 0x66, b's', b'e', b'c',
b'o', b'n', b'd', 0x61, b'b',
]
);
assert_eq!(
encode_positional(&NewtypeArg(5)).expect("encode newtype arg"),
Expand Down Expand Up @@ -410,6 +419,13 @@ mod tests {
}))
.expect("decode named args from map");
assert_eq!(from_map, from_seq);

let from_single_map_arg = decode_positional::<NamedArgs>(&cbor(&vec![NamedArgs {
first: "a".into(),
second: "b".into(),
}]))
.expect("decode named args from single object arg");
assert_eq!(from_single_map_arg, from_seq);
}

#[test]
Expand All @@ -425,7 +441,7 @@ mod tests {
}

#[test]
fn positional_encode_leaves_nested_struct_as_map() {
fn positional_encode_wraps_named_struct_as_single_arg() {
let bytes = encode_positional(&WithNested {
nested: Nested {
value: 7,
Expand All @@ -440,9 +456,11 @@ mod tests {
let ciborium::Value::Array(values) = value else {
panic!("top-level args should be an array");
};
assert_eq!(values.len(), 2);
assert!(matches!(values[0], ciborium::Value::Map(_)));
assert!(matches!(values[1], ciborium::Value::Bool(true)));
assert_eq!(values.len(), 1);
let ciborium::Value::Map(fields) = &values[0] else {
panic!("named struct arg should remain a map");
};
assert_eq!(fields.len(), 2);
}

fn cbor<T: Serialize>(value: &T) -> Vec<u8> {
Expand Down
5 changes: 3 additions & 2 deletions rivetkit-rust/packages/rivetkit/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rivetkit_core::{
use serde::{Serialize, de::DeserializeOwned};
use tokio_util::sync::CancellationToken;

use crate::action;
use crate::actor::Actor;
use crate::event::Event;
use crate::queue::Queue;
Expand Down Expand Up @@ -345,7 +346,7 @@ impl<A: Actor> Ctx<A> {
}

pub fn broadcast<E: Serialize>(&self, name: &str, event: &E) -> Result<()> {
let event_bytes = encode_cbor(event, "broadcast event")?;
let event_bytes = action::encode_varargs(event, "event args")?;
self.inner.broadcast(name, &event_bytes);
Ok(())
}
Expand Down Expand Up @@ -514,7 +515,7 @@ impl<A: Actor> ConnCtx<A> {
}

pub fn send<E: Serialize>(&self, name: &str, event: &E) -> Result<()> {
let event_bytes = encode_cbor(event, "connection event")?;
let event_bytes = action::encode_varargs(event, "connection event args")?;
self.inner.send(name, &event_bytes);
Ok(())
}
Expand Down
27 changes: 25 additions & 2 deletions rivetkit-rust/packages/rivetkit/src/typed_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,31 @@ pub(crate) fn encode_action_args<M: Action>(action: &M) -> Result<Vec<JsonValue>
}

fn decode_event<E: Event>(event: &ClientEvent) -> Result<E> {
ciborium::from_reader(Cursor::new(&event.raw_args))
.with_context(|| format!("decode typed event '{}'", E::NAME))
decode_event_args(&event.raw_args).with_context(|| format!("decode typed event '{}'", E::NAME))
}

fn decode_event_args<E: Event>(raw_args: &[u8]) -> Result<E> {
let value: CborValue =
ciborium::from_reader(Cursor::new(raw_args)).context("decode typed event args as cbor")?;
match value {
CborValue::Array(values) if values.is_empty() => {
crate::event::deserialize_cbor_value(CborValue::Null)
.map_err(|error| anyhow::anyhow!(error.to_string()))
.context("decode typed event from empty args")
}
CborValue::Array(mut values) if values.len() == 1 => {
let value = values.remove(0);
crate::event::deserialize_cbor_value(value)
.map_err(|error| anyhow::anyhow!(error.to_string()))
.context("decode typed event from single arg")
}
CborValue::Array(values) => crate::event::deserialize_cbor_value(CborValue::Array(values))
.map_err(|error| anyhow::anyhow!(error.to_string()))
.context("decode typed event from positional args"),
value => crate::event::deserialize_cbor_value(value)
.map_err(|error| anyhow::anyhow!(error.to_string()))
.context("decode typed event from legacy payload"),
}
}

fn cbor_to_json(value: CborValue) -> Result<JsonValue> {
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-rust/packages/rivetkit/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ async fn typed_event_connection(mut socket: WebSocket) {
socket
.send(connection_message(wire::ToClientBody::Event(wire::Event {
name: "notice".to_owned(),
args: cbor(&SiblingNotice {
args: cbor(&vec![SiblingNotice {
message: "typed-event".to_owned(),
count: 7,
}),
}]),
})))
.await
.unwrap();
Expand Down
42 changes: 35 additions & 7 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,14 @@ function encodeValue(value: unknown): RuntimeBytes {
return encodeCborCompat(value as JsonCompatValue);
}

function normalizeArgs(value: unknown): unknown[] {
return Array.isArray(value)
? value
: value === undefined || value === null
? []
: [value];
}

function unwrapTsfnPayload<T>(error: unknown, payload: T): T {
if (error !== null && error !== undefined) {
throw error;
Expand Down Expand Up @@ -1096,11 +1104,7 @@ function wrapNativeCallback<Args extends Array<unknown>, Result>(

function decodeArgs(value?: RuntimeBytes | null): unknown[] {
const decoded = decodeValue<unknown>(value);
return Array.isArray(decoded)
? decoded
: decoded === undefined
? []
: [decoded];
return normalizeArgs(decoded);
}

function buildRequest(init: {
Expand Down Expand Up @@ -3837,14 +3841,38 @@ export function buildNativeFactory(
404,
);
}
const body = (await jsRequest.json()) as { args?: unknown[] };
const body = (await jsRequest.json()) as {
args?: unknown;
properties?: unknown;
};
if (body.args !== undefined && body.properties !== undefined) {
return jsonResponse(
{ error: "use either args or properties, not both" },
{ status: 400 },
);
}
if (
body.properties !== undefined &&
(body.properties === null ||
typeof body.properties !== "object" ||
Array.isArray(body.properties))
) {
return jsonResponse(
{ error: "properties must be an object" },
{ status: 400 },
);
}
const args =
body.properties !== undefined
? [body.properties]
: normalizeArgs(body.args);
try {
const output = await action(
actorCtx,
...validateActionArgs(
schemaConfig.actionInputSchemas,
actionName,
body.args ?? [],
args,
),
);
return jsonResponse({ output });
Expand Down
24 changes: 12 additions & 12 deletions website/src/content/docs/actors/quickstart/rust.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,17 @@ const client = createClient("http://localhost:6420");

const counter = client.counter.getOrCreate(["my-counter"]);

const count = await counter.increment(3);
const counterConnection = counter.connect();
counterConnection.on("newCount", (event) => {
console.log("Event count:", event.count);
});

const count = await counterConnection.increment(3);
console.log("New count:", count);

await counter.connect().increment(1);
await counterConnection.increment(1);
```

<Note>
Events emitted by a Rust actor with `ctx.emit(...)` are broadcast as a single serialized struct value. The TypeScript and React clients deliver event arguments positionally, so consuming a Rust struct event from JavaScript is not supported yet. Call actions across languages, and subscribe to events from Rust clients.
</Note>

See the [JavaScript client documentation](/docs/clients/javascript) for more information.

</Tab>
Expand All @@ -229,10 +230,13 @@ function Counter() {
});

const increment = async () => {
const next = await counter.connection?.increment(1);
if (typeof next === "number") setCount(next);
await counter.connection?.increment(1);
};

counter.useEvent("newCount", (event) => {
setCount(event.count);
});

return (
<div>
<p>Count: {count}</p>
Expand All @@ -242,10 +246,6 @@ function Counter() {
}
```

<Note>
Events emitted by a Rust actor with `ctx.emit(...)` are broadcast as a single serialized struct value. The TypeScript and React clients deliver event arguments positionally, so `useEvent` cannot consume a Rust struct event yet. This example reads the action return value instead.
</Note>

See the [React documentation](/docs/clients/react) for more information.

</Tab>
Expand Down
Loading
Loading