diff --git a/examples/ai-generated-actor/src/actors/dynamic-runner.ts b/examples/ai-generated-actor/src/actors/dynamic-runner.ts index a1cbf41fb1..405729c670 100644 --- a/examples/ai-generated-actor/src/actors/dynamic-runner.ts +++ b/examples/ai-generated-actor/src/actors/dynamic-runner.ts @@ -4,7 +4,7 @@ import { dynamicActor } from "rivetkit/dynamic"; export const dynamicRunner = dynamicActor({ load: async (c: any) => { // Extract the coding agent that owns the code - const codeAgentKey = c.key.slice(0, -1); + const codeAgentKey = c.actorKey.slice(0, -1); const client = await c.client(); const state = await client.codeAgent .getOrCreate(codeAgentKey) diff --git a/examples/collaborative-document/src/index.ts b/examples/collaborative-document/src/index.ts index 808519a258..f253df6597 100644 --- a/examples/collaborative-document/src/index.ts +++ b/examples/collaborative-document/src/index.ts @@ -114,7 +114,7 @@ export const documentList = actor({ createDocument: async (c, title: string) => { const documentId = randomUUID(); const createdAt = Date.now(); - const workspaceId = c.key[0] ?? "default"; + const workspaceId = c.actorKey[0] ?? "default"; const safeTitle = title.trim() || "Untitled document"; const client = c.client(); diff --git a/examples/kitchen-sink/src/actors/lifecycle/destroy.ts b/examples/kitchen-sink/src/actors/lifecycle/destroy.ts index 187560435f..20fcab97d2 100644 --- a/examples/kitchen-sink/src/actors/lifecycle/destroy.ts +++ b/examples/kitchen-sink/src/actors/lifecycle/destroy.ts @@ -20,7 +20,7 @@ export const destroyActor = actor({ state: { value: 0, key: "" }, onWake: (c) => { // Store the actor key so we can reference it in onDestroy - c.state.key = c.key.join("/"); + c.state.key = c.actorKey.join("/"); }, onDestroy: async (c) => { const client = c.client(); diff --git a/examples/kitchen-sink/src/actors/state/metadata.ts b/examples/kitchen-sink/src/actors/state/metadata.ts index 7d8641d817..d5c946e378 100644 --- a/examples/kitchen-sink/src/actors/state/metadata.ts +++ b/examples/kitchen-sink/src/actors/state/metadata.ts @@ -13,7 +13,7 @@ export const metadataActor = actor({ }, onWake: (c) => { // Store the actor name during initialization - c.state.actorName = c.name; + c.state.actorName = c.actorName; }, actions: { // Set up test tags - this will be called by tests to simulate tags @@ -32,7 +32,7 @@ export const metadataActor = actor({ getMetadata: (c) => { // Create metadata object from stored values const metadata = { - name: c.name, + name: c.actorName, tags: c.state.storedTags, region: c.state.storedRegion, }; @@ -44,7 +44,7 @@ export const metadataActor = actor({ // Get the actor name getActorName: (c) => { - return c.name; + return c.actorName; }, // Get a specific tag by key diff --git a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts index 631c4ce03c..17a3f26cc8 100644 --- a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts +++ b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts @@ -304,8 +304,8 @@ export const mockAgenticLoop = actor({ await recordDebugEvent(c, { name: "onWake", details: { - key: c.key, - name: c.name, + key: c.actorKey, + name: c.actorName, }, }); }, diff --git a/examples/kitchen-sink/src/actors/workflow/_helpers.ts b/examples/kitchen-sink/src/actors/workflow/_helpers.ts deleted file mode 100644 index 3c6eed304c..0000000000 --- a/examples/kitchen-sink/src/actors/workflow/_helpers.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Type helper - cast loop context to access actor-specific properties -// Only call these helpers INSIDE a step callback where state access is allowed -// biome-ignore lint/suspicious/noExplicitAny: Workflow context typing workaround -export type ActorLoopContext = { - state: S; - broadcast: (name: string, ...args: unknown[]) => void; -}; - -// biome-ignore lint/suspicious/noExplicitAny: Workflow context typing workaround -export function actorCtx(ctx: unknown): ActorLoopContext { - return ctx as any; -} diff --git a/examples/kitchen-sink/src/actors/workflow/approval.ts b/examples/kitchen-sink/src/actors/workflow/approval.ts index 57a9270ac8..e6b9a4c49b 100644 --- a/examples/kitchen-sink/src/actors/workflow/approval.ts +++ b/examples/kitchen-sink/src/actors/workflow/approval.ts @@ -4,7 +4,6 @@ import { actor, event, queue } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type RequestStatus = "pending" | "approved" | "rejected" | "timeout"; @@ -19,8 +18,6 @@ export type ApprovalRequest = { deciding?: boolean; // True when a decision is being processed }; -type State = ApprovalRequest; - const QUEUE_DECISION = "decision" as const; const APPROVAL_TIMEOUT_MS = 30000; @@ -37,7 +34,7 @@ type ApprovalDecision = { export const approval = actor({ createState: (c, input?: ApprovalRequestInput): ApprovalRequest => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, title: input?.title ?? "Untitled Request", description: input?.description ?? "", status: "pending", @@ -71,10 +68,8 @@ export const approval = actor({ run: workflow(async (ctx) => { await ctx.loop("approval-loop", async (loopCtx) => { - const c = actorCtx(loopCtx); - - await loopCtx.step("init-request", async () => { - ctx.log.info({ + await loopCtx.step("init-request", async (c) => { + c.log.info({ msg: "waiting for approval decision", requestId: c.state.id, title: c.state.title, @@ -91,18 +86,18 @@ export const approval = actor({ ); const decision = decisionMessage?.body ?? null; - await loopCtx.step("update-status", async () => { + await loopCtx.step("update-status", async (c) => { c.state.deciding = false; if (decision === null) { c.state.status = "timeout"; - ctx.log.info({ + c.log.info({ msg: "request timed out", requestId: c.state.id, }); } else if (decision.approved) { c.state.status = "approved"; c.state.decidedBy = decision.approver; - ctx.log.info({ + c.log.info({ msg: "request approved", requestId: c.state.id, approver: decision.approver, @@ -110,7 +105,7 @@ export const approval = actor({ } else { c.state.status = "rejected"; c.state.decidedBy = decision.approver; - ctx.log.info({ + c.log.info({ msg: "request rejected", requestId: c.state.id, approver: decision.approver, diff --git a/examples/kitchen-sink/src/actors/workflow/batch.ts b/examples/kitchen-sink/src/actors/workflow/batch.ts index 44a431ecb1..3ab762fdc5 100644 --- a/examples/kitchen-sink/src/actors/workflow/batch.ts +++ b/examples/kitchen-sink/src/actors/workflow/batch.ts @@ -4,7 +4,6 @@ import { actor, event } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type BatchInfo = { id: number; @@ -24,8 +23,6 @@ export type BatchJob = { completedAt?: number; }; -type State = BatchJob; - function fetchBatch( cursor: number, batchSize: number, @@ -50,7 +47,7 @@ export type BatchJobInput = { export const batch = actor({ createState: (c, input?: BatchJobInput): BatchJob => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, totalItems: input?.totalItems ?? 50, batchSize: input?.batchSize ?? 5, status: "running", @@ -77,10 +74,8 @@ export const batch = actor({ name: "batch-loop", state: { cursor: 0 }, run: async (batchCtx, loopState: { cursor: number }) => { - const c = actorCtx(batchCtx); - - const batch = await batchCtx.step("fetch-batch", async () => { - ctx.log.info({ + const batch = await batchCtx.step("fetch-batch", async (c) => { + c.log.info({ msg: "processing batch", jobId: c.state.id, cursor: loopState.cursor, @@ -95,7 +90,7 @@ export const batch = actor({ ); }); - await batchCtx.step("process-batch", async () => { + await batchCtx.step("process-batch", async (c) => { await new Promise((r) => setTimeout(r, 300 + Math.random() * 500), ); @@ -113,7 +108,7 @@ export const batch = actor({ c.broadcast("batchProcessed", batchInfo); c.broadcast("stateChanged", c.state); - ctx.log.info({ + c.log.info({ msg: "batch processed", jobId: c.state.id, cursor: loopState.cursor, @@ -122,7 +117,7 @@ export const batch = actor({ }); if (!batch.hasMore) { - await batchCtx.step("mark-complete", async () => { + await batchCtx.step("mark-complete", async (c) => { c.state.status = "completed"; c.state.completedAt = Date.now(); c.broadcast("stateChanged", c.state); diff --git a/examples/kitchen-sink/src/actors/workflow/dashboard.ts b/examples/kitchen-sink/src/actors/workflow/dashboard.ts index ea70e773fe..81afecad91 100644 --- a/examples/kitchen-sink/src/actors/workflow/dashboard.ts +++ b/examples/kitchen-sink/src/actors/workflow/dashboard.ts @@ -3,7 +3,6 @@ import { actor, event, queue } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type UserStats = { count: number; @@ -43,8 +42,6 @@ export type DashboardState = { lastRefresh: number | null; }; -type State = DashboardState; - const QUEUE_REFRESH = "refresh"; type RefreshMessage = Record; @@ -115,8 +112,6 @@ export const dashboard = actor({ run: workflow(async (ctx) => { await ctx.loop("refresh-loop", async (loopCtx) => { - const c = actorCtx(loopCtx); - await loopCtx.queue.next("wait-refresh", { names: [QUEUE_REFRESH], }); @@ -126,9 +121,7 @@ export const dashboard = actor({ const results = await loopCtx.join("fetch-all", { users: { run: async (branchCtx) => { - const bc = actorCtx(branchCtx); - - await branchCtx.step("mark-running", async () => { + await branchCtx.step("mark-running", async (bc) => { bc.state.branches.users = "running"; bc.broadcast("stateChanged", bc.state); }); @@ -140,7 +133,7 @@ export const dashboard = actor({ }, ); - await branchCtx.step("mark-complete", async () => { + await branchCtx.step("mark-complete", async (bc) => { bc.state.branches.users = "completed"; bc.broadcast("stateChanged", bc.state); }); @@ -150,9 +143,7 @@ export const dashboard = actor({ }, orders: { run: async (branchCtx) => { - const bc = actorCtx(branchCtx); - - await branchCtx.step("mark-running", async () => { + await branchCtx.step("mark-running", async (bc) => { bc.state.branches.orders = "running"; bc.broadcast("stateChanged", bc.state); }); @@ -164,7 +155,7 @@ export const dashboard = actor({ }, ); - await branchCtx.step("mark-complete", async () => { + await branchCtx.step("mark-complete", async (bc) => { bc.state.branches.orders = "completed"; bc.broadcast("stateChanged", bc.state); }); @@ -174,9 +165,7 @@ export const dashboard = actor({ }, metrics: { run: async (branchCtx) => { - const bc = actorCtx(branchCtx); - - await branchCtx.step("mark-running", async () => { + await branchCtx.step("mark-running", async (bc) => { bc.state.branches.metrics = "running"; bc.broadcast("stateChanged", bc.state); }); @@ -188,7 +177,7 @@ export const dashboard = actor({ }, ); - await branchCtx.step("mark-complete", async () => { + await branchCtx.step("mark-complete", async (bc) => { bc.state.branches.metrics = "completed"; bc.broadcast("stateChanged", bc.state); }); @@ -198,7 +187,7 @@ export const dashboard = actor({ }, }); - await loopCtx.step("save-data", async () => { + await loopCtx.step("save-data", async (c) => { c.state.data = { users: results.users, orders: results.orders, diff --git a/examples/kitchen-sink/src/actors/workflow/history-examples.ts b/examples/kitchen-sink/src/actors/workflow/history-examples.ts index b7db948673..da0bb14334 100644 --- a/examples/kitchen-sink/src/actors/workflow/history-examples.ts +++ b/examples/kitchen-sink/src/actors/workflow/history-examples.ts @@ -1,6 +1,5 @@ import { actor, queue } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; function delay(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); @@ -17,15 +16,14 @@ export type WorkflowHistorySimpleState = { export const workflowHistorySimple = actor({ createState: (c): WorkflowHistorySimpleState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "pending", }), actions: { getState: (c): WorkflowHistorySimpleState => c.state, }, run: workflow(async (ctx) => { - await ctx.step("start", async () => { - const c = actorCtx(ctx); + await ctx.step("start", async (c) => { c.state.status = "running"; c.state.lastStep = "start"; c.state.startedAt = Date.now(); @@ -34,24 +32,21 @@ export const workflowHistorySimple = actor({ await delay(700); - await ctx.step("process", async () => { - const c = actorCtx(ctx); + await ctx.step("process", async (c) => { c.state.lastStep = "process"; return { processed: true, items: 5 }; }); await delay(2200); - await ctx.step("validate", async () => { - const c = actorCtx(ctx); + await ctx.step("validate", async (c) => { c.state.lastStep = "validate"; return { valid: true }; }); await delay(600); - await ctx.step("complete", async () => { - const c = actorCtx(ctx); + await ctx.step("complete", async (c) => { c.state.lastStep = "complete"; c.state.status = "completed"; c.state.completedAt = Date.now(); @@ -73,7 +68,7 @@ export type WorkflowHistoryLoopState = { export const workflowHistoryLoop = actor({ createState: (c): WorkflowHistoryLoopState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "running", processed: 0, batches: [], @@ -82,8 +77,7 @@ export const workflowHistoryLoop = actor({ getState: (c): WorkflowHistoryLoopState => c.state, }, run: workflow(async (ctx) => { - await ctx.step("init", async () => { - const c = actorCtx(ctx); + await ctx.step("init", async (c) => { c.state.status = "running"; return { batchSize: LOOP_ITEMS.length }; }); @@ -97,8 +91,7 @@ export const workflowHistoryLoop = actor({ run: async (loopCtx, loopState: { index: number }) => { const item = LOOP_ITEMS[loopState.index]; - await loopCtx.step(`process-${loopState.index}`, async () => { - const c = actorCtx(loopCtx); + await loopCtx.step(`process-${loopState.index}`, async (c) => { c.state.processed += 1; c.state.batches.push({ index: loopState.index, item }); return { item, status: "done" }; @@ -112,8 +105,7 @@ export const workflowHistoryLoop = actor({ }, }); - await ctx.step("finalize", async () => { - const c = actorCtx(ctx); + await ctx.step("finalize", async (c) => { c.state.status = "completed"; c.state.completedAt = Date.now(); return { allProcessed: true }; @@ -133,15 +125,14 @@ export type WorkflowHistoryJoinState = { export const workflowHistoryJoin = actor({ createState: (c): WorkflowHistoryJoinState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "pending", }), actions: { getState: (c): WorkflowHistoryJoinState => c.state, }, run: workflow(async (ctx) => { - await ctx.step("start", async () => { - const c = actorCtx(ctx); + await ctx.step("start", async (c) => { c.state.status = "running"; return { ready: true }; }); @@ -176,8 +167,7 @@ export const workflowHistoryJoin = actor({ }, }); - await ctx.step("merge-results", async () => { - const c = actorCtx(ctx); + await ctx.step("merge-results", async (c) => { c.state.status = "completed"; c.state.result = { api: results["fetch-api"].data, @@ -198,15 +188,14 @@ export type WorkflowHistoryRaceState = { export const workflowHistoryRace = actor({ createState: (c): WorkflowHistoryRaceState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "running", }), actions: { getState: (c): WorkflowHistoryRaceState => c.state, }, run: workflow(async (ctx) => { - await ctx.step("begin", async () => { - const c = actorCtx(ctx); + await ctx.step("begin", async (c) => { c.state.status = "running"; return { started: true }; }); @@ -231,8 +220,7 @@ export const workflowHistoryRace = actor({ }, ]); - await ctx.step("use-result", async () => { - const c = actorCtx(ctx); + await ctx.step("use-result", async (c) => { c.state.status = "completed"; c.state.winner = winner; c.state.result = value.provider; @@ -293,7 +281,7 @@ const FULL_WORKFLOW_ITEMS = [ export const workflowHistoryFull = actor({ createState: (c): WorkflowHistoryFullState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "pending", seededMessages: false, }), @@ -316,8 +304,7 @@ export const workflowHistoryFull = actor({ }, }, run: workflow(async (ctx) => { - await ctx.step("bootstrap", async () => { - const c = actorCtx(ctx); + await ctx.step("bootstrap", async (c) => { c.state.status = "running"; c.state.lastStep = "bootstrap"; c.state.startedAt = Date.now(); @@ -327,16 +314,14 @@ export const workflowHistoryFull = actor({ }; }); - await ctx.step("validate-input", async () => { - const c = actorCtx(ctx); + await ctx.step("validate-input", async (c) => { c.state.lastStep = "validate-input"; return true; }); await ctx.rollbackCheckpoint("checkpoint-after-validation"); - await ctx.step("load-user-profile", async () => { - const c = actorCtx(ctx); + await ctx.step("load-user-profile", async (c) => { c.state.lastStep = "load-user-profile"; return { id: "user-123", @@ -345,14 +330,12 @@ export const workflowHistoryFull = actor({ }; }); - await ctx.step("compute-discount", async () => { - const c = actorCtx(ctx); + await ctx.step("compute-discount", async (c) => { c.state.lastStep = "compute-discount"; return { percent: 5, reason: "tier-discount" }; }); - await ctx.step("ephemeral-cache-check", async () => { - const c = actorCtx(ctx); + await ctx.step("ephemeral-cache-check", async (c) => { c.state.lastStep = "ephemeral-cache-check"; return { cacheHit: false, tier: "standard" }; }); @@ -504,8 +487,7 @@ export const workflowHistoryFull = actor({ await ctx.removed("legacy-step-placeholder", "step"); - await ctx.step("finalize", async () => { - const c = actorCtx(ctx); + await ctx.step("finalize", async (c) => { c.state.lastStep = "finalize"; c.state.status = "completed"; c.state.completedAt = Date.now(); @@ -532,7 +514,7 @@ export const workflowHistoryInProgress = actor({ c, input?: WorkflowHistoryInProgressInput, ): WorkflowHistoryInProgressState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "running", processingDurationMs: input?.processingDurationMs ?? 30000, progress: 0, @@ -541,21 +523,18 @@ export const workflowHistoryInProgress = actor({ getState: (c): WorkflowHistoryInProgressState => c.state, }, run: workflow(async (ctx) => { - await ctx.step("init", async () => { - const c = actorCtx(ctx); + await ctx.step("init", async (c) => { c.state.startedAt = Date.now(); c.state.progress = 10; return { initialized: true }; }); - await ctx.step("fetch-data", async () => { - const c = actorCtx(ctx); + await ctx.step("fetch-data", async (c) => { c.state.progress = 25; return { fetched: true, records: 100 }; }); - await ctx.step("process", async () => { - const c = actorCtx(ctx); + await ctx.step("process", async (c) => { c.state.progress = 42; await delay(c.state.processingDurationMs); c.state.status = "completed"; @@ -577,7 +556,7 @@ const RETRY_MAX_RETRIES = 20; export const workflowHistoryRetrying = actor({ createState: (c): WorkflowHistoryRetryingState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "running", attempts: 0, succeedAfter: 999, @@ -589,8 +568,7 @@ export const workflowHistoryRetrying = actor({ }, }, run: workflow(async (ctx) => { - await ctx.step("start", async () => { - const c = actorCtx(ctx); + await ctx.step("start", async (c) => { c.state.status = "running"; return { ready: true }; }); @@ -600,8 +578,7 @@ export const workflowHistoryRetrying = actor({ maxRetries: RETRY_MAX_RETRIES, retryBackoffBase: 250, retryBackoffMax: 1500, - run: async () => { - const c = actorCtx(ctx); + run: async (c) => { c.state.attempts += 1; if (c.state.attempts < c.state.succeedAfter) { const error = new Error("Connection timeout after 5000ms"); @@ -627,7 +604,7 @@ const FAILED_MAX_RETRIES = 3; export const workflowHistoryFailed = actor({ createState: (c): WorkflowHistoryFailedState => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "running", attempts: 0, }), @@ -635,8 +612,7 @@ export const workflowHistoryFailed = actor({ getState: (c): WorkflowHistoryFailedState => c.state, }, run: workflow(async (ctx) => { - await ctx.step("init", async () => { - const c = actorCtx(ctx); + await ctx.step("init", async (c) => { c.state.status = "running"; return { initialized: true }; }); @@ -650,8 +626,7 @@ export const workflowHistoryFailed = actor({ maxRetries: FAILED_MAX_RETRIES, retryBackoffBase: 200, retryBackoffMax: 800, - run: async () => { - const c = actorCtx(ctx); + run: async (c) => { c.state.attempts += 1; const error = new Error( "Database connection failed: ECONNREFUSED", diff --git a/examples/kitchen-sink/src/actors/workflow/order.ts b/examples/kitchen-sink/src/actors/workflow/order.ts index 5dbb7889f6..16e1012634 100644 --- a/examples/kitchen-sink/src/actors/workflow/order.ts +++ b/examples/kitchen-sink/src/actors/workflow/order.ts @@ -4,7 +4,6 @@ import { actor, event } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type OrderStatus = | "pending" @@ -23,8 +22,6 @@ export type Order = { completedAt?: number; }; -type State = Order; - async function simulateWork(name: string, failChance = 0.1): Promise { await new Promise((resolve) => setTimeout(resolve, 500 + Math.random() * 1000), @@ -36,7 +33,7 @@ async function simulateWork(name: string, failChance = 0.1): Promise { export const order = actor({ createState: (c): Order => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, status: "pending", step: 0, createdAt: Date.now(), @@ -51,36 +48,40 @@ export const order = actor({ run: workflow(async (ctx) => { await ctx.loop("process-order", async (loopCtx) => { - const c = actorCtx(loopCtx); - - await loopCtx.step("validate", async () => { - ctx.log.info({ msg: "processing order", orderId: c.state.id }); - c.state.status = "validating"; - c.state.step = 1; - c.broadcast("orderUpdated", c.state); + await loopCtx.step("validate", async (step) => { + step.log.info({ + msg: "processing order", + orderId: step.state.id, + }); + step.state.status = "validating"; + step.state.step = 1; + step.broadcast("orderUpdated", step.state); await simulateWork("validation", 0.05); }); - await loopCtx.step("charge", async () => { - c.state.status = "charging"; - c.state.step = 2; - c.broadcast("orderUpdated", c.state); + await loopCtx.step("charge", async (step) => { + step.state.status = "charging"; + step.state.step = 2; + step.broadcast("orderUpdated", step.state); await simulateWork("payment", 0.1); }); - await loopCtx.step("fulfill", async () => { - c.state.status = "fulfilling"; - c.state.step = 3; - c.broadcast("orderUpdated", c.state); + await loopCtx.step("fulfill", async (step) => { + step.state.status = "fulfilling"; + step.state.step = 3; + step.broadcast("orderUpdated", step.state); await simulateWork("fulfillment", 0.05); }); - await loopCtx.step("complete", async () => { - c.state.status = "completed"; - c.state.step = 4; - c.state.completedAt = Date.now(); - c.broadcast("orderUpdated", c.state); - ctx.log.info({ msg: "order completed", orderId: c.state.id }); + await loopCtx.step("complete", async (step) => { + step.state.status = "completed"; + step.state.step = 4; + step.state.completedAt = Date.now(); + step.broadcast("orderUpdated", step.state); + step.log.info({ + msg: "order completed", + orderId: step.state.id, + }); }); return Loop.break(undefined); diff --git a/examples/kitchen-sink/src/actors/workflow/payment.ts b/examples/kitchen-sink/src/actors/workflow/payment.ts index 22985793bd..4ed1639f59 100644 --- a/examples/kitchen-sink/src/actors/workflow/payment.ts +++ b/examples/kitchen-sink/src/actors/workflow/payment.ts @@ -4,7 +4,6 @@ import { actor, event } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type TransactionStep = { name: string; @@ -31,8 +30,6 @@ export type Transaction = { completedAt?: number; }; -type State = Transaction; - export type TransactionInput = { amount?: number; shouldFail?: boolean; @@ -40,7 +37,7 @@ export type TransactionInput = { export const payment = actor({ createState: (c, input?: TransactionInput): Transaction => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, amount: input?.amount ?? 100, shouldFail: input?.shouldFail ?? false, status: "pending", @@ -63,10 +60,8 @@ export const payment = actor({ run: workflow(async (ctx) => { await ctx.loop("payment-loop", async (loopCtx) => { - const c = actorCtx(loopCtx); - - await loopCtx.step("init-payment", async () => { - ctx.log.info({ + await loopCtx.step("init-payment", async (c) => { + c.log.info({ msg: "starting payment processing", txId: c.state.id, amount: c.state.amount, @@ -80,7 +75,7 @@ export const payment = actor({ // Step 1: Reserve inventory await loopCtx.step({ name: "reserve-inventory", - run: async () => { + run: async (c) => { c.state.status = "reserving"; const step = c.state.steps.find( (s) => s.name === "reserve-inventory", @@ -94,13 +89,13 @@ export const payment = actor({ await new Promise((r) => setTimeout(r, 500 + Math.random() * 500), ); - ctx.log.info({ + c.log.info({ msg: "inventory reserved", txId: c.state.id, }); return { reserved: true }; }, - rollback: async () => { + rollback: async (c) => { // Set rolling_back status on first rollback c.state.status = "rolling_back"; const step = c.state.steps.find( @@ -110,7 +105,7 @@ export const payment = actor({ step.status = "rolled_back"; step.rolledBackAt = Date.now(); } - ctx.log.info({ + c.log.info({ msg: "inventory released", txId: c.state.id, }); @@ -123,7 +118,7 @@ export const payment = actor({ // Step 2: Charge card await loopCtx.step({ name: "charge-card", - run: async () => { + run: async (c) => { c.state.status = "charging"; const step = c.state.steps.find( (s) => s.name === "charge-card", @@ -142,10 +137,10 @@ export const payment = actor({ throw new Error("Payment declined (simulated)"); } - ctx.log.info({ msg: "card charged", txId: c.state.id }); + c.log.info({ msg: "card charged", txId: c.state.id }); return { chargeId: `ch_${c.state.id}` }; }, - rollback: async () => { + rollback: async (c) => { c.state.status = "rolling_back"; const step = c.state.steps.find( (s) => s.name === "charge-card", @@ -154,7 +149,7 @@ export const payment = actor({ step.status = "rolled_back"; step.rolledBackAt = Date.now(); } - ctx.log.info({ msg: "charge refunded", txId: c.state.id }); + c.log.info({ msg: "charge refunded", txId: c.state.id }); c.broadcast("transactionUpdated", c.state); // Small delay so UI can show the rollback await new Promise((r) => setTimeout(r, 400)); @@ -162,7 +157,7 @@ export const payment = actor({ }); // Step 3: Complete order - await loopCtx.step("complete-order", async () => { + await loopCtx.step("complete-order", async (c) => { c.state.status = "completing"; const step = c.state.steps.find( (s) => s.name === "complete-order", @@ -176,7 +171,7 @@ export const payment = actor({ c.state.status = "completed"; c.state.completedAt = Date.now(); - ctx.log.info({ msg: "order completed", txId: c.state.id }); + c.log.info({ msg: "order completed", txId: c.state.id }); c.broadcast("transactionCompleted", c.state); }); diff --git a/examples/kitchen-sink/src/actors/workflow/race.ts b/examples/kitchen-sink/src/actors/workflow/race.ts index f900f48bc7..e06e372a23 100644 --- a/examples/kitchen-sink/src/actors/workflow/race.ts +++ b/examples/kitchen-sink/src/actors/workflow/race.ts @@ -4,7 +4,6 @@ import { actor, event } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type RaceTask = { id: string; @@ -17,8 +16,6 @@ export type RaceTask = { actualDurationMs?: number; }; -type State = RaceTask; - export type RaceTaskInput = { workDurationMs?: number; timeoutMs?: number; @@ -26,7 +23,7 @@ export type RaceTaskInput = { export const race = actor({ createState: (c, input?: RaceTaskInput): RaceTask => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, workDurationMs: input?.workDurationMs ?? 2000, timeoutMs: input?.timeoutMs ?? 3000, status: "running", @@ -43,13 +40,11 @@ export const race = actor({ run: workflow(async (ctx) => { await ctx.loop("race-loop", async (loopCtx) => { - const c = actorCtx(loopCtx); - // Get durations inside a step since state is only available in steps const { workDurationMs, timeoutMs, taskId } = await loopCtx.step( "start-race", - async () => { - ctx.log.info({ + async (c) => { + c.log.info({ msg: "starting race", taskId: c.state.id, workDurationMs: c.state.workDurationMs, @@ -86,7 +81,7 @@ export const race = actor({ }, ]); - await loopCtx.step("save-result", async () => { + await loopCtx.step("save-result", async (c) => { c.state.completedAt = Date.now(); c.state.actualDurationMs = c.state.completedAt - c.state.startedAt; @@ -94,14 +89,14 @@ export const race = actor({ if (winner === "work") { c.state.status = "work_won"; c.state.result = value as string; - ctx.log.info({ + c.log.info({ msg: "work completed before timeout", taskId: c.state.id, durationMs: c.state.actualDurationMs, }); } else { c.state.status = "timeout_won"; - ctx.log.info({ + c.log.info({ msg: "timeout won the race", taskId: c.state.id, durationMs: c.state.actualDurationMs, diff --git a/examples/kitchen-sink/src/actors/workflow/timer.ts b/examples/kitchen-sink/src/actors/workflow/timer.ts index 8b24848915..e683442c88 100644 --- a/examples/kitchen-sink/src/actors/workflow/timer.ts +++ b/examples/kitchen-sink/src/actors/workflow/timer.ts @@ -4,7 +4,6 @@ import { actor, event } from "rivetkit"; import { Loop, workflow } from "rivetkit/workflow"; -import { actorCtx } from "./_helpers.ts"; export type Timer = { id: string; @@ -14,8 +13,6 @@ export type Timer = { completedAt?: number; }; -type State = Timer; - export type TimerInput = { name?: string; durationMs?: number; @@ -23,7 +20,7 @@ export type TimerInput = { export const timer = actor({ createState: (c, input?: TimerInput): Timer => ({ - id: c.key[0] as string, + id: c.actorKey[0] as string, name: input?.name ?? "Timer", durationMs: input?.durationMs ?? 10000, startedAt: Date.now(), @@ -39,25 +36,29 @@ export const timer = actor({ run: workflow(async (ctx) => { await ctx.loop("timer-loop", async (loopCtx) => { - const c = actorCtx(loopCtx); - // Get duration inside a step since state is only available in steps - const durationMs = await loopCtx.step("start-timer", async () => { - ctx.log.info({ - msg: "starting timer", - timerId: c.state.id, - durationMs: c.state.durationMs, - }); - c.broadcast("timerStarted", c.state); - return c.state.durationMs; - }); + const durationMs = await loopCtx.step( + "start-timer", + async (step) => { + step.log.info({ + msg: "starting timer", + timerId: step.state.id, + durationMs: step.state.durationMs, + }); + step.broadcast("timerStarted", step.state); + return step.state.durationMs; + }, + ); await loopCtx.sleep("countdown", durationMs); - await loopCtx.step("complete-timer", async () => { - c.state.completedAt = Date.now(); - c.broadcast("timerCompleted", c.state); - ctx.log.info({ msg: "timer completed", timerId: c.state.id }); + await loopCtx.step("complete-timer", async (step) => { + step.state.completedAt = Date.now(); + step.broadcast("timerCompleted", step.state); + step.log.info({ + msg: "timer completed", + timerId: step.state.id, + }); }); return Loop.break(undefined); diff --git a/examples/kitchen-sink/src/actors/workflow/workflow-fixtures.ts b/examples/kitchen-sink/src/actors/workflow/workflow-fixtures.ts index 7f17c5d072..55cc1e1893 100644 --- a/examples/kitchen-sink/src/actors/workflow/workflow-fixtures.ts +++ b/examples/kitchen-sink/src/actors/workflow/workflow-fixtures.ts @@ -1,5 +1,5 @@ import { actor, event, queue } from "rivetkit"; -import { Loop, type WorkflowLoopContextOf, workflow } from "rivetkit/workflow"; +import { Loop, type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; const WORKFLOW_GUARD_KV_KEY = "__rivet_actor_workflow_guard_triggered"; @@ -13,17 +13,25 @@ export const workflowCounterActor = actor({ history: [] as number[], }, run: workflow(async (ctx) => { + let leakedStep: + | WorkflowStepContextOf + | undefined; await ctx.loop("counter", async (loopCtx) => { - try { - // Accessing state outside a step should throw. - // biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor. - loopCtx.state; - } catch {} - - await loopCtx.step("increment", async () => { - incrementWorkflowCounter(loopCtx); + await loopCtx.step("increment", async (step) => { + incrementWorkflowCounter(step); + // Capture the step context to verify it cannot be used after + // its step has finished. + leakedStep = step; }); + if (leakedStep) { + try { + // Accessing state on a finished step context should throw. + // biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor. + leakedStep.state; + } catch {} + } + await loopCtx.sleep("idle", 25); return Loop.continue(undefined); }); @@ -59,12 +67,8 @@ export const workflowQueueActor = actor({ return Loop.continue(undefined); } const complete = message.complete; - await loopCtx.step("store-message", async () => { - await storeWorkflowQueueMessage( - loopCtx, - message.body, - complete, - ); + await loopCtx.step("store-message", async (step) => { + await storeWorkflowQueueMessage(step, message.body, complete); }); return Loop.continue(undefined); }); @@ -80,8 +84,8 @@ export const workflowSleepActor = actor({ }, run: workflow(async (ctx) => { await ctx.loop("sleep", async (loopCtx) => { - await loopCtx.step("tick", async () => { - incrementWorkflowSleepTick(loopCtx); + await loopCtx.step("tick", async (step) => { + incrementWorkflowSleepTick(step); }); await loopCtx.sleep("delay", 40); return Loop.continue(undefined); @@ -115,9 +119,9 @@ export const workflowQueueTimeoutActor = actor({ }, run: workflow(async (ctx) => { await ctx.loop("queue-timeout-loop", async (loopCtx) => { - const timeoutMs = await loopCtx.step("read-timeout", async () => { - return readWorkflowTimeoutMs(loopCtx); - }); + const timeoutMs = await loopCtx.step("read-timeout", async (step) => + readWorkflowTimeoutMs(step), + ); const [message] = await loopCtx.queue.nextBatch( "wait-job-or-timeout", @@ -128,14 +132,14 @@ export const workflowQueueTimeoutActor = actor({ ); if (!message) { - await loopCtx.step("tick", async () => { - recordWorkflowTimeoutTick(loopCtx); + await loopCtx.step("tick", async (step) => { + recordWorkflowTimeoutTick(step); }); return Loop.continue(undefined); } - await loopCtx.step("process-job", async () => { - processWorkflowTimeoutJob(loopCtx, message.body); + await loopCtx.step("process-job", async (step) => { + processWorkflowTimeoutJob(step, message.body); }); return Loop.continue(undefined); }); @@ -155,53 +159,53 @@ export const workflowQueueTimeoutActor = actor({ }); function incrementWorkflowCounter( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, ): void { - ctx.state.runCount += 1; - ctx.state.history.push(ctx.state.runCount); + step.state.runCount += 1; + step.state.history.push(step.state.runCount); } async function storeWorkflowQueueMessage( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, body: unknown, complete: (response: { echo: unknown }) => Promise, ): Promise { - ctx.state.received.push(body); + step.state.received.push(body); await complete({ echo: body }); } function incrementWorkflowSleepTick( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, ): void { - ctx.state.ticks += 1; + step.state.ticks += 1; } function readWorkflowTimeoutMs( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, ): number { - return ctx.state.timeoutMs; + return step.state.timeoutMs; } function recordWorkflowTimeoutTick( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, ): void { const at = Date.now(); - ctx.state.ticks += 1; - ctx.state.lastTickAt = at; - ctx.broadcast("tick", { - ticks: ctx.state.ticks, + step.state.ticks += 1; + step.state.lastTickAt = at; + step.broadcast("tick", { + ticks: step.state.ticks, at, }); } function processWorkflowTimeoutJob( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, job: { id: string; payload: string }, ): void { - ctx.state.processed += 1; - ctx.state.lastJob = job; - ctx.broadcast("jobProcessed", { - processed: ctx.state.processed, + step.state.processed += 1; + step.state.lastJob = job; + step.broadcast("jobProcessed", { + processed: step.state.processed, job, }); } diff --git a/examples/multiplayer-game-patterns/src/actors/idle/world.ts b/examples/multiplayer-game-patterns/src/actors/idle/world.ts index e5705506d1..99c7e92169 100644 --- a/examples/multiplayer-game-patterns/src/actors/idle/world.ts +++ b/examples/multiplayer-game-patterns/src/actors/idle/world.ts @@ -54,7 +54,9 @@ export const idleWorld = actor({ return; } c.state.playerName = input.playerName; - const keyPlayerId = Array.isArray(c.key) ? c.key[0] : c.key; + const keyPlayerId = Array.isArray(c.actorKey) + ? c.actorKey[0] + : c.actorKey; if (input.playerId) { c.state.playerId = input.playerId; } else if (typeof keyPlayerId === "string" && keyPlayerId) { diff --git a/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts b/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts index 99145fba21..fffda30656 100644 --- a/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts +++ b/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts @@ -40,7 +40,7 @@ export const openWorldChunk = actor({ snapshot: event(), }, createState: (c): State => { - const key = Array.isArray(c.key) ? c.key : [c.key]; + const key = Array.isArray(c.actorKey) ? c.actorKey : [c.actorKey]; const chunkX = Number(key[1] ?? "0"); const chunkY = Number(key[2] ?? "0"); return { diff --git a/examples/multiplayer-game-patterns/src/actors/party/match.ts b/examples/multiplayer-game-patterns/src/actors/party/match.ts index b7fb34f521..ca9d82ead0 100644 --- a/examples/multiplayer-game-patterns/src/actors/party/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/party/match.ts @@ -44,7 +44,7 @@ export const partyMatch = actor({ ): Promise => { const playerId = params?.playerId; const joinToken = params?.joinToken; - const matchId = c.key[0]; + const matchId = c.actorKey[0]; if (!matchId || !playerId || !joinToken) { throw new UserError("invalid join params", { diff --git a/examples/per-tenant-database/src/index.ts b/examples/per-tenant-database/src/index.ts index 2661707104..b1b46364aa 100644 --- a/examples/per-tenant-database/src/index.ts +++ b/examples/per-tenant-database/src/index.ts @@ -36,7 +36,7 @@ export const companyDatabase = actor({ createState: (c): CompanyDatabaseState => { const now = Date.now(); return { - company_name: getCompanyName(c.key[0]), + company_name: getCompanyName(c.actorKey[0]), employees: [], projects: [], created_at: now, diff --git a/examples/sandbox-coding-agent/src/index.ts b/examples/sandbox-coding-agent/src/index.ts index 77cee223b6..9515163283 100644 --- a/examples/sandbox-coding-agent/src/index.ts +++ b/examples/sandbox-coding-agent/src/index.ts @@ -234,7 +234,7 @@ export const agent = actor({ try { const sandbox = c .client() - .codingSandbox.getOrCreate([c.key[0]]); + .codingSandbox.getOrCreate([c.actorKey[0]]); const sessionId = c.state.sessionId ?? buildId("session"); if (!c.state.sessionId) { diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts index b9f9650712..29f83bd500 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/actor-db-raw.ts @@ -54,7 +54,7 @@ async function recordLifecycleEvent( const client = c.client(); const observer = client.lifecycleObserver.getOrCreate([observerKey]); await observer.recordEvent({ - actorKey: c.key.join("/"), + actorKey: c.actorKey.join("/"), event, }); } diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/counter.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/counter.ts index 682ec1b36b..4e3c8b2594 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/counter.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/counter.ts @@ -20,7 +20,7 @@ export const counter = actor({ return c.state.count; }, getKey: (c) => { - return c.key; + return c.actorKey; }, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts index c752b17d6b..68113dcb35 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts @@ -25,7 +25,7 @@ export const destroyActor = actor({ }, onWake: (c) => { // Store the actor key so we can reference it in onDestroy - c.state.key = c.key.join("/"); + c.state.key = c.actorKey.join("/"); }, onRequest: (c, request) => { const url = new URL(request.url); @@ -95,7 +95,7 @@ export const destroyAbortSignalActor = actor({ onDestroy: async (c) => { const client = c.client(); const observer = client.destroyObserver.getOrCreate(["observer"]); - await observer.notifyDestroyed(c.key.join("/")); + await observer.notifyDestroyed(c.actorKey.join("/")); }, actions: { requestDestroy: (c) => { diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/metadata.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/metadata.ts index 7d8641d817..d5c946e378 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/metadata.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/metadata.ts @@ -13,7 +13,7 @@ export const metadataActor = actor({ }, onWake: (c) => { // Store the actor name during initialization - c.state.actorName = c.name; + c.state.actorName = c.actorName; }, actions: { // Set up test tags - this will be called by tests to simulate tags @@ -32,7 +32,7 @@ export const metadataActor = actor({ getMetadata: (c) => { // Create metadata object from stored values const metadata = { - name: c.name, + name: c.actorName, tags: c.state.storedTags, region: c.state.storedRegion, }; @@ -44,7 +44,7 @@ export const metadataActor = actor({ // Get the actor name getActorName: (c) => { - return c.name; + return c.actorName; }, // Get a specific tag by key diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/save-state.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/save-state.ts index 49a70fcb72..818b7004f3 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/save-state.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/save-state.ts @@ -30,7 +30,7 @@ export const saveStateActor = actor({ const observer = c .client() .saveStateObserver.getOrCreate(["observer"]); - await observer.recordPhase(c.key.join("/"), "immediate"); + await observer.recordPhase(c.actorKey.join("/"), "immediate"); await new Promise(() => {}); }, @@ -41,7 +41,7 @@ export const saveStateActor = actor({ const observer = c .client() .saveStateObserver.getOrCreate(["observer"]); - await observer.recordPhase(c.key.join("/"), "deferred"); + await observer.recordPhase(c.actorKey.join("/"), "deferred"); await new Promise(() => {}); }, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts index c03015a096..2ab5a5172c 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/start-stop-race.ts @@ -24,7 +24,7 @@ export const startStopRaceActor = actor({ const client = c.client(); const observer = client.lifecycleObserver.getOrCreate(["observer"]); await observer.recordEvent({ - actorKey: c.key.join("/"), + actorKey: c.actorKey.join("/"), event: "started", }); }, @@ -34,7 +34,7 @@ export const startStopRaceActor = actor({ const client = c.client(); const observer = client.lifecycleObserver.getOrCreate(["observer"]); await observer.recordEvent({ - actorKey: c.key.join("/"), + actorKey: c.actorKey.join("/"), event: "destroy", }); }, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts index 6259cfd20d..c6c7fe9abd 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts @@ -5,7 +5,7 @@ import { db } from "@/common/database/mod"; import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants"; import { type WorkflowErrorEvent, - type WorkflowLoopContextOf, + type WorkflowStepContextOf, workflow, } from "@/workflow/mod"; import type { registry } from "./registry-static"; @@ -36,17 +36,25 @@ export const workflowCounterActor = actor({ history: [] as number[], }, run: workflow(async (ctx) => { + let leakedStep: + | WorkflowStepContextOf + | undefined; await ctx.loop("counter", async (loopCtx) => { - try { - // Accessing state outside a step should throw. - // biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor. - loopCtx.state; - } catch {} - - await loopCtx.step("increment", async () => { - incrementWorkflowCounter(loopCtx); + await loopCtx.step("increment", async (step) => { + incrementWorkflowCounter(step); + // Capture the step context to verify it cannot be used after + // its step has finished. + leakedStep = step; }); + // Using a finished step context outside its step should throw. + if (leakedStep) { + try { + // biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor. + leakedStep.state; + } catch {} + } + await loopCtx.sleep("idle", 25); return Loop.continue(undefined); }); @@ -82,12 +90,8 @@ export const workflowQueueActor = actor({ return Loop.continue(undefined); } const complete = message.complete; - await loopCtx.step("store-message", async () => { - await storeWorkflowQueueMessage( - loopCtx, - message.body, - complete, - ); + await loopCtx.step("store-message", async (step) => { + await storeWorkflowQueueMessage(step, message.body, complete); }); return Loop.continue(undefined); }); @@ -130,9 +134,12 @@ export const workflowNestedLoopActor = actor({ return Loop.break(undefined); } - await subLoopCtx.step(`process-item-${itemIndex}`, async () => { - subLoopCtx.state.processed.push(item); - }); + await subLoopCtx.step( + `process-item-${itemIndex}`, + async (step) => { + step.state.processed.push(item); + }, + ); itemIndex += 1; return Loop.continue(undefined); }); @@ -177,8 +184,8 @@ export const workflowNestedJoinActor = actor({ run: async (branchCtx) => await branchCtx.step( `process-item-${index}`, - async () => { - branchCtx.state.processed.push(item); + async (step) => { + step.state.processed.push(item); return item; }, ), @@ -224,8 +231,8 @@ export const workflowNestedRaceActor = actor({ { name: "fast", run: async (raceCtx) => - await raceCtx.step("process-fast", async () => { - raceCtx.state.processed.push(item); + await raceCtx.step("process-fast", async (step) => { + step.state.processed.push(item); return item; }), }, @@ -271,8 +278,8 @@ export const workflowSpawnChildActor = actor({ work: queue<{ task: string }, { ok: true }>(), }, run: workflow(async (ctx) => { - await ctx.step("mark-started", async () => { - ctx.state.started = true; + await ctx.step("mark-started", async (step) => { + step.state.started = true; }); await ctx.loop("cmd-loop", async (loopCtx) => { @@ -283,8 +290,8 @@ export const workflowSpawnChildActor = actor({ completable: true, }, ); - await loopCtx.step("process-cmd", async () => { - loopCtx.state.processed.push(message.body.task); + await loopCtx.step("process-cmd", async (step) => { + step.state.processed.push(message.body.task); }); await message.complete?.({ ok: true }); return Loop.continue(undefined); @@ -319,9 +326,9 @@ export const workflowSpawnParentActor = actor({ }, ); - await loopCtx.step("spawn-child", async () => { + await loopCtx.step("spawn-child", async (step) => { try { - const client = loopCtx.client(); + const client = step.client(); const handle = client.workflowSpawnChildActor.getOrCreate( [message.body.key], { @@ -336,13 +343,13 @@ export const workflowSpawnParentActor = actor({ timeout: 500, }, ); - loopCtx.state.results.push({ + step.state.results.push({ key: message.body.key, result, error: null, }); } catch (error) { - loopCtx.state.results.push({ + step.state.results.push({ key: message.body.key, result: null, error: @@ -387,32 +394,37 @@ export const workflowAccessActor = actor({ insideClientAvailable: false, }, run: workflow(async (ctx) => { + let leakedStep: + | WorkflowStepContextOf + | undefined; await ctx.loop("access", async (loopCtx) => { + await loopCtx.step("access-step", async (step) => { + // Inside a step, db and client are reachable. + await updateWorkflowAccessInsideState(step); + leakedStep = step; + }); + + // db and client on a finished step context should throw the + // step-only guard. let outsideDbError: string | null = null; let outsideClientError: string | null = null; - try { - // Accessing db outside a step should throw. // biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor. - loopCtx.db; + leakedStep?.db; } catch (error) { outsideDbError = error instanceof Error ? error.message : String(error); } - try { - loopCtx.client(); + leakedStep?.client(); } catch (error) { outsideClientError = error instanceof Error ? error.message : String(error); } - await loopCtx.step("access-step", async () => { - await updateWorkflowAccessState( - loopCtx, - outsideDbError, - outsideClientError, - ); + await loopCtx.step("record-access", async (step) => { + step.state.outsideDbError = outsideDbError; + step.state.outsideClientError = outsideClientError; }); await loopCtx.sleep("idle", 25); @@ -430,8 +442,8 @@ export const workflowSleepActor = actor({ }, run: workflow(async (ctx) => { await ctx.loop("sleep", async (loopCtx) => { - await loopCtx.step("tick", async () => { - incrementWorkflowSleepTick(loopCtx); + await loopCtx.step("tick", async (step) => { + incrementWorkflowSleepTick(step); }); await loopCtx.sleep("delay", 40); return Loop.continue(undefined); @@ -463,9 +475,9 @@ export const workflowTryActor = actor({ const stepResult = await ctx.tryStep({ name: "charge-card", maxRetries: 0, - run: async () => { - ctx.state.innerWrites += 1; - ctx.vars.innerWrites += 1; + run: async (step) => { + step.state.innerWrites += 1; + step.vars.innerWrites += 1; throw new Error("card declined"); }, }); @@ -483,17 +495,17 @@ export const workflowTryActor = actor({ }); }); - await ctx.step("store-try-results", async () => { - ctx.vars.recoveryWrites += 1; + await ctx.step("store-try-results", async (step) => { + step.vars.recoveryWrites += 1; if (!stepResult.ok) { - ctx.state.tryStepFailure = { + step.state.tryStepFailure = { kind: stepResult.failure.kind, message: stepResult.failure.error.message, attempts: stepResult.failure.attempts, }; } if (!joinResult.ok) { - ctx.state.tryJoinFailure = `${joinResult.failure.source}:${joinResult.failure.name}`; + step.state.tryJoinFailure = `${joinResult.failure.source}:${joinResult.failure.name}`; } }); }), @@ -519,18 +531,18 @@ export const workflowStepRollbackActor = actor({ const stepResult = await ctx.try( "recover-failed-step", async (tryCtx) => { - await tryCtx.step("failing-step", async () => { - tryCtx.state.failedStateWrites += 1; - tryCtx.vars.failedVarsWrites += 1; + await tryCtx.step("failing-step", async (step) => { + step.state.failedStateWrites += 1; + step.vars.failedVarsWrites += 1; throw new Error("step rollback"); }); }, ); - await ctx.step("record-recovery", async () => { - ctx.state.recoveryStateWrites += 1; - ctx.vars.recoveryVarsWrites += 1; - ctx.state.failureCaught = !stepResult.ok; + await ctx.step("record-recovery", async (step) => { + step.state.recoveryStateWrites += 1; + step.vars.recoveryVarsWrites += 1; + step.state.failureCaught = !stepResult.ok; }); }), actions: { @@ -588,8 +600,8 @@ export const workflowCompleteActor = actor({ c.state.sleepCount += 1; }, run: workflow(async (ctx) => { - await ctx.step("complete", async () => { - ctx.state.runCount += 1; + await ctx.step("complete", async (step) => { + step.state.runCount += 1; }); }), actions: { @@ -604,11 +616,11 @@ export const workflowDestroyActor = actor({ onDestroy: async (c) => { const client = c.client(); const observer = client.destroyObserver.getOrCreate(["observer"]); - await observer.notifyDestroyed(c.key.join("/")); + await observer.notifyDestroyed(c.actorKey.join("/")); }, run: workflow(async (ctx) => { - await ctx.step("destroy", async () => { - ctx.destroy(); + await ctx.step("destroy", async (step) => { + step.destroy(); }); }), }); @@ -627,15 +639,15 @@ export const workflowFailedStepActor = actor({ c.state.sleepCount += 1; }, run: workflow(async (ctx) => { - await ctx.step("prepare", async () => { - ctx.state.timeline.push("prepare"); + await ctx.step("prepare", async (step) => { + step.state.timeline.push("prepare"); }); await ctx.step({ name: "fail", maxRetries: 2, - run: async () => { - ctx.state.runCount += 1; - ctx.state.timeline.push("fail"); + run: async (step) => { + step.state.runCount += 1; + step.state.timeline.push("fail"); throw new Error("workflow step failed"); }, }); @@ -660,9 +672,9 @@ export const workflowErrorHookActor = actor({ maxRetries: 2, retryBackoffBase: 1, retryBackoffMax: 1, - run: async () => { - ctx.state.attempts += 1; - if (ctx.state.attempts === 1) { + run: async (step) => { + step.state.attempts += 1; + if (step.state.attempts === 1) { throw new Error("workflow hook failed"); } }, @@ -700,9 +712,9 @@ export const workflowErrorHookSleepActor = actor({ maxRetries: 2, retryBackoffBase: 1, retryBackoffMax: 1, - run: async () => { - ctx.state.attempts += 1; - if (ctx.state.attempts === 1) { + run: async (step) => { + step.state.attempts += 1; + if (step.state.attempts === 1) { throw new Error("workflow hook failed"); } }, @@ -746,9 +758,9 @@ export const workflowErrorHookEffectsActor = actor({ maxRetries: 2, retryBackoffBase: 1, retryBackoffMax: 1, - run: async () => { - ctx.state.attempts += 1; - if (ctx.state.attempts === 1) { + run: async (step) => { + step.state.attempts += 1; + if (step.state.attempts === 1) { throw new Error("workflow hook failed"); } }, @@ -788,11 +800,11 @@ export const workflowReplayActor = actor({ timeline: [] as string[], }, run: workflow(async (ctx) => { - await ctx.step("one", async () => { - ctx.state.timeline.push("one"); + await ctx.step("one", async (step) => { + step.state.timeline.push("one"); }); - await ctx.step("two", async () => { - ctx.state.timeline.push("two"); + await ctx.step("two", async (step) => { + step.state.timeline.push("two"); }); }), actions: { @@ -809,21 +821,21 @@ export const workflowRunningStepActor = actor({ }, run: workflow(async (ctx) => { await ctx.step("prepare", async () => {}); - await ctx.step("block", async () => { + await ctx.step("block", async (step) => { const deferred = createWorkflowRunningStepDeferred(); - workflowRunningStepDeferreds.set(ctx.actorId, deferred); - if (workflowRunningStepReleased.delete(ctx.actorId)) { + workflowRunningStepDeferreds.set(step.actorId, deferred); + if (workflowRunningStepReleased.delete(step.actorId)) { deferred.resolve(); } try { await deferred.promise; } finally { - workflowRunningStepDeferreds.delete(ctx.actorId); - workflowRunningStepReleased.delete(ctx.actorId); + workflowRunningStepDeferreds.delete(step.actorId); + workflowRunningStepReleased.delete(step.actorId); } }); - await ctx.step("finish", async () => { - ctx.state.finishedAt = Date.now(); + await ctx.step("finish", async (step) => { + step.state.finishedAt = Date.now(); }); }), actions: { @@ -843,45 +855,41 @@ export const workflowRunningStepActor = actor({ }); function incrementWorkflowCounter( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, ): void { - ctx.state.runCount += 1; - ctx.state.history.push(ctx.state.runCount); + step.state.runCount += 1; + step.state.history.push(step.state.runCount); } async function storeWorkflowQueueMessage( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, body: unknown, complete: (response: { echo: unknown }) => Promise, ): Promise { - ctx.state.received.push(body); + step.state.received.push(body); await complete({ echo: body }); } -async function updateWorkflowAccessState( - ctx: WorkflowLoopContextOf, - outsideDbError: string | null, - outsideClientError: string | null, +async function updateWorkflowAccessInsideState( + step: WorkflowStepContextOf, ): Promise { - await ctx.db.execute( + await step.db.execute( `INSERT INTO workflow_access_log (created_at) VALUES (${Date.now()})`, ); - const counts = await ctx.db.execute<{ count: number }>( + const counts = await step.db.execute<{ count: number }>( `SELECT COUNT(*) as count FROM workflow_access_log`, ); - const client = ctx.client(); + const client = step.client(); - ctx.state.outsideDbError = outsideDbError; - ctx.state.outsideClientError = outsideClientError; - ctx.state.insideDbCount = counts[0]?.count ?? 0; - ctx.state.insideClientAvailable = + step.state.insideDbCount = counts[0]?.count ?? 0; + step.state.insideClientAvailable = typeof client.workflowQueueActor.getForId === "function"; } function incrementWorkflowSleepTick( - ctx: WorkflowLoopContextOf, + step: WorkflowStepContextOf, ): void { - ctx.state.ticks += 1; + step.state.ticks += 1; } export { WORKFLOW_NESTED_QUEUE_NAME, WORKFLOW_QUEUE_NAME }; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index ea94448566..4020f7c7cf 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -310,8 +310,8 @@ export interface ActorContext< readonly schedule: ActorSchedule; readonly queue: ActorQueue; readonly actorId: string; - readonly name: string; - readonly key: string[]; + readonly actorName: string; + readonly actorKey: string[]; readonly region: string; readonly conns: Map< string, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 4d7438ec00..d41591c20d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -2557,11 +2557,11 @@ export class ActorContextHandleAdapter { return callNativeSync(() => this.#runtime.actorId(this.#ctx)); } - get name(): string { + get actorName(): string { return callNativeSync(() => this.#runtime.actorName(this.#ctx)); } - get key(): string[] { + get actorKey(): string[] { return toActorKey( callNativeSync(() => this.#runtime.actorKey(this.#ctx)), ); diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts index 72809666f2..2a3eafec70 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts @@ -1,5 +1,3 @@ -// @ts-nocheck - import type { BranchConfig, BranchOutput, @@ -57,7 +55,177 @@ type WorkflowActorQueueNextBatchOptions< type WorkflowActorQueueNextBatchOptionsFallback = Omit, "signal">; -type ActorWorkflowLoopConfig< +// Step run callbacks receive a WorkflowStepContext, which is the only place +// actor data (state/db/vars/client) may be touched. +type WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = ( + step: WorkflowStepContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, +) => Promise; + +// Step rollback callbacks compensate a committed step, so they also run with a +// WorkflowStepContext to mutate actor data. +type WorkflowStepRollback< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = ( + step: WorkflowStepContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + output: T, +) => Promise; + +// Orchestration callbacks (try/loop/race/join) receive a WorkflowContext, +// because inside them you sequence further steps rather than touch actor data. +type WorkflowContextRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = ( + ctx: WorkflowContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, +) => Promise; + +export type WorkflowStepConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = Omit, "run" | "rollback"> & { + run: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; + rollback?: WorkflowStepRollback< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; +}; + +export type WorkflowTryStepConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = Omit, "run" | "rollback"> & { + run: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; + rollback?: WorkflowStepRollback< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; +}; + +export type WorkflowTryConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig, + TQueues extends QueueSchemaConfig, +> = Omit, "run"> & { + run: WorkflowContextRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; +}; + +export type WorkflowLoopConfig< S, T, TState, @@ -70,7 +238,7 @@ type ActorWorkflowLoopConfig< TQueues extends QueueSchemaConfig, > = Omit, "run"> & { run: ( - ctx: ActorWorkflowContext< + ctx: WorkflowContext< TState, TConnParams, TConnState, @@ -86,7 +254,7 @@ type ActorWorkflowLoopConfig< >; }; -type ActorWorkflowBranchConfig< +export type WorkflowBranchConfig< TOutput, TState, TConnParams, @@ -97,8 +265,54 @@ type ActorWorkflowBranchConfig< TEvents extends EventSchemaConfig, TQueues extends QueueSchemaConfig, > = { - run: ( - ctx: ActorWorkflowContext< + run: WorkflowContextRun< + TOutput, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; +}; + +// Marks a step context inactive once its step has finished. Module-private so it +// never appears on the public surface. +const DEACTIVATE_STEP = Symbol("workflow.step.deactivate"); + +/** + * The context handed to a workflow step (`step` / `tryStep` callbacks). This is + * the only scope where actor data (state, vars, db, client) and side effects + * (broadcast, queue.send) are reachable. It is valid only while its step is + * executing; using it after the step resolves throws. + */ +export class WorkflowStepContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase extends AnyDatabaseProvider, + TEvents extends EventSchemaConfig = Record, + TQueues extends QueueSchemaConfig = Record, +> { + #runCtx: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; + #active = true; + #onGuardViolation: () => void; + + constructor( + runCtx: RunContext< TState, TConnParams, TConnState, @@ -108,10 +322,133 @@ type ActorWorkflowBranchConfig< TEvents, TQueues >, - ) => Promise; -}; + onGuardViolation: () => void, + ) { + this.#runCtx = runCtx; + this.#onGuardViolation = onGuardViolation; + } -export class ActorWorkflowContext< + [DEACTIVATE_STEP](): void { + this.#active = false; + } + + #ensureActive(feature: string): void { + if (!this.#active) { + this.#onGuardViolation(); + throw new Error( + `${feature} is only available inside workflow steps`, + ); + } + } + + get actorId(): string { + return this.#runCtx.actorId; + } + + get actorName(): string { + return this.#runCtx.actorName; + } + + get actorKey(): string[] { + return this.#runCtx.actorKey; + } + + get log() { + return this.#runCtx.log; + } + + get abortSignal(): AbortSignal { + return this.#runCtx.abortSignal; + } + + get state(): TState extends never ? never : TState { + this.#ensureActive("state"); + return this.#runCtx.state as TState extends never ? never : TState; + } + + get vars(): TVars extends never ? never : TVars { + this.#ensureActive("vars"); + return this.#runCtx.vars as TVars extends never ? never : TVars; + } + + get db(): TDatabase extends never ? never : InferDatabaseClient { + this.#ensureActive("db"); + return this.#runCtx.db as TDatabase extends never + ? never + : InferDatabaseClient; + } + + client = Registry>(): Client { + this.#ensureActive("client"); + return this.#runCtx.client(); + } + + broadcast( + name: K, + ...args: InferEventArgs[K]> + ): void; + broadcast( + name: keyof TEvents extends never ? string : never, + ...args: Array + ): void; + broadcast(name: string, ...args: Array): void { + this.#ensureActive("broadcast"); + this.#runCtx.broadcast( + name as never, + ...(args as unknown[] as never[]), + ); + } + + get queue() { + const self = this; + function send( + name: K, + body: InferSchemaMap[K], + ): Promise; + function send( + name: keyof TQueues extends never ? string : never, + body: unknown, + ): Promise; + async function send(name: string, body: unknown): Promise { + self.#ensureActive("queue.send"); + await self.#runCtx.queue.send(name as never, body as never); + } + return { send }; + } + + /** + * Holds the actor awake for the duration of the provided promise. The actor + * cannot idle-sleep or finalize the sleep grace period until the promise + * settles. + */ + keepAwake(promise: Promise): Promise { + this.#ensureActive("keepAwake"); + return this.#runCtx.keepAwake(promise); + } + + /** + * Registers a promise that the sleep grace period will wait on. Use this for + * best-effort flush/cleanup work that may complete inside the grace window. + */ + waitUntil(promise: Promise): void { + this.#ensureActive("waitUntil"); + this.#runCtx.waitUntil(promise); + } + + destroy(): void { + this.#ensureActive("destroy"); + this.#runCtx.destroy(); + } +} + +/** + * The context handed to the workflow function and to orchestration callbacks + * (`try` / `loop` / `race` / `join`). It exposes the deterministic, replayable + * workflow primitives (step, sleep, queue waits, control flow). It deliberately + * does NOT expose actor data; reach `state`, `db`, `vars`, `client`, and + * `broadcast` through the step context passed to `step` / `tryStep`. + */ +export class WorkflowContext< TState, TConnParams, TConnState, @@ -120,8 +457,7 @@ export class ActorWorkflowContext< TDatabase extends AnyDatabaseProvider, TEvents extends EventSchemaConfig = Record, TQueues extends QueueSchemaConfig = Record, -> implements WorkflowContextInterface -{ +> { #inner: WorkflowContextInterface; #runCtx: RunContext< TState, @@ -133,9 +469,6 @@ export class ActorWorkflowContext< TEvents, TQueues >; - #actorAccessDepth = 0; - #allowActorAccess = false; - #guardViolation = false; constructor( inner: WorkflowContextInterface, @@ -162,6 +495,22 @@ export class ActorWorkflowContext< return this.#inner.abortSignal; } + get actorId(): string { + return this.#runCtx.actorId; + } + + get actorName(): string { + return this.#runCtx.actorName; + } + + get actorKey(): string[] { + return this.#runCtx.actorKey; + } + + get log() { + return this.#runCtx.log; + } + get queue() { const self = this; function next< @@ -181,10 +530,9 @@ export class ActorWorkflowContext< TCompletable > >; - async function next( - name: string, - opts?: WorkflowActorQueueNextOptions, - ): Promise> { + // The implementation signature stays broad so the schema-typed public + // overloads above remain compatible with it. + async function next(name: string, opts?: any): Promise { const message = await self.#inner.queue.next(name, opts); return self.#toActorQueueMessage(message); } @@ -210,106 +558,232 @@ export class ActorWorkflowContext< > > >; - async function nextBatch( - name: string, - opts?: WorkflowActorQueueNextBatchOptions, - ): Promise>> { + async function nextBatch(name: string, opts?: any): Promise { const messages = await self.#inner.queue.nextBatch(name, opts); return messages.map((message) => self.#toActorQueueMessage(message), ); } - function send( - name: K, - body: InferSchemaMap[K], - ): Promise; - function send( - name: keyof TQueues extends never ? string : never, - body: unknown, - ): Promise; - async function send(name: string, body: unknown): Promise { - self.#ensureActorAccess("queue.send"); - await self.#runCtx.queue.send(name as never, body as never); - } - return { next, nextBatch, - send, }; } + step( + name: string, + run: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise; + step( + config: WorkflowStepConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise; async step( - nameOrConfig: string | StepConfig, - run?: () => Promise, + nameOrConfig: + | string + | WorkflowStepConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + run?: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, ): Promise { if (typeof nameOrConfig === "string") { if (!run) { throw new Error("Step run function missing"); } + const stepRun = run; return await this.#wrapActive(() => - this.#inner.step(nameOrConfig, () => - this.#withActorAccessAndStateRollback(run), - ), + this.#inner.step(nameOrConfig, () => this.#runStep(stepRun)), ); } - const stepConfig = nameOrConfig as StepConfig; + const stepConfig = nameOrConfig; + const rollback = stepConfig.rollback; const config: StepConfig = { ...stepConfig, - run: () => this.#withActorAccessAndStateRollback(stepConfig.run), + run: () => this.#runStep(stepConfig.run), + rollback: rollback + ? (_ctx, output) => this.#runRollback(rollback, output) + : undefined, }; return await this.#wrapActive(() => this.#inner.step(config)); } + tryStep( + name: string, + run: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise>; + tryStep( + config: WorkflowTryStepConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise>; async tryStep( - nameOrConfig: string | TryStepConfig, - run?: () => Promise, + nameOrConfig: + | string + | WorkflowTryStepConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + run?: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, ): Promise> { if (typeof nameOrConfig === "string") { if (!run) { throw new Error("Step run function missing"); } + const stepRun = run; return await this.#wrapActive(() => - this.#inner.tryStep(nameOrConfig, () => - this.#withActorAccessAndStateRollback(run), - ), + this.#inner.tryStep(nameOrConfig, () => this.#runStep(stepRun)), ); } - const stepConfig = nameOrConfig as TryStepConfig; + const stepConfig = nameOrConfig; + const rollback = stepConfig.rollback; const config: TryStepConfig = { ...stepConfig, - run: () => this.#withActorAccessAndStateRollback(stepConfig.run), + run: () => this.#runStep(stepConfig.run), + rollback: rollback + ? (_ctx, output) => this.#runRollback(rollback, output) + : undefined, }; return await this.#wrapActive(() => this.#inner.tryStep(config)); } + try( + name: string, + run: WorkflowContextRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise>; + try( + config: WorkflowTryConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ): Promise>; async try( - nameOrConfig: string | Parameters[0], - run?: ( - ctx: ActorWorkflowContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase, - TEvents, - TQueues - >, - ) => Promise, + nameOrConfig: + | string + | WorkflowTryConfig< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + run?: WorkflowContextRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, ): Promise> { if (typeof nameOrConfig === "string") { if (!run) { throw new Error("Try run function missing"); } + const tryRun = run; return await this.#wrapActive(() => this.#inner.try(nameOrConfig, async (ctx) => - run(this.#createChildContext(ctx)), + tryRun(this.#createChildContext(ctx)), ), ); } - const tryConfig = nameOrConfig as TryBlockConfig; + const tryConfig = nameOrConfig; const config: TryBlockConfig = { ...tryConfig, run: async (ctx) => tryConfig.run(this.#createChildContext(ctx)), @@ -317,10 +791,10 @@ export class ActorWorkflowContext< return await this.#wrapActive(() => this.#inner.try(config)); } - async loop( + loop( name: string, run: ( - ctx: ActorWorkflowContext< + ctx: WorkflowContext< TState, TConnParams, TConnState, @@ -332,14 +806,8 @@ export class ActorWorkflowContext< >, ) => Promise | undefined | void>, ): Promise; - async loop( - name: string, - run: ( - ctx: WorkflowContextInterface, - ) => Promise | undefined | void>, - ): Promise; - async loop( - config: ActorWorkflowLoopConfig< + loop( + config: WorkflowLoopConfig< S, T, TState, @@ -352,12 +820,10 @@ export class ActorWorkflowContext< TQueues >, ): Promise; - async loop(config: LoopConfig): Promise; async loop( nameOrConfig: | string - | LoopConfig - | ActorWorkflowLoopConfig< + | WorkflowLoopConfig< any, any, TState, @@ -370,7 +836,7 @@ export class ActorWorkflowContext< TQueues >, run?: ( - ctx: ActorWorkflowContext< + ctx: WorkflowContext< TState, TConnParams, TConnState, @@ -386,16 +852,26 @@ export class ActorWorkflowContext< if (!run) { throw new Error("Loop run function missing"); } + const loopRun = run; return await this.#wrapActive(() => - this.#inner.loop(nameOrConfig, async (ctx) => - run(this.#createChildContext(ctx)), + this.#inner.loop( + nameOrConfig, + // A void return (no explicit Loop result) is undefined at + // runtime, which the engine treats as continue. + async ( + ctx, + ): Promise | undefined> => + (await loopRun(this.#createChildContext(ctx))) ?? + undefined, ), ); } + const loopConfig = nameOrConfig; const wrapped: LoopConfig = { - ...nameOrConfig, - run: async (ctx, state) => - nameOrConfig.run(this.#createChildContext(ctx), state), + ...loopConfig, + run: (async (ctx, state) => + (await loopConfig.run(this.#createChildContext(ctx), state)) ?? + undefined) as LoopConfig["run"], }; return await this.#wrapActive(() => this.#inner.loop(wrapped)); } @@ -408,11 +884,6 @@ export class ActorWorkflowContext< return this.#inner.sleepUntil(name, timestampMs); } - destroy(): void { - this.#ensureActorAccess("destroy"); - this.#runCtx.destroy(); - } - async rollbackCheckpoint(name: string): Promise { await this.#wrapActive(() => this.#inner.rollbackCheckpoint(name)); } @@ -420,7 +891,7 @@ export class ActorWorkflowContext< async join< T extends Record< string, - ActorWorkflowBranchConfig< + WorkflowBranchConfig< unknown, TState, TConnParams, @@ -440,7 +911,10 @@ export class ActorWorkflowContext< name: string, branches: T, ): Promise<{ [K in keyof T]: BranchOutput }>; - async join(name: string, branches: Record>) { + async join( + name: string, + branches: Record Promise }>, + ) { const wrappedBranches = Object.fromEntries( Object.entries(branches).map(([key, branch]) => [ key, @@ -459,25 +933,17 @@ export class ActorWorkflowContext< name: string, branches: Array<{ name: string; - run: ( - ctx: ActorWorkflowContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase, - TEvents, - TQueues - >, - ) => Promise; - }>, - ): Promise<{ winner: string; value: T }>; - async race( - name: string, - branches: Array<{ - name: string; - run: (ctx: WorkflowContextInterface) => Promise; + run: WorkflowContextRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; }>, ): Promise<{ winner: string; value: T }> { const wrappedBranches = branches.map((branch) => ({ @@ -498,124 +964,33 @@ export class ActorWorkflowContext< return this.#inner.isEvicted(); } - get state(): TState extends never ? never : TState { - this.#ensureActorAccess("state"); - return this.#runCtx.state as TState extends never ? never : TState; - } - - get vars(): TVars extends never ? never : TVars { - this.#ensureActorAccess("vars"); - return this.#runCtx.vars as TVars extends never ? never : TVars; - } - - client = Registry>(): Client { - this.#ensureActorAccess("client"); - return this.#runCtx.client(); - } - - get db(): TDatabase extends never ? never : InferDatabaseClient { - this.#ensureActorAccess("db"); - return this.#runCtx.db as TDatabase extends never - ? never - : InferDatabaseClient; - } - - get log() { - return this.#runCtx.log; - } - - /** @deprecated No-op. Use `keepAwake(promise)` or `waitUntil(promise)` instead. */ - setPreventSleep(_prevent: boolean): void { - this.#ensureActorAccess("setPreventSleep"); - } - - /** @deprecated No-op. Always returns `false`. */ - get preventSleep(): boolean { - this.#ensureActorAccess("preventSleep"); - return false; - } - - /** - * Holds the actor awake for the duration of the provided promise. The - * actor cannot idle-sleep or finalize the sleep grace period until the - * promise settles. - */ - keepAwake(promise: Promise): Promise { - this.#ensureActorAccess("keepAwake"); - return this.#runCtx.keepAwake(promise); - } - - /** - * Registers a promise that the sleep grace period will wait on. Use this - * for best-effort flush/cleanup work that may complete inside the grace - * window. For work the actor must stay running through, prefer - * `c.keepAwake(promise)` which also blocks idle sleep. - */ - waitUntil(promise: Promise): void { - this.#ensureActorAccess("waitUntil"); - this.#runCtx.waitUntil(promise); - } - - get actorId(): string { - return this.#runCtx.actorId; - } - - broadcast( - name: K, - ...args: InferEventArgs[K]> - ): void; - broadcast( - name: keyof TEvents extends never ? string : never, - ...args: Array - ): void; - broadcast(name: string, ...args: Array): void { - this.#ensureActorAccess("broadcast"); - this.#runCtx.broadcast( - name as never, - ...(args as unknown[] as never[]), - ); - } - - #toActorQueueMessage( - message: WorkflowQueueMessage, - ): WorkflowQueueMessage & { id: bigint } { - let id: bigint; - try { - id = BigInt(message.id); - } catch { - throw new Error(`Invalid queue message id "${message.id}"`); - } - return { - id, - name: message.name, - body: message.body, - createdAt: message.createdAt, - ...(message.complete ? { complete: message.complete } : {}), - }; - } - - async #wrapActive(run: () => Promise): Promise { - return await this.#runCtx.internalKeepAwake(run); - } - - async #withActorAccess(run: () => Promise): Promise { - this.#actorAccessDepth++; - if (this.#actorAccessDepth === 1) { - this.#allowActorAccess = true; - } - try { - return await run(); - } finally { - this.#actorAccessDepth--; - if (this.#actorAccessDepth === 0) { - this.#allowActorAccess = false; - } - } - } - - async #withActorAccessAndStateRollback( - run: () => Promise, + // Runs a user step body inside a fresh step context, snapshotting actor + // state/vars so a thrown step rolls back its mutations, and deactivating the + // step context once the body settles so it cannot be used after the step. + async #runStep( + run: WorkflowStepRun< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, ): Promise { + const stepCtx = new WorkflowStepContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >(this.#runCtx, () => this.#markGuardTriggered()); + let stateSnapshot: { state: TState } | null = null; try { stateSnapshot = { state: this.#runCtx[RAW_STATE_SYMBOL]() }; @@ -629,33 +1004,77 @@ export class ActorWorkflowContext< stateSnapshot.state = structuredClone(stateSnapshot.state); } const varsSnapshot = structuredClone(this.#runCtx.vars); + try { - return await this.#withActorAccess(run); + return await run(stepCtx); } catch (error) { if (stateSnapshot) { this.#runCtx.state = stateSnapshot.state; } this.#runCtx.vars = varsSnapshot; throw error; + } finally { + stepCtx[DEACTIVATE_STEP](); } } - #ensureActorAccess(feature: string): void { - if (!this.#allowActorAccess) { - this.#guardViolation = true; - this.#markGuardTriggered(); - throw new Error( - `${feature} is only available inside workflow steps`, - ); + #toActorQueueMessage( + message: WorkflowQueueMessage, + ): WorkflowQueueMessage & { id: bigint } { + let id: bigint; + try { + id = BigInt(message.id); + } catch { + throw new Error(`Invalid queue message id "${message.id}"`); + } + return { + id, + name: message.name, + body: message.body, + createdAt: message.createdAt, + ...(message.complete ? { complete: message.complete } : {}), + }; + } + + // Runs a step rollback compensation with an active step context. Rollbacks + // intentionally mutate actor state, so their writes are not snapshotted. + async #runRollback( + rollback: WorkflowStepRollback< + T, + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + output: T, + ): Promise { + const stepCtx = new WorkflowStepContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >(this.#runCtx, () => this.#markGuardTriggered()); + try { + await rollback(stepCtx, output); + } finally { + stepCtx[DEACTIVATE_STEP](); } } - consumeGuardViolation(): boolean { - const violated = this.#guardViolation; - this.#guardViolation = false; - return violated; + async #wrapActive(run: () => Promise): Promise { + return await this.#runCtx.internalKeepAwake(run); } + // Records that a step context was used outside its step. Mirrors the value + // onto actor state and a KV flag so callers can observe the violation. #markGuardTriggered(): void { try { const state = this.#runCtx.state as Record; @@ -686,7 +1105,7 @@ export class ActorWorkflowContext< #createChildContext( ctx: WorkflowContextInterface, - ): ActorWorkflowContext< + ): WorkflowContext< TState, TConnParams, TConnState, @@ -696,7 +1115,7 @@ export class ActorWorkflowContext< TEvents, TQueues > { - return new ActorWorkflowContext(ctx, this.#runCtx); + return new WorkflowContext(ctx, this.#runCtx); } } @@ -712,7 +1131,7 @@ export type WorkflowContextOf = infer Q extends QueueSchemaConfig, any > - ? ActorWorkflowContext + ? WorkflowContext : never; export type WorkflowLoopContextOf = @@ -722,4 +1141,16 @@ export type WorkflowBranchContextOf = WorkflowContextOf; export type WorkflowStepContextOf = - WorkflowContextOf; + AD extends BaseActorDefinition< + infer S, + infer CP, + infer CS, + infer V, + infer I, + infer DB extends AnyDatabaseProvider, + infer E extends EventSchemaConfig, + infer Q extends QueueSchemaConfig, + any + > + ? WorkflowStepContext + : never; diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts index ab456ae8dc..9d3619340b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/driver.ts @@ -1,5 +1,3 @@ -// @ts-nocheck - import type { EngineDriver, KVEntry, @@ -13,6 +11,21 @@ import { makeWorkflowKey, workflowStoragePrefix } from "@/actor/keys"; const WORKFLOW_STORAGE_PREFIX = workflowStoragePrefix(); +// Mirrors the element shape returned by `queueManager.receive`. The actor +// instance is reached through a loose type here, so the call's result is +// untyped and the message must be annotated explicitly. +interface ReceivedQueueMessage { + id: bigint; + name: string; + body: unknown; + createdAt: number; + complete?: (response?: unknown) => Promise; +} + +// `kvListPrefix` returns key/value tuples, but the loose actor type erases that +// so the tuple destructures need an explicit annotation. +type KVEntryTuple = [Uint8Array, Uint8Array]; + function stripWorkflowKey(prefixed: Uint8Array): Uint8Array { return prefixed.slice(WORKFLOW_STORAGE_PREFIX.length); } @@ -62,7 +75,7 @@ class ActorWorkflowMessageDriver implements WorkflowMessageDriver { opts.completable, ), ); - return messages.map((message) => ({ + return messages.map((message: ReceivedQueueMessage) => ({ id: message.id.toString(), name: message.name, data: message.body, @@ -156,7 +169,7 @@ export class ActorWorkflowDriver implements EngineDriver { await this.#runCtx.internalKeepAwake( this.#actor.driver.kvBatchDelete( this.#actor.id, - entries.map(([key]) => key), + entries.map(([key]: KVEntryTuple) => key), ), ); } @@ -179,7 +192,7 @@ export class ActorWorkflowDriver implements EngineDriver { makeWorkflowKey(prefix), ), ); - return entries.map(([key, value]) => ({ + return entries.map(([key, value]: KVEntryTuple) => ({ key: stripWorkflowKey(key), value, })); @@ -295,7 +308,7 @@ export class ActorWorkflowControlDriver implements EngineDriver { } await this.#actor.driver.kvBatchDelete( this.#actor.id, - entries.map(([key]) => key), + entries.map(([key]: KVEntryTuple) => key), ); } @@ -312,7 +325,7 @@ export class ActorWorkflowControlDriver implements EngineDriver { this.#actor.id, makeWorkflowKey(prefix), ); - return entries.map(([key, value]) => ({ + return entries.map(([key, value]: KVEntryTuple) => ({ key: stripWorkflowKey(key), value, })); diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts index ade9fc8ded..cf4e04ac82 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/mod.ts @@ -1,5 +1,3 @@ -// @ts-nocheck - import { CriticalError, EntryInProgressError, @@ -9,6 +7,7 @@ import { RollbackCheckpointError, RollbackError, replayWorkflowFromStep, + type RunWorkflowOptions, runWorkflow, StepExhaustedError, type WorkflowErrorEvent, @@ -24,7 +23,7 @@ import { isActorAbortedError, RivetError } from "@/actor/errors"; import type { EventSchemaConfig, QueueSchemaConfig } from "@/actor/schema"; import type { AnyDatabaseProvider } from "@/common/database/config"; import { stringifyError } from "@/utils"; -import { ActorWorkflowContext } from "./context"; +import { WorkflowContext } from "./context"; import { ActorWorkflowControlDriver, ActorWorkflowDriver } from "./driver"; import { createWorkflowInspectorAdapter } from "./inspector"; @@ -42,11 +41,17 @@ export type { } from "@rivetkit/workflow-engine"; export { Loop } from "@rivetkit/workflow-engine"; export { - ActorWorkflowContext, + type WorkflowBranchConfig, type WorkflowBranchContextOf, + WorkflowContext, type WorkflowContextOf, + type WorkflowLoopConfig, type WorkflowLoopContextOf, + type WorkflowStepConfig, + WorkflowStepContext, type WorkflowStepContextOf, + type WorkflowTryConfig, + type WorkflowTryStepConfig, } from "./context"; function shouldRethrowWorkflowError(error: unknown): boolean { @@ -127,7 +132,7 @@ export function workflow< TQueues extends QueueSchemaConfig = Record, >( fn: ( - ctx: ActorWorkflowContext< + ctx: WorkflowContext< TState, TConnParams, TConnState, @@ -228,12 +233,14 @@ export function workflow< const handle = runWorkflow( actor.id, - async (ctx) => await fn(new ActorWorkflowContext(ctx, runCtx)), + async (ctx) => await fn(new WorkflowContext(ctx, runCtx)), undefined, driver, { mode: "live", - logger: runCtx.log, + // The actor logger and the engine's pino logger are runtime + // compatible but not structurally assignable. + logger: runCtx.log as RunWorkflowOptions["logger"], onHistoryUpdated: workflowInspector.update, onError: onError ? async (event) => await onError(runCtx, event) diff --git a/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts b/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts index 4e7dd5df35..8fe9ed0c8f 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts @@ -146,13 +146,13 @@ const sqliteCounter = actor({ logRuntimeEvent("gateway_health_request", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, }); return new Response( JSON.stringify({ ok: true, actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, }), { headers: { "Content-Type": "application/json" }, @@ -162,7 +162,7 @@ const sqliteCounter = actor({ onWebSocket: (ctx, websocket) => { logRuntimeEvent("gateway_websocket_open", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, path: ctx.request ? new URL(ctx.request.url).pathname : "unknown", }); websocket.addEventListener("message", (event: { data: unknown }) => { @@ -181,7 +181,7 @@ const sqliteCounter = actor({ type: "pong", sentAt: message.sentAt, actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, }), ); return; @@ -193,7 +193,7 @@ const sqliteCounter = actor({ websocket.addEventListener("close", () => { logRuntimeEvent("gateway_websocket_close", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, }); }); }, @@ -207,7 +207,7 @@ const sqliteCounter = actor({ vars.heartbeatSeq = 0; logRuntimeEvent("heartbeat_on_wake", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, mode: heartbeatMode, }); if (heartbeatMode === "none") { @@ -224,7 +224,7 @@ const sqliteCounter = actor({ vars.heartbeatSeq = seq; logRuntimeEvent("heartbeat_tick", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, seq, mode: heartbeatMode, }); @@ -234,7 +234,7 @@ const sqliteCounter = actor({ const count = await runHeartbeatSql(database); logRuntimeEvent("heartbeat_sql_ok", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, seq, count, }); @@ -242,7 +242,7 @@ const sqliteCounter = actor({ const count = await runHeartbeatKv(ctx.kv); logRuntimeEvent("heartbeat_kv_ok", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, seq, count, }); @@ -254,7 +254,7 @@ const sqliteCounter = actor({ : "heartbeat_kv_err", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, seq, error: stringifyError(error), }, @@ -277,7 +277,7 @@ const sqliteCounter = actor({ } logRuntimeEvent("heartbeat_abort", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, mode: heartbeatMode, }); }, @@ -294,7 +294,7 @@ const sqliteCounter = actor({ } logRuntimeEvent("heartbeat_on_sleep", { actorId: ctx.actorId, - key: ctx.key, + key: ctx.actorKey, mode: heartbeatMode, }); }, diff --git a/website/src/content/docs/actors/authentication.mdx b/website/src/content/docs/actors/authentication.mdx index 4c1b41a153..1eae467aa5 100644 --- a/website/src/content/docs/actors/authentication.mdx +++ b/website/src/content/docs/actors/authentication.mdx @@ -62,7 +62,7 @@ const chatRoom = actor({ state: { messages: [] as Message[] }, onBeforeConnect: async (c, params: ConnParams) => { - const roomName = c.key; + const roomName = c.actorKey; const isValid = await validateToken(params.authToken, roomName); if (!isValid) { throw new UserError("Forbidden", { code: "forbidden" }); @@ -112,7 +112,7 @@ const chatRoom = actor({ state: { messages: [] as Message[] }, createConnState: async (c, params: ConnParams): Promise => { - const roomName = c.key; + const roomName = c.actorKey; const payload = await validateToken(params.authToken, roomName); if (!payload) { throw new UserError("Forbidden", { code: "forbidden" }); @@ -149,7 +149,7 @@ Authentication hooks have access to several properties: | `c.request` | The underlying HTTP request object | | `c.request.headers` | Request headers for tokens, API keys (does not work for `.connect()`) | | `c.state` | Actor state for authorization decisions (see [state](/docs/actors/state)) | -| `c.key` | The actor's key (see [keys](/docs/actors/keys)) | +| `c.actorKey` | The actor's key (see [keys](/docs/actors/keys)) | It's recommended to use `params` instead of `c.request.headers` whenever possible since it works for both HTTP & WebSocket connections. @@ -384,7 +384,7 @@ const apiActor = actor({ ### Using `c.state` In Authorization -Access actor state via `c.state` and the actor's key via `c.key` to make authorization decisions: +Access actor state via `c.state` and the actor's key via `c.actorKey` to make authorization decisions: ```typescript import { actor, UserError } from "rivetkit"; diff --git a/website/src/content/docs/actors/design-patterns.mdx b/website/src/content/docs/actors/design-patterns.mdx index 9126394ac6..ad9f2efd71 100644 --- a/website/src/content/docs/actors/design-patterns.mdx +++ b/website/src/content/docs/actors/design-patterns.mdx @@ -412,7 +412,7 @@ const userSession = actor({ // external data stays fresh. createVars: async (c): Promise<{ user: User }> => { // Load from database on every wake - const user = await db.users.findById(c.key.join("-")); + const user = await db.users.findById(c.actorKey.join("-")); return { user }; }, @@ -423,9 +423,9 @@ const userSession = actor({ }, updateEmail: async (c, email: string) => { c.state.requestCount++; - await db.users.update(c.key.join("-"), { email }); + await db.users.update(c.actorKey.join("-"), { email }); // Refresh cached data - c.vars.user = await db.users.findById(c.key.join("-")); + c.vars.user = await db.users.findById(c.actorKey.join("-")); }, }, }); @@ -508,7 +508,7 @@ const userActor = actor({ onCreate: async (c, input: { email: string }) => { // Insert into database on actor creation await db.users.insert({ - id: c.key.join("-"), + id: c.actorKey.join("-"), email: input.email, createdAt: Date.now(), }); @@ -516,7 +516,7 @@ const userActor = actor({ onStateChange: async (c, newState) => { // Sync any state changes to database - await db.users.update(c.key.join("-"), { + await db.users.update(c.actorKey.join("-"), { email: newState.email, lastActive: newState.lastActive, }); diff --git a/website/src/content/docs/actors/index.mdx b/website/src/content/docs/actors/index.mdx index 60dcff17cc..4efb26c948 100644 --- a/website/src/content/docs/actors/index.mdx +++ b/website/src/content/docs/actors/index.mdx @@ -139,7 +139,7 @@ import { createClient } from "rivetkit/client"; const chatRoom = actor({ state: { messages: [] as string[] }, actions: { - getRoomInfo: (c) => ({ org: c.key[0], room: c.key[1] }), + getRoomInfo: (c) => ({ org: c.actorKey[0], room: c.actorKey[1] }), }, }); @@ -149,7 +149,7 @@ const client = createClient("http://localhost:6420"); // Compound key: [org, room] client.chatRoom.getOrCreate(["org-acme", "general"]); -// Access key inside actor via c.key +// Access key inside actor via c.actorKey ``` Don't build keys with string interpolation like `"org:${userId}"` when `userId` contains user data. Use arrays instead to prevent key injection attacks. @@ -363,9 +363,9 @@ const worker = actor({ await ctx.loop("task-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-task"); - await loopCtx.step("process-task", async () => { + await loopCtx.step("process-task", async (step) => { await processTask(message.body.url); - loopCtx.state.processed += 1; + step.state.processed += 1; }); }); }), @@ -504,7 +504,7 @@ const chatRoom = actor({ createVars: () => ({ startTime: Date.now() }), // Actor lifecycle - onCreate: (c) => console.log("created", c.key), + onCreate: (c) => console.log("created", c.actorKey), onDestroy: (c) => console.log("destroyed"), onWake: (c) => console.log("actor started"), onSleep: (c) => console.log("actor sleeping"), @@ -720,7 +720,7 @@ actions: { addRoom: async (c, name: string) => { // Create the chat room actor const client = c.client(); -await client.chatRoom.create([c.key[0], name]); +await client.chatRoom.create([c.actorKey[0], name]); c.state.rooms.push(name); }, listRooms: (c) => c.state.rooms, @@ -806,12 +806,12 @@ const worker = actor({ control: queue(), }, run: workflow(async (ctx) => { - await ctx.step("setup", async () => { + await ctx.step("setup", async (step) => { await fetch("https://api.example.com/workers/init", { method: "POST", }); - ctx.state.phase = "running"; - ctx.state.stopReason = null; + step.state.phase = "running"; + step.state.stopReason = null; }); const stopReason = await ctx.loop("worker-loop", async (loopCtx) => { @@ -820,13 +820,13 @@ const worker = actor({ }); if (message.name === "work") { - await loopCtx.step("apply-work", async () => { + await loopCtx.step("apply-work", async (step) => { await fetch("https://api.example.com/workers/process", { method: "POST", body: JSON.stringify({ amount: message.body.amount }), }); - loopCtx.state.processed += 1; - loopCtx.state.total += message.body.amount; + step.state.processed += 1; + step.state.total += message.body.amount; }); return; } @@ -834,12 +834,12 @@ const worker = actor({ return Loop.break((message.body as ControlMessage).reason); }); - await ctx.step("teardown", async () => { + await ctx.step("teardown", async (step) => { await fetch("https://api.example.com/workers/shutdown", { method: "POST", }); - ctx.state.phase = "stopped"; - ctx.state.stopReason = stopReason; + step.state.phase = "stopped"; + step.state.stopReason = stopReason; }); }), }); diff --git a/website/src/content/docs/actors/keys.mdx b/website/src/content/docs/actors/keys.mdx index 481e226f5e..562c8704a5 100644 --- a/website/src/content/docs/actors/keys.mdx +++ b/website/src/content/docs/actors/keys.mdx @@ -124,7 +124,7 @@ const chatRoom = actor({ actions: { getRoomName: (c) => { // Access the key from metadata - const key = c.key; + const key = c.actorKey; return key[1]; // Get "general" from ["room", "general"] } } @@ -141,7 +141,7 @@ import { createClient } from "rivetkit/client"; const chatRoom = actor({ state: { messages: [] as string[] }, - actions: { getRoomName: (c) => c.key[1] } + actions: { getRoomName: (c) => c.actorKey[1] } }); const registry = setup({ use: { chatRoom } }); @@ -184,7 +184,7 @@ interface UserSessionState { const userSession = actor({ state: { userId: "", loginTime: 0, preferences: {} } as UserSessionState, createState: (c): UserSessionState => ({ - userId: c.key[0], // Extract user ID from key + userId: c.actorKey[0], // Extract user ID from key loginTime: Date.now(), preferences: {} }), diff --git a/website/src/content/docs/actors/metadata.mdx b/website/src/content/docs/actors/metadata.mdx index 6a86c80f19..db7b0107c4 100644 --- a/website/src/content/docs/actors/metadata.mdx +++ b/website/src/content/docs/actors/metadata.mdx @@ -33,7 +33,7 @@ const example = actor({ state: {}, actions: { getName: (c) => { - const actorName = c.name; + const actorName = c.actorName; return actorName; }, }, @@ -53,7 +53,7 @@ const example = actor({ state: {}, actions: { getKey: (c) => { - const actorKey = c.key; + const actorKey = c.actorKey; return actorKey; }, }, @@ -101,8 +101,8 @@ const chatRoom = actor({ getMetadata: (c) => { return { actorId: c.actorId, - name: c.name, - key: c.key, + name: c.actorName, + key: c.actorKey, region: c.region, }; }, diff --git a/website/src/content/docs/actors/state.mdx b/website/src/content/docs/actors/state.mdx index 72a22beb00..b32e75b67a 100644 --- a/website/src/content/docs/actors/state.mdx +++ b/website/src/content/docs/actors/state.mdx @@ -412,13 +412,13 @@ const userActor = actor({ // Open a connection to the external database and load initial data on every start createVars: async (c) => { const pool = new Pool({ connectionString: process.env.DATABASE_URL }); - const result = await pool.query("SELECT * FROM users WHERE id = $1", [c.key[0]]); + const result = await pool.query("SELECT * FROM users WHERE id = $1", [c.actorKey[0]]); return { pool, profile: result.rows[0] }; }, actions: { updateEmail: async (c, email: string) => { - await c.vars.pool.query("UPDATE users SET email = $1 WHERE id = $2", [email, c.key[0]]); + await c.vars.pool.query("UPDATE users SET email = $1 WHERE id = $2", [email, c.actorKey[0]]); } } }); diff --git a/website/src/content/docs/actors/types.mdx b/website/src/content/docs/actors/types.mdx index 2b6e5a5d93..6904c593ba 100644 --- a/website/src/content/docs/actors/types.mdx +++ b/website/src/content/docs/actors/types.mdx @@ -98,10 +98,8 @@ Each lifecycle hook and handler has a corresponding `*ContextOf` type, exported | `onBeforeActionResponse` | `BeforeActionResponseContextOf` | | `actions.*` | `ActionContextOf` | | `run` | `RunContextOf` | -| `workflow` root context helpers | `WorkflowContextOf` | -| `workflow` loop helpers | `WorkflowLoopContextOf` | -| `workflow` branch helpers | `WorkflowBranchContextOf` | -| `workflow` standalone step helpers | `WorkflowStepContextOf` | +| `workflow` orchestration helpers (root, `loop`, `try`, `race`, `join` branches) | `WorkflowContextOf` | +| `workflow` `step` / `tryStep` run + rollback helpers | `WorkflowStepContextOf` | | `onRequest` | `RequestContextOf` | | `onWebSocket` | `WebSocketContextOf` | diff --git a/website/src/content/docs/actors/workflows.mdx b/website/src/content/docs/actors/workflows.mdx index 164b81e8a6..8e8043d4b3 100644 --- a/website/src/content/docs/actors/workflows.mdx +++ b/website/src/content/docs/actors/workflows.mdx @@ -23,7 +23,7 @@ Use this when you need a short multi-step sequence. ```ts index.ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const invoiceActor = actor({ state: { @@ -42,8 +42,8 @@ export const invoiceActor = actor({ calculateTax(subtotal), ); - await ctx.step("save-invoice", async () => - saveInvoice(ctx, subtotal, tax), + await ctx.step("save-invoice", async (step) => + saveInvoice(step, subtotal, tax), ); }), actions: { @@ -76,7 +76,7 @@ async function calculateTax(subtotal: number): Promise { } async function saveInvoice( - ctx: WorkflowContextOf, + ctx: WorkflowStepContextOf, subtotal: number, tax: number, ): Promise { @@ -125,7 +125,7 @@ This is the recommended workflow shape for most actor workloads. ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const workflowCounter = actor({ state: { @@ -140,8 +140,8 @@ export const workflowCounter = actor({ await ctx.loop("counter-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-counter-command"); - await loopCtx.step("apply-counter-command", async () => - applyCounterCommand(loopCtx, message.body.delta), + await loopCtx.step("apply-counter-command", async (step) => + applyCounterCommand(step, message.body.delta), ); }); @@ -152,7 +152,7 @@ export const workflowCounter = actor({ }); async function applyCounterCommand( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, delta: number, ): Promise { const response = await fetch("https://api.example.com/counter/apply", { @@ -199,7 +199,7 @@ Use this when the workflow should initialize resources, process queued commands, ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { Loop, type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { Loop, type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type WorkMessage = { amount: number }; type ControlMessage = { type: "stop"; reason: string }; @@ -217,7 +217,7 @@ export const setupRunTeardownActor = actor({ control: queue(), }, run: workflow(async (ctx) => { - await ctx.step("setup", async () => setupWorkerSession(ctx)); + await ctx.step("setup", async (step) => setupWorkerSession(step)); const stopReason = await ctx.loop("worker-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-command", { @@ -226,8 +226,8 @@ export const setupRunTeardownActor = actor({ if (message.name === "work") { const work = message.body as WorkMessage; - await loopCtx.step("apply-work", async () => - applyWorkerMessage(loopCtx, work), + await loopCtx.step("apply-work", async (step) => + applyWorkerMessage(step, work), ); return; } @@ -239,8 +239,8 @@ export const setupRunTeardownActor = actor({ }); - await ctx.step("teardown", async () => - teardownWorkerSession(ctx, stopReason), + await ctx.step("teardown", async (step) => + teardownWorkerSession(step, stopReason), ); }), actions: { @@ -249,7 +249,7 @@ export const setupRunTeardownActor = actor({ }); async function setupWorkerSession( - ctx: WorkflowContextOf, + ctx: WorkflowStepContextOf, ): Promise { const response = await fetch("https://api.example.com/workers/session", { method: "POST", @@ -264,7 +264,7 @@ async function setupWorkerSession( } async function applyWorkerMessage( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, work: WorkMessage, ): Promise { const response = await fetch("https://api.example.com/workers/process", { @@ -286,7 +286,7 @@ async function applyWorkerMessage( } async function teardownWorkerSession( - ctx: WorkflowContextOf, + ctx: WorkflowStepContextOf, stopReason: string, ): Promise { if (ctx.state.workerSessionId) { @@ -336,7 +336,7 @@ Use this when the caller needs a response from queued processing. ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const requestResponseActor = actor({ state: { @@ -353,8 +353,8 @@ export const requestResponseActor = actor({ if (!message.complete) return; - const doubled = await loopCtx.step("handle-request", async () => { - loopCtx.state.handled += 1; + const doubled = await loopCtx.step("handle-request", async (step) => { + step.state.handled += 1; return message.body.value * 2; }); @@ -393,7 +393,7 @@ Use queue messages as the trigger source, then sleep durably inside the workflow ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type Reminder = { text: string; @@ -414,8 +414,8 @@ export const reminderActor = actor({ const runAt = Math.max(Date.now(), message.body.at); await loopCtx.sleepUntil("wait-until-reminder", runAt); - await loopCtx.step("record-reminder", async () => { - loopCtx.state.fired.push(message.body.text); + await loopCtx.step("record-reminder", async (step) => { + step.state.fired.push(message.body.text); }); }); @@ -452,7 +452,7 @@ Use `join` when several independent tasks can run in parallel. ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const dashboardActor = actor({ state: { @@ -487,8 +487,8 @@ export const dashboardActor = actor({ }, }); - await loopCtx.step("save-summary", async () => { - loopCtx.state.summary = summary; + await loopCtx.step("save-summary", async (step) => { + step.state.summary = summary; }); }); @@ -526,7 +526,7 @@ Use `race` when you need first-winner behavior. ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const auctionActor = actor({ state: { result: null as "sold" | "expired" | null }, @@ -553,9 +553,9 @@ export const auctionActor = actor({ }, ]); - await ctx.step("finalize", async () => { + await ctx.step("finalize", async (step) => { await finalizeAuction("item-123", winner); - ctx.state.result = winner === "bid" ? "sold" : "expired"; + step.state.result = winner === "bid" ? "sold" : "expired"; }); }), actions: { @@ -600,7 +600,7 @@ Use step timeouts and retries for slow or flaky dependencies. ```ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; async function chargeCard(orderId: string): Promise { return `charge-${orderId}`; @@ -626,8 +626,8 @@ export const timeoutActor = actor({ run: async () => await chargeCard(message.body.orderId), }); - await loopCtx.step("save-charge", async () => { - loopCtx.state.lastChargeId = chargeId; + await loopCtx.step("save-charge", async (step) => { + step.state.lastChargeId = chargeId; }); }); @@ -657,15 +657,15 @@ export const paymentActor = actor({ run: async () => await chargeCard("order-123"), }); - await ctx.step("store-charge-result", async () => { + await ctx.step("store-charge-result", async (step) => { if (!charge.ok) { - ctx.state.status = "manual-review"; - ctx.state.reason = charge.failure.error.message; + step.state.status = "manual-review"; + step.state.reason = charge.failure.error.message; return; } - ctx.state.status = "paid"; - ctx.state.reason = null; + step.state.status = "paid"; + step.state.reason = null; }); }), actions: { @@ -760,7 +760,7 @@ Use rollback checkpoints before steps that have compensating actions. ```ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const checkoutActor = actor({ state: { status: "pending" as string }, @@ -789,8 +789,8 @@ export const checkoutActor = actor({ }, }); - await loopCtx.step("confirm", async () => { - loopCtx.state.status = "confirmed"; + await loopCtx.step("confirm", async (step) => { + step.state.status = "confirmed"; }); }); @@ -843,7 +843,7 @@ Store progress in `state` so replay and recovery always restore it. Broadcast st ```ts index.ts import { actor, event, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type Progress = { stage: "idle" | "running" | "completed"; @@ -870,12 +870,12 @@ export const progressActor = actor({ await ctx.loop("progress-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-job"); - await loopCtx.step("mark-running", async () => - markProgressRunning(loopCtx), + await loopCtx.step("mark-running", async (step) => + markProgressRunning(step), ); - await loopCtx.step("apply-job", async () => - applyProgressJob(loopCtx, message.body.value), + await loopCtx.step("apply-job", async (step) => + applyProgressJob(step, message.body.value), ); }); @@ -885,7 +885,7 @@ export const progressActor = actor({ }, }); -function markProgressRunning(ctx: WorkflowLoopContextOf): void { +function markProgressRunning(ctx: WorkflowStepContextOf): void { ctx.state.progress = { stage: "running", completed: ctx.state.progress.completed, @@ -895,7 +895,7 @@ function markProgressRunning(ctx: WorkflowLoopContextOf): } function applyProgressJob( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, value: number, ): void { ctx.state.sum += value; @@ -936,7 +936,7 @@ Rivet scheduling triggers actions. For cron-like workflows, use a small schedule ```ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; function nextMinute(timestamp: number): number { const minuteMs = 60_000; @@ -968,9 +968,9 @@ export const cronActor = actor({ await ctx.loop("cron-loop", async (loopCtx) => { const message = await loopCtx.queue.next("wait-cron-tick"); - await loopCtx.step("run-cron-job", async () => { - loopCtx.state.runs += 1; - loopCtx.state.lastRunAt = message.body.scheduledAt; + await loopCtx.step("run-cron-job", async (step) => { + step.state.runs += 1; + step.state.lastRunAt = message.body.scheduledAt; }); }); @@ -989,7 +989,7 @@ Use this when external systems enqueue work and the actor should process each it ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type Job = { id: string; amount: number }; @@ -1007,9 +1007,9 @@ export const queueWorkerActor = actor({ if (!message) return; const job = message.body as Job; - await loopCtx.step("process-job", async () => { - loopCtx.state.processed += 1; - loopCtx.state.totalAmount += job.amount; + await loopCtx.step("process-job", async (step) => { + step.state.processed += 1; + step.state.totalAmount += job.amount; }); }); @@ -1028,7 +1028,7 @@ Use this when you need one-time initialization before a long-lived loop, plus cl ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; function openResource(): string { return "connected"; @@ -1053,15 +1053,15 @@ export const setupRunTeardownActor = actor({ c.vars.resource = null; }, run: workflow(async (ctx) => { - await ctx.step("setup", async () => { - if (!ctx.vars.resource) ctx.vars.resource = openResource(); - ctx.state.initialized = true; + await ctx.step("setup", async (step) => { + if (!step.vars.resource) step.vars.resource = openResource(); + step.state.initialized = true; }); await ctx.loop("main-loop", async (loopCtx) => { await loopCtx.sleep("tick", 1_000); - await loopCtx.step("tick-step", async () => { - loopCtx.state.ticks += 1; + await loopCtx.step("tick-step", async (step) => { + step.state.ticks += 1; }); }); }), @@ -1079,7 +1079,7 @@ Use this when an operation must pause for a user or system decision before conti ```ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const approvalGateActor = actor({ state: { status: "pending" as string }, @@ -1087,22 +1087,22 @@ export const approvalGateActor = actor({ approval: queue<{ approved: boolean }>(), }, run: workflow(async (ctx) => { - await ctx.step("validate-order", async () => { + await ctx.step("validate-order", async (step) => { await validateOrder("order-123"); - ctx.state.status = "awaiting_approval"; + step.state.status = "awaiting_approval"; }); const decision = await ctx.queue.next("wait-approval"); if (decision.body.approved) { - await ctx.step("fulfill-order", async () => { + await ctx.step("fulfill-order", async (step) => { await fulfillOrder("order-123"); - ctx.state.status = "fulfilled"; + step.state.status = "fulfilled"; }); } else { - await ctx.step("cancel-order", async () => { + await ctx.step("cancel-order", async (step) => { await cancelOrder("order-123"); - ctx.state.status = "cancelled"; + step.state.status = "cancelled"; }); } }), @@ -1140,7 +1140,7 @@ Use this when independent work items can run in parallel and you need a single m ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const fanInOutActor = actor({ state: { @@ -1169,8 +1169,8 @@ export const fanInOutActor = actor({ }, }); - await loopCtx.step("merge-results", async () => { - loopCtx.state.total = + await loopCtx.step("merge-results", async (step) => { + step.state.total = joined.users + joined.orders + joined.invoices; }); @@ -1196,7 +1196,7 @@ Use this when throughput matters and handling one message at a time is too expen ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type MetricMessage = { value: number }; @@ -1212,16 +1212,19 @@ export const batchDrainerActor = actor({ timeout: 5_000, }); - if (message) { - const metric = message.body as MetricMessage; - await loopCtx.step("buffer-message", async () => { - loopCtx.state.pending.push(metric.value); - }); - } + const pendingCount = await loopCtx.step( + "buffer-message", + async (step) => { + if (message) { + step.state.pending.push((message.body as MetricMessage).value); + } + return step.state.pending.length; + }, + ); - if (loopCtx.state.pending.length < 5) return; + if (pendingCount < 5) return; - await loopCtx.step("flush-batch", async () => flushBatch(loopCtx)); + await loopCtx.step("flush-batch", async (step) => flushBatch(step)); }); }), @@ -1230,7 +1233,7 @@ export const batchDrainerActor = actor({ }, }); -function flushBatch(ctx: WorkflowLoopContextOf): void { +function flushBatch(ctx: WorkflowStepContextOf): void { const total = ctx.state.pending.reduce( (sum: number, value: number) => sum + value, 0, @@ -1249,7 +1252,7 @@ Use this when one actor orchestrates work by calling actions on other actors. ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type TaskMessage = { taskId: string; @@ -1277,13 +1280,13 @@ export const coordinatorActor = actor({ if (!message) return; const task = message.body as TaskMessage; - const result = await loopCtx.step("dispatch-rpc", async () => - dispatchTask(loopCtx, task), + const result = await loopCtx.step("dispatch-rpc", async (step) => + dispatchTask(step, task), ); - await loopCtx.step("record-result", async () => { - loopCtx.state.lastTaskId = task.taskId; - loopCtx.state.lastResult = result as number; + await loopCtx.step("record-result", async (step) => { + step.state.lastTaskId = task.taskId; + step.state.lastResult = result as number; }); }); @@ -1294,7 +1297,7 @@ export const coordinatorActor = actor({ }); async function dispatchTask( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, task: TaskMessage, ): Promise { const client = ctx.client(); @@ -1312,7 +1315,7 @@ Use this when you want decoupled actor-to-actor communication with durable waits ```ts index.ts import { actor, queue, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type RequestMessage = { value: number }; @@ -1331,8 +1334,8 @@ export const requestResponseActor = actor({ if (!message.complete) return; - const doubled = await loopCtx.step("handle-request", async () => { - loopCtx.state.handled += 1; + const doubled = await loopCtx.step("handle-request", async (step) => { + step.state.handled += 1; return message.body.value * 2; }); @@ -1366,7 +1369,7 @@ Use this when multiple actors can process independent parts of a request in para ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type ScatterMessage = { input: number }; @@ -1392,26 +1395,26 @@ export const scatterGatherActor = actor({ const gathered = await loopCtx.join("gather", { shardA: { run: async (joinCtx) => - await joinCtx.step("call-shard-a", async () => - callShard(joinCtx, "a", scatter.input), + await joinCtx.step("call-shard-a", async (step) => + callShard(step, "a", scatter.input), ), }, shardB: { run: async (joinCtx) => - await joinCtx.step("call-shard-b", async () => - callShard(joinCtx, "b", scatter.input), + await joinCtx.step("call-shard-b", async (step) => + callShard(step, "b", scatter.input), ), }, shardC: { run: async (joinCtx) => - await joinCtx.step("call-shard-c", async () => - callShard(joinCtx, "c", scatter.input), + await joinCtx.step("call-shard-c", async (step) => + callShard(step, "c", scatter.input), ), }, }); - await loopCtx.step("aggregate", async () => { - loopCtx.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC; + await loopCtx.step("aggregate", async (step) => { + step.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC; }); }); @@ -1422,7 +1425,7 @@ export const scatterGatherActor = actor({ }); async function callShard( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, shardId: "a" | "b" | "c", input: number, ): Promise { @@ -1440,7 +1443,7 @@ Use this when a primary actor call might be slow or unavailable and you need a d ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const primaryServiceActor = actor({ actions: { @@ -1472,8 +1475,8 @@ export const timeoutFallbackActor = actor({ { name: "primary", run: async (raceCtx) => - await raceCtx.step("call-primary", async () => - callPrimaryValue(raceCtx), + await raceCtx.step("call-primary", async (step) => + callPrimaryValue(step), ), }, { @@ -1489,15 +1492,15 @@ export const timeoutFallbackActor = actor({ let source: "primary" | "fallback" = "primary"; if (winner.winner === "timeout") { - value = (await loopCtx.step("fallback-call", async () => - callFallbackValue(loopCtx), + value = (await loopCtx.step("fallback-call", async (step) => + callFallbackValue(step), )) as string; source = "fallback"; } - await loopCtx.step("record-choice", async () => { - loopCtx.state.lastSource = source; - loopCtx.state.lastValue = value; + await loopCtx.step("record-choice", async (step) => { + step.state.lastSource = source; + step.state.lastValue = value; }); }); @@ -1508,7 +1511,7 @@ export const timeoutFallbackActor = actor({ }); async function callPrimaryValue( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, ): Promise { const client = ctx.client(); const primary = client.primaryServiceActor.getOrCreate(["main"]); @@ -1516,7 +1519,7 @@ async function callPrimaryValue( } async function callFallbackValue( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, ): Promise { const client = ctx.client(); const fallback = client.fallbackServiceActor.getOrCreate(["main"]); @@ -1534,7 +1537,7 @@ Use this when a workflow spans multiple actors and each side effect may need com ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type CheckoutMessage = { orderId: string; @@ -1572,21 +1575,24 @@ export const checkoutSagaActor = actor({ await loopCtx.step({ name: "reserve-inventory", - run: async () => reserveInventoryForCheckout(loopCtx, checkout.orderId), - rollback: async (_rollbackCtx, output) => { - await releaseInventoryForCheckout(loopCtx, output as string); + run: async (step) => + reserveInventoryForCheckout(step, checkout.orderId), + rollback: async (step, output) => { + await releaseInventoryForCheckout(step, output as string); }, }); await loopCtx.step({ name: "charge-card", - run: async () => chargeCheckout(loopCtx, checkout.amount), - rollback: async (_rollbackCtx, output) => { - await refundCheckout(loopCtx, output as string); + run: async (step) => chargeCheckout(step, checkout.amount), + rollback: async (step, output) => { + await refundCheckout(step, output as string); }, }); - await loopCtx.step("mark-complete", async () => markOrderComplete(loopCtx)); + await loopCtx.step("mark-complete", async (step) => + markOrderComplete(step), + ); }); }), @@ -1596,7 +1602,7 @@ export const checkoutSagaActor = actor({ }); async function reserveInventoryForCheckout( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, orderId: string, ): Promise { const client = ctx.client(); @@ -1605,7 +1611,7 @@ async function reserveInventoryForCheckout( } async function releaseInventoryForCheckout( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, reservationId: string, ): Promise { const client = ctx.client(); @@ -1614,7 +1620,7 @@ async function releaseInventoryForCheckout( } async function chargeCheckout( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, amount: number, ): Promise { const client = ctx.client(); @@ -1623,7 +1629,7 @@ async function chargeCheckout( } async function refundCheckout( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, chargeId: string, ): Promise { const client = ctx.client(); @@ -1632,7 +1638,7 @@ async function refundCheckout( } function markOrderComplete( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, ): void { ctx.state.completedOrders += 1; } @@ -1648,7 +1654,7 @@ Use this when workflow progress should be triggered by commands/events instead o ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type ControlSignal = { kind: "pause" | "resume" | "stop" }; @@ -1666,8 +1672,8 @@ export const controlLoopActor = actor({ if (!message) return; const signal = message.body as ControlSignal; - await loopCtx.step("apply-signal", async () => - applyControlSignal(loopCtx, signal.kind), + await loopCtx.step("apply-signal", async (step) => + applyControlSignal(step, signal.kind), ); }); @@ -1678,7 +1684,7 @@ export const controlLoopActor = actor({ }); function applyControlSignal( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, kind: ControlSignal["kind"], ): void { ctx.state.handledSignals += 1; @@ -1696,7 +1702,7 @@ Use this when an external dependency has variable availability and retries shoul ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; async function pollExternal(attempt: number): Promise { return attempt % 3 === 0; @@ -1710,26 +1716,27 @@ export const pollBackoffActor = actor({ }, run: workflow(async (ctx) => { await ctx.loop("poll-loop", async (loopCtx) => { - const success = await loopCtx.step("poll-target", async () => { - loopCtx.state.attempts += 1; - return pollExternal(loopCtx.state.attempts); + const success = await loopCtx.step("poll-target", async (step) => { + step.state.attempts += 1; + return pollExternal(step.state.attempts); }); if (success) { - await loopCtx.step("reset-backoff", async () => { - loopCtx.state.status = "healthy"; - loopCtx.state.backoffMs = 100; + await loopCtx.step("reset-backoff", async (step) => { + step.state.status = "healthy"; + step.state.backoffMs = 100; }); await loopCtx.sleep("healthy-interval", 1_000); return; } - await loopCtx.step("grow-backoff", async () => { - loopCtx.state.status = "retrying"; - loopCtx.state.backoffMs = Math.min(loopCtx.state.backoffMs * 2, 5_000); + const backoffMs = await loopCtx.step("grow-backoff", async (step) => { + step.state.status = "retrying"; + step.state.backoffMs = Math.min(step.state.backoffMs * 2, 5_000); + return step.state.backoffMs; }); - await loopCtx.sleep("retry-delay", loopCtx.state.backoffMs); + await loopCtx.sleep("retry-delay", backoffMs); }); }), actions: { @@ -1746,7 +1753,7 @@ Use this when one workflow coordinates many child workers (actors or worker work ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type BatchMessage = { payload: number }; @@ -1761,7 +1768,7 @@ export const orchestratorActor = actor({ lastTotal: 0, }, run: workflow(async (ctx) => { - await ctx.step("start-children", async () => startChildren(ctx)); + await ctx.step("start-children", async (step) => startChildren(step)); await ctx.loop("orchestrate-loop", async (loopCtx) => { const [message] = await loopCtx.queue.nextBatch("wait-batch", { @@ -1774,26 +1781,26 @@ export const orchestratorActor = actor({ const results = await loopCtx.join("collect-updates", { a: { run: async (joinCtx) => - await joinCtx.step("run-child-a", async () => - runChildWorker(joinCtx, "child-a", batch.payload), + await joinCtx.step("run-child-a", async (step) => + runChildWorker(step, "child-a", batch.payload), ), }, b: { run: async (joinCtx) => - await joinCtx.step("run-child-b", async () => - runChildWorker(joinCtx, "child-b", batch.payload), + await joinCtx.step("run-child-b", async (step) => + runChildWorker(step, "child-b", batch.payload), ), }, c: { run: async (joinCtx) => - await joinCtx.step("run-child-c", async () => - runChildWorker(joinCtx, "child-c", batch.payload), + await joinCtx.step("run-child-c", async (step) => + runChildWorker(step, "child-c", batch.payload), ), }, }); - await loopCtx.step("reconcile", async () => { - loopCtx.state.lastTotal = results.a + results.b + results.c; + await loopCtx.step("reconcile", async (step) => { + step.state.lastTotal = results.a + results.b + results.c; }); }); @@ -1804,7 +1811,7 @@ export const orchestratorActor = actor({ }); async function startChildren( - ctx: WorkflowContextOf, + ctx: WorkflowStepContextOf, ): Promise { const client = ctx.client(); await client.childWorkerActor.getOrCreate(["child-a"]).process(0); @@ -1813,7 +1820,7 @@ async function startChildren( } async function runChildWorker( - ctx: WorkflowBranchContextOf, + ctx: WorkflowStepContextOf, workerId: "child-a" | "child-b" | "child-c", payload: number, ): Promise { @@ -1830,7 +1837,7 @@ Use this when inbound work can spike and you need predictable per-iteration limi ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type WorkMessage = { id: string; value: number }; @@ -1877,8 +1884,8 @@ export const boundedDrainActor = actor({ if (window.length === 0) return; - await loopCtx.step("process-window", async () => - processWindow(loopCtx, window), + await loopCtx.step("process-window", async (step) => + processWindow(step, window), ); }); @@ -1889,7 +1896,7 @@ export const boundedDrainActor = actor({ }); async function processWindow( - ctx: WorkflowLoopContextOf, + ctx: WorkflowStepContextOf, window: WorkMessage[], ): Promise { let windowTotal = 0; @@ -1912,23 +1919,23 @@ Use this when workflow structure changes across deployments and old histories mu ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; export const versionedWorkflowActor = actor({ state: { runs: 0, }, run: workflow(async (ctx) => { - await ctx.step("validate-v2", async () => { - ctx.state.runs += 1; + await ctx.step("validate-v2", async (step) => { + step.state.runs += 1; }); await ctx.removed("validate-v1", "step"); await ctx.loop("main-loop-v2", async (loopCtx) => { await loopCtx.sleep("idle", 500); - await loopCtx.step("heartbeat-v2", async () => { - loopCtx.state.runs += 1; + await loopCtx.step("heartbeat-v2", async (step) => { + step.state.runs += 1; }); }); }), @@ -1946,7 +1953,7 @@ Use this when you need reliable replay and resume semantics across crashes and r ```ts import { actor, setup } from "rivetkit"; -import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow"; +import { type WorkflowStepContextOf, workflow } from "rivetkit/workflow"; type PaymentMessage = { id: string; amount: number }; @@ -1971,10 +1978,10 @@ export const checkpointFriendlyActor = actor({ buildPaymentPlan(payment), )) as { paymentId: string; amount: number }; - await loopCtx.step("apply-side-effects", async () => { - loopCtx.state.appliedCount += 1; - loopCtx.state.totalAmount += plan.amount; - loopCtx.state.lastPaymentId = plan.paymentId; + await loopCtx.step("apply-side-effects", async (step) => { + step.state.appliedCount += 1; + step.state.totalAmount += plan.amount; + step.state.lastPaymentId = plan.paymentId; }); }); diff --git a/website/src/content/docs/clients/javascript.mdx b/website/src/content/docs/clients/javascript.mdx index fb2247c2a2..8a38cbc3e4 100644 --- a/website/src/content/docs/clients/javascript.mdx +++ b/website/src/content/docs/clients/javascript.mdx @@ -218,7 +218,7 @@ import { actor, setup } from "rivetkit"; export const chatRoom = actor({ state: { messages: [] as string[] }, actions: { - getRoomInfo: (c) => ({ org: c.key[0], room: c.key[1] }), + getRoomInfo: (c) => ({ org: c.actorKey[0], room: c.actorKey[1] }), }, }); diff --git a/website/src/content/docs/clients/react.mdx b/website/src/content/docs/clients/react.mdx index d23c9d031c..71e1bfb1c1 100644 --- a/website/src/content/docs/clients/react.mdx +++ b/website/src/content/docs/clients/react.mdx @@ -235,7 +235,7 @@ import { actor, setup } from "rivetkit"; export const chatRoom = actor({ state: { messages: [] as string[] }, actions: { - getRoomInfo: (c) => ({ org: c.key[0], room: c.key[1] }), + getRoomInfo: (c) => ({ org: c.actorKey[0], room: c.actorKey[1] }), }, });