From bab56c93ffe3eb62e2e2db172c826e27721d88e8 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 16 Jan 2026 23:43:09 -0800 Subject: [PATCH] feat: kv api --- CLAUDE.md | 1 + .../rivetkit/fixtures/driver-test-suite/kv.ts | 43 +++ .../fixtures/driver-test-suite/registry.ts | 3 + .../rivetkit/src/actor/contexts/base/actor.ts | 12 + .../src/actor/instance/connection-manager.ts | 2 +- .../rivetkit/src/actor/instance/keys.ts | 29 ++ .../rivetkit/src/actor/instance/kv.ts | 254 +++++++++++++++++- .../rivetkit/src/actor/instance/mod.ts | 2 +- .../src/actor/instance/state-manager.ts | 2 +- .../packages/rivetkit/src/actor/mod.ts | 3 +- .../packages/rivetkit/src/client/mod.ts | 2 +- .../rivetkit/src/driver-helpers/utils.ts | 2 +- .../rivetkit/src/driver-test-suite/mod.ts | 3 + .../src/driver-test-suite/tests/actor-kv.ts | 44 +++ .../src/drivers/engine/actor-driver.ts | 6 +- 15 files changed, 385 insertions(+), 23 deletions(-) create mode 100644 rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/kv.ts create mode 100644 rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts create mode 100644 rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-kv.ts diff --git a/CLAUDE.md b/CLAUDE.md index e64cdb0ad9..0705f72f56 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -180,6 +180,7 @@ Data structures often include: ## Testing Guidelines - When running tests, always pipe the test to a file in /tmp/ then grep it in a second step. You can grep test logs multiple times to search for different log lines. +- For RivetKit TypeScript tests, run from `rivetkit-typescript/packages/rivetkit` and use `pnpm test ` with `-t` to narrow to specific suites. For example: `pnpm test driver-file-system -t ".*Actor KV.*"`. ## Optimizations diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/kv.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/kv.ts new file mode 100644 index 0000000000..60ae2cd3e1 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/kv.ts @@ -0,0 +1,43 @@ +import { actor, type ActorContext } from "rivetkit"; + +export const kvActor = actor({ + actions: { + putText: async ( + c: ActorContext, + key: string, + value: string, + ) => { + await c.kv.put(key, value); + return true; + }, + getText: async ( + c: ActorContext, + key: string, + ) => { + return await c.kv.get(key); + }, + listText: async ( + c: ActorContext, + prefix: string, + ) => { + const results = await c.kv.list(prefix, { keyType: "text" }); + return results.map(([key, value]) => ({ + key, + value, + })); + }, + roundtripArrayBuffer: async ( + c: ActorContext, + key: string, + values: number[], + ) => { + const buffer = new Uint8Array(values).buffer; + await c.kv.put(key, buffer, { type: "arrayBuffer" }); + const result = await c.kv.get(key, { type: "arrayBuffer" }); + if (!result) { + return null; + } + return Array.from(new Uint8Array(result)); + }, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index d7136b39f2..7371a272d3 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -22,6 +22,7 @@ import { destroyActor, destroyObserver } from "./destroy"; import { customTimeoutActor, errorHandlingActor } from "./error-handling"; import { hibernationActor } from "./hibernation"; import { inlineClientActor } from "./inline-client"; +import { kvActor } from "./kv"; import { largePayloadActor, largePayloadConnActor } from "./large-payloads"; import { counterWithLifecycle } from "./lifecycle"; import { metadataActor } from "./metadata"; @@ -72,6 +73,8 @@ export const registry = setup({ customTimeoutActor, // From inline-client.ts inlineClientActor, + // From kv.ts + kvActor, // From action-inputs.ts inputActor, // From action-timeout.ts diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts index ed15eedb5c..da47afe0b0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts @@ -6,6 +6,7 @@ import type { Conn, ConnId } from "../../conn/mod"; import type { AnyDatabaseProvider, InferDatabaseClient } from "../../database"; import type { ActorDefinition, AnyActorDefinition } from "../../definition"; import type { ActorInstance, SaveStateOptions } from "../../instance/mod"; +import { ActorKv } from "../../instance/kv"; import type { Schedule } from "../../schedule"; /** @@ -27,6 +28,7 @@ export class ActorContext< TInput, TDatabase >; + #kv: ActorKv | undefined; constructor( actor: ActorInstance< @@ -41,6 +43,16 @@ export class ActorContext< this.#actor = actor; } + /** + * Gets the KV storage interface. + */ + get kv(): ActorKv { + if (!this.#kv) { + this.#kv = new ActorKv(this.#actor.driver, this.#actor.id); + } + return this.#kv; + } + /** * Get the actor state * diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts index 6866340b05..7910ee0366 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts @@ -31,7 +31,7 @@ import { import type { AnyDatabaseProvider } from "../database"; import { CachedSerializer } from "../protocol/serde"; import { deadline } from "../utils"; -import { makeConnKey } from "./kv"; +import { makeConnKey } from "./keys"; import type { ActorInstance } from "./mod"; /** * Manages all connection-related operations for an actor instance. diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts new file mode 100644 index 0000000000..641cfd027c --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts @@ -0,0 +1,29 @@ +export const KEYS = { + PERSIST_DATA: Uint8Array.from([1]), + CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys + INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key + KV: Uint8Array.from([4]), // Prefix for user-facing KV storage +}; + +// Helper to create a prefixed key for user-facing KV storage +export function makePrefixedKey(key: Uint8Array): Uint8Array { + const prefixed = new Uint8Array(KEYS.KV.length + key.length); + prefixed.set(KEYS.KV, 0); + prefixed.set(key, KEYS.KV.length); + return prefixed; +} + +// Helper to remove the prefix from a key +export function removePrefixFromKey(prefixedKey: Uint8Array): Uint8Array { + return prefixedKey.slice(KEYS.KV.length); +} + +// Helper to create a connection key +export function makeConnKey(connId: string): Uint8Array { + const encoder = new TextEncoder(); + const connIdBytes = encoder.encode(connId); + const key = new Uint8Array(KEYS.CONN_PREFIX.length + connIdBytes.length); + key.set(KEYS.CONN_PREFIX, 0); + key.set(connIdBytes, KEYS.CONN_PREFIX.length); + return key; +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/kv.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/kv.ts index d046f6bbab..6aed05d770 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/kv.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/kv.ts @@ -1,15 +1,241 @@ -export const KEYS = { - PERSIST_DATA: Uint8Array.from([1]), - CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys - INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key -}; - -// Helper to create a connection key -export function makeConnKey(connId: string): Uint8Array { - const encoder = new TextEncoder(); - const connIdBytes = encoder.encode(connId); - const key = new Uint8Array(KEYS.CONN_PREFIX.length + connIdBytes.length); - key.set(KEYS.CONN_PREFIX, 0); - key.set(connIdBytes, KEYS.CONN_PREFIX.length); - return key; +import type { ActorDriver } from "../driver"; +import { makePrefixedKey, removePrefixFromKey } from "./keys"; + +/** + * User-facing KV storage interface exposed on ActorContext. + */ +type KvValueType = "text" | "arrayBuffer" | "binary"; +type KvKeyType = "text" | "binary"; +type KvKey = Uint8Array | string; + +type KvValueTypeMap = { + text: string; + arrayBuffer: ArrayBuffer; + binary: Uint8Array; +}; + +type KvKeyTypeMap = { + text: string; + binary: Uint8Array; +}; + +type KvValueOptions = { + type?: T; +}; + +type KvListOptions< + T extends KvValueType = "text", + K extends KvKeyType = "text", +> = KvValueOptions & { + keyType?: K; +}; + +const textEncoder = new TextEncoder(); +const textDecoder = new TextDecoder(); + +function encodeKey( + key: KvKeyTypeMap[K], + keyType?: K, +): Uint8Array { + if (key instanceof Uint8Array) { + return key; + } + const resolvedKeyType = keyType ?? "text"; + if (resolvedKeyType === "binary") { + throw new TypeError("Expected a Uint8Array when keyType is binary"); + } + return textEncoder.encode(key); +} + +function decodeKey( + key: Uint8Array, + keyType?: K, +): KvKeyTypeMap[K] { + const resolvedKeyType = keyType ?? "text"; + switch (resolvedKeyType) { + case "text": + return textDecoder.decode(key) as KvKeyTypeMap[K]; + case "binary": + return key as KvKeyTypeMap[K]; + default: + throw new TypeError("Invalid kv key type"); + } +} + +function resolveValueType( + value: string | Uint8Array | ArrayBuffer, +): KvValueType { + if (typeof value === "string") { + return "text"; + } + if (value instanceof Uint8Array) { + return "binary"; + } + if (value instanceof ArrayBuffer) { + return "arrayBuffer"; + } + throw new TypeError("Invalid kv value"); +} + +function encodeValue( + value: KvValueTypeMap[T], + options?: KvValueOptions, +): Uint8Array { + const type = + options?.type ?? + resolveValueType(value as string | Uint8Array | ArrayBuffer); + switch (type) { + case "text": + if (typeof value !== "string") { + throw new TypeError("Expected a string when type is text"); + } + return textEncoder.encode(value); + case "arrayBuffer": + if (!(value instanceof ArrayBuffer)) { + throw new TypeError("Expected an ArrayBuffer when type is arrayBuffer"); + } + return new Uint8Array(value); + case "binary": + if (!(value instanceof Uint8Array)) { + throw new TypeError("Expected a Uint8Array when type is binary"); + } + return value; + default: + throw new TypeError("Invalid kv value type"); + } +} + +function decodeValue( + value: Uint8Array, + options?: KvValueOptions, +): KvValueTypeMap[T] { + const type = options?.type ?? "text"; + switch (type) { + case "text": + return textDecoder.decode(value) as KvValueTypeMap[T]; + case "arrayBuffer": { + const copy = new Uint8Array(value.byteLength); + copy.set(value); + return copy.buffer as KvValueTypeMap[T]; + } + case "binary": + return value as KvValueTypeMap[T]; + default: + throw new TypeError("Invalid kv value type"); + } +} + +export class ActorKv { + #driver: ActorDriver; + #actorId: string; + + constructor(driver: ActorDriver, actorId: string) { + this.#driver = driver; + this.#actorId = actorId; + } + + /** + * Get a single value by key. + */ + async get( + key: KvKey, + options?: KvValueOptions, + ): Promise { + const results = await this.#driver.kvBatchGet(this.#actorId, [ + makePrefixedKey(encodeKey(key)), + ]); + const result = results[0]; + if (!result) { + return null; + } + return decodeValue(result, options); + } + + /** + * Get multiple values by keys. + */ + async getBatch( + keys: KvKey[], + options?: KvValueOptions, + ): Promise<(KvValueTypeMap[T] | null)[]> { + const prefixedKeys = keys.map((key) => + makePrefixedKey(encodeKey(key)), + ); + const results = await this.#driver.kvBatchGet( + this.#actorId, + prefixedKeys, + ); + return results.map((result) => + result ? decodeValue(result, options) : null, + ); + } + + /** + * Put a single key-value pair. + */ + async put( + key: KvKey, + value: KvValueTypeMap[T], + options?: KvValueOptions, + ): Promise { + await this.#driver.kvBatchPut(this.#actorId, [ + [makePrefixedKey(encodeKey(key)), encodeValue(value, options)], + ]); + } + + /** + * Put multiple key-value pairs. + */ + async putBatch( + entries: [KvKey, KvValueTypeMap[T]][], + options?: KvValueOptions, + ): Promise { + const prefixedEntries: [Uint8Array, Uint8Array][] = entries.map( + ([key, value]) => [ + makePrefixedKey(encodeKey(key)), + encodeValue(value, options), + ], + ); + await this.#driver.kvBatchPut(this.#actorId, prefixedEntries); + } + + /** + * Delete a single key. + */ + async delete(key: KvKey): Promise { + await this.#driver.kvBatchDelete(this.#actorId, [ + makePrefixedKey(encodeKey(key)), + ]); + } + + /** + * Delete multiple keys. + */ + async deleteBatch(keys: KvKey[]): Promise { + const prefixedKeys = keys.map((key) => + makePrefixedKey(encodeKey(key)), + ); + await this.#driver.kvBatchDelete(this.#actorId, prefixedKeys); + } + + /** + * List all keys with a given prefix. + * Returns key-value pairs where keys have the user prefix removed. + */ + async list( + prefix: KvKeyTypeMap[K], + options?: KvListOptions, + ): Promise<[KvKeyTypeMap[K], KvValueTypeMap[T]][]> { + const prefixedPrefix = makePrefixedKey( + encodeKey(prefix, options?.keyType), + ); + const results = await this.#driver.kvListPrefix( + this.#actorId, + prefixedPrefix, + ); + return results.map(([key, value]) => [ + decodeKey(removePrefixFromKey(key), options?.keyType), + decodeValue(value, options), + ]); + } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 94b04f0d2e..0e02ab08d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -44,7 +44,7 @@ import { } from "../utils"; import { ConnectionManager } from "./connection-manager"; import { EventManager } from "./event-manager"; -import { KEYS } from "./kv"; +import { KEYS } from "./keys"; import { convertActorFromBarePersisted, type PersistedActor, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts index b0ac6288a1..0bdf34e500 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts @@ -12,7 +12,7 @@ import { convertConnToBarePersistedConn } from "../conn/persisted"; import type { ActorDriver } from "../driver"; import * as errors from "../errors"; import { isConnStatePath, isStatePath } from "../utils"; -import { KEYS, makeConnKey } from "./kv"; +import { KEYS, makeConnKey } from "./keys"; import type { ActorInstance } from "./mod"; import { convertActorToBarePersisted, type PersistedActor } from "./persisted"; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts index eee47db48f..d9906fbc9a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/mod.ts @@ -76,7 +76,8 @@ export type { AnyConn, Conn } from "./conn/mod"; export type { ActorDefinition, AnyActorDefinition } from "./definition"; export { lookupInRegistry } from "./definition"; export { UserError, type UserErrorOptions } from "./errors"; -export { KEYS as KV_KEYS } from "./instance/kv"; +export { KEYS as KV_KEYS } from "./instance/keys"; +export { ActorKv } from "./instance/kv"; export type { AnyActorInstance } from "./instance/mod"; export { type ActorRouter, diff --git a/rivetkit-typescript/packages/rivetkit/src/client/mod.ts b/rivetkit-typescript/packages/rivetkit/src/client/mod.ts index db76f03a46..aa9a0e5cd1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/mod.ts @@ -22,7 +22,7 @@ export { ManagerError, } from "@/client/errors"; export type { CreateRequest } from "@/manager/protocol/query"; -export { KEYS as KV_KEYS } from "../actor/instance/kv"; +export { KEYS as KV_KEYS } from "../actor/instance/keys"; export type { ActorActionFunction } from "./actor-common"; export type { ActorConn, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts index b7dccd49ed..709956d070 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-helpers/utils.ts @@ -1,5 +1,5 @@ import * as cbor from "cbor-x"; -import { KEYS } from "@/actor/instance/kv"; +import { KEYS } from "@/actor/instance/keys"; import type * as persistSchema from "@/schemas/actor-persist/mod"; import { ACTOR_VERSIONED, diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index 7f2f691b43..a4eaa6490b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -22,6 +22,7 @@ import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; import { runActorHandleTests } from "./tests/actor-handle"; import { runActorInlineClientTests } from "./tests/actor-inline-client"; import { runActorInspectorTests } from "./tests/actor-inspector"; +import { runActorKvTests } from "./tests/actor-kv"; import { runActorMetadataTests } from "./tests/actor-metadata"; import { runActorOnStateChangeTests } from "./tests/actor-onstatechange"; import { runActorVarsTests } from "./tests/actor-vars"; @@ -122,6 +123,8 @@ export function runDriverTests( runActorInlineClientTests(driverTestConfig); + runActorKvTests(driverTestConfig); + runRawHttpTests(driverTestConfig); runRawHttpRequestPropertiesTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-kv.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-kv.ts new file mode 100644 index 0000000000..4c05cb55b7 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-kv.ts @@ -0,0 +1,44 @@ +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest } from "../utils"; +import { describe, expect, test, type TestContext } from "vitest"; + +export function runActorKvTests(driverTestConfig: DriverTestConfig) { + describe("Actor KV Tests", () => { + test("supports text encoding and decoding", async (c: TestContext) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const kvHandle = client.kvActor.getOrCreate(["kv-text"]); + + await kvHandle.putText("greeting", "hello"); + const value = await kvHandle.getText("greeting"); + expect(value).toBe("hello"); + + await kvHandle.putText("prefix-a", "alpha"); + await kvHandle.putText("prefix-b", "beta"); + + const results = await kvHandle.listText("prefix-"); + const sorted = results.sort((a, b) => a.key.localeCompare(b.key)); + expect(sorted).toEqual([ + { key: "prefix-a", value: "alpha" }, + { key: "prefix-b", value: "beta" }, + ]); + }); + + test( + "supports arrayBuffer encoding and decoding", + async (c: TestContext) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const kvHandle = client.kvActor.getOrCreate(["kv-array-buffer"]); + + const values = await kvHandle.roundtripArrayBuffer("bytes", [ + 4, + 8, + 15, + 16, + 23, + 42, + ]); + expect(values).toEqual([4, 8, 15, 16, 23, 42]); + }, + ); + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 4a7a5c9f8f..a16896691b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -11,7 +11,7 @@ import { WSContext, type WSContextInit } from "hono/ws"; import invariant from "invariant"; import { type AnyConn, CONN_STATE_MANAGER_SYMBOL } from "@/actor/conn/mod"; import { lookupInRegistry } from "@/actor/definition"; -import { KEYS } from "@/actor/instance/kv"; +import { KEYS } from "@/actor/instance/keys"; import { deserializeActorKey } from "@/actor/keys"; import { getValueLength } from "@/actor/protocol/old"; import { type ActorRouter, createActorRouter } from "@/actor/router"; @@ -153,7 +153,7 @@ export class EngineActorDriver implements ActorDriver { onConnected: () => { this.#runnerStarted.resolve(undefined); }, - onDisconnected: (_code, _reason) => { }, + onDisconnected: (_code, _reason) => {}, onShutdown: () => { this.#runnerStopped.resolve(undefined); this.#isRunnerStopped = true; @@ -358,7 +358,7 @@ export class EngineActorDriver implements ActorDriver { async serverlessHandleStart(c: HonoContext): Promise { return streamSSE(c, async (stream) => { // NOTE: onAbort does not work reliably - stream.onAbort(() => { }); + stream.onAbort(() => {}); c.req.raw.signal.addEventListener("abort", () => { logger().debug("SSE aborted, shutting down runner");