From 93ea41327e80a4d6f86f40efa95bfefe4ff376c3 Mon Sep 17 00:00:00 2001 From: Caio Nogueira Date: Mon, 15 Jun 2026 20:39:04 +0100 Subject: [PATCH] WOR-1263: migrate workflow wrapped binding into jsrpc Gates the JSRPC transport for the env.WORKFLOW binding while the RPC callee is rolled out. Experimental opt-in only (no enable date); the legacy HTTP transport remains the default. feat(workflows): route binding through JSRPC behind compat flag When workflows_bindings_rpc is enabled, WorkflowImpl and InstanceImpl dispatch their methods as JSRPC calls directly on the inner fetcher instead of issuing HTTP requests. The wrapper classes are preserved on both transports (results are always wrapped in InstanceImpl), so prototypes and instanceof stability are unchanged. The Fetcher interface now declares the RPC method surface the callee must implement. The legacy callFetcher HTTP path is retained for un-flagged workers. test(workflows): cover both binding transports end-to-end Upgrade the mock to a dual-transport WorkerEntrypoint (RPC methods + fetch handler) sharing one set of business-logic helpers so both transports behave identically, including an error-trigger id. The wd-test now runs the same suite against a flag-off (HTTP) service and a flag-on (RPC) service pointed at the WorkflowsMock entrypoint. Tests add instance-method, prototype-preservation, and error-propagation coverage. --- .../test/workflows/workflows-api-rpc-test.js | 118 +++++++++++++ .../test/workflows/workflows-api-test.js | 32 ++++ .../test/workflows/workflows-api-test.wd-test | 28 ++- .../internal/test/workflows/workflows-mock.js | 160 +++++++++++------- src/cloudflare/internal/workflows-api.ts | 70 ++++++-- src/workerd/io/compatibility-date.capnp | 8 + 6 files changed, 344 insertions(+), 72 deletions(-) create mode 100644 src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js diff --git a/src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js b/src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js new file mode 100644 index 00000000000..215d206afb8 --- /dev/null +++ b/src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js @@ -0,0 +1,118 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import * as assert from 'node:assert'; + +async function lastRestartBody(env, id) { + return env.mock.lastRestart(id); +} + +export const tests = { + async test(_, env) { + { + const instance = await env.workflow.create({ + id: 'foo', + payload: { bar: 'baz' }, + }); + assert.deepStrictEqual(instance.id, 'foo'); + } + + { + const instance = await env.workflow.get('bar'); + assert.deepStrictEqual(instance.id, 'bar'); + } + + { + const instances = await env.workflow.createBatch([ + { id: 'foo', payload: { bar: 'baz' } }, + { id: 'bar', payload: { bar: 'baz' } }, + ]); + assert.deepStrictEqual(instances[0].id, 'foo'); + assert.deepStrictEqual(instances[1].id, 'bar'); + } + + { + const instance = await env.workflow.get('inst'); + await instance.pause(); + await instance.resume(); + await instance.terminate(); + await instance.sendEvent({ + type: 'my-event', + payload: { hello: 'world' }, + }); + } + + { + const instance = await env.workflow.get('status-rpc'); + const status = await instance.status(); + assert.deepStrictEqual(status.status, 'running'); + assert.strictEqual(status.transport, 'rpc'); + assert.strictEqual(status.output, 'status-rpc'); + } + + { + for (const method of ['get', 'create', 'createBatch']) { + assert.strictEqual(typeof env.workflow[method], 'function'); + } + + const fromGet = await env.workflow.get('a'); + const fromCreate = await env.workflow.create({ id: 'b' }); + const [fromBatch] = await env.workflow.createBatch([{ id: 'c' }]); + + const proto = Object.getPrototypeOf(fromGet); + assert.strictEqual(Object.getPrototypeOf(fromCreate), proto); + assert.strictEqual(Object.getPrototypeOf(fromBatch), proto); + + for (const method of [ + 'pause', + 'resume', + 'terminate', + 'restart', + 'status', + 'sendEvent', + ]) { + assert.strictEqual(typeof fromGet[method], 'function'); + } + } + + { + await assert.rejects(env.workflow.get('throw'), { + message: 'workflow instance not found', + }); + } + }, + + async testRestartNoOptions(_, env) { + const instance = await env.workflow.get('rpc-restart-basic'); + await instance.restart(); + + const body = await lastRestartBody(env, 'rpc-restart-basic'); + assert.deepStrictEqual(body.id, 'rpc-restart-basic'); + assert.strictEqual(body.from, undefined); + }, + + async testRestartFromStepNameOnly(_, env) { + const instance = await env.workflow.get('rpc-restart-step'); + await instance.restart({ from: { name: 'fetch data' } }); + + const body = await lastRestartBody(env, 'rpc-restart-step'); + assert.deepStrictEqual(body.id, 'rpc-restart-step'); + assert.deepStrictEqual(body.from, { name: 'fetch data' }); + }, + + async testRestartFromStepAllOptions(_, env) { + const instance = await env.workflow.get('rpc-restart-full'); + await instance.restart({ + from: { name: 'process item', count: 3, type: 'do' }, + }); + + const body = await lastRestartBody(env, 'rpc-restart-full'); + assert.deepStrictEqual(body.id, 'rpc-restart-full'); + assert.deepStrictEqual(body.from, { + name: 'process item', + count: 3, + type: 'do', + }); + }, +}; diff --git a/src/cloudflare/internal/test/workflows/workflows-api-test.js b/src/cloudflare/internal/test/workflows/workflows-api-test.js index 662acf76b01..6cd31067478 100644 --- a/src/cloudflare/internal/test/workflows/workflows-api-test.js +++ b/src/cloudflare/internal/test/workflows/workflows-api-test.js @@ -44,6 +44,38 @@ export const tests = { assert.deepStrictEqual(instances[0].id, 'foo'); assert.deepStrictEqual(instances[1].id, 'bar'); } + + { + const instance = await env.workflow.get('status-http'); + const status = await instance.status(); + assert.deepStrictEqual(status.status, 'running'); + assert.strictEqual(status.transport, 'http'); + } + + { + for (const method of ['get', 'create', 'createBatch']) { + assert.strictEqual(typeof env.workflow[method], 'function'); + } + + const fromGet = await env.workflow.get('a'); + const fromCreate = await env.workflow.create({ id: 'b' }); + const [fromBatch] = await env.workflow.createBatch([{ id: 'c' }]); + + const proto = Object.getPrototypeOf(fromGet); + assert.strictEqual(Object.getPrototypeOf(fromCreate), proto); + assert.strictEqual(Object.getPrototypeOf(fromBatch), proto); + + for (const method of [ + 'pause', + 'resume', + 'terminate', + 'restart', + 'status', + 'sendEvent', + ]) { + assert.strictEqual(typeof fromGet[method], 'function'); + } + } }, async testRestartNoOptions(_, env) { diff --git a/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test b/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test index cf29c59e4d4..4b6b03854d3 100644 --- a/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test +++ b/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test @@ -26,9 +26,35 @@ const unitTests :Workerd.Config = ( ], ) ), + ( name = "workflows-api-rpc-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "workflows-api-rpc-test.js") + ], + compatibilityFlags = ["nodejs_compat", "workflows_bindings_rpc", + "experimental", "service_binding_extra_handlers", + "rpc"], + bindings = [ + ( + name = "workflow", + wrapped = ( + moduleName = "cloudflare-internal:workflows-api", + innerBindings = [( + name = "fetcher", + service = "workflows-mock" + )], + ) + ), + ( + name = "mock", + service = "workflows-mock" + ) + ], + ) + ), ( name = "workflows-mock", worker = ( - compatibilityFlags = ["nodejs_compat"], + compatibilityFlags = ["experimental", "nodejs_compat"], modules = [ (name = "worker", esModule = embed "workflows-mock.js") ], diff --git a/src/cloudflare/internal/test/workflows/workflows-mock.js b/src/cloudflare/internal/test/workflows/workflows-mock.js index 055297c861d..7cd53df5514 100644 --- a/src/cloudflare/internal/test/workflows/workflows-mock.js +++ b/src/cloudflare/internal/test/workflows/workflows-mock.js @@ -2,70 +2,110 @@ // Licensed under the Apache 2.0 license found in the LICENSE file or at: // https://opensource.org/licenses/Apache-2.0 +import { WorkerEntrypoint } from 'cloudflare:workers'; + const restartBodies = new Map(); -export default { - async fetch(request, env, ctx) { - const data = await request.json(); - const reqUrl = new URL(request.url); - - if (reqUrl.pathname === '/get' && request.method === 'POST') { - return Response.json( - { - result: { - id: data.id, - }, - }, - { - status: 200, - headers: { - 'content-type': 'application/json', - }, - } - ); - } +const THROW_ID = 'throw'; - if (reqUrl.pathname === '/create' && request.method === 'POST') { - return Response.json( - { - result: { - id: data.id, - }, - }, - { - status: 201, - headers: { - 'content-type': 'application/json', - }, - } - ); - } +function getInstance(id) { + if (id === THROW_ID) { + throw new Error('workflow instance not found'); + } + return { id }; +} - if (reqUrl.pathname === '/createBatch' && request.method === 'POST') { - return Response.json( - { - result: data.map((val) => ({ id: val.id })), - }, - { - status: 201, - headers: { - 'content-type': 'application/json', - }, - } - ); - } +function createInstance(options) { + return { id: options?.id }; +} - if (reqUrl.pathname === '/restart' && request.method === 'POST') { - restartBodies.set(data.id, data); - return Response.json({}, { status: 200 }); - } +function createBatchInstances(options) { + return options.map((val) => ({ id: val.id })); +} - // Test-only: returns the body from the last /restart call for a given id - if (reqUrl.pathname === '/last-restart' && request.method === 'POST') { - return Response.json( - { result: restartBodies.get(data.id) ?? null }, - { status: 200 } - ); +function instanceStatus(id, transport) { + return { status: 'running', output: id, transport }; +} + +async function handleHttp(request) { + const data = await request.json(); + const reqUrl = new URL(request.url); + + if (request.method !== 'POST') { + return Response.json({ success: false }, { status: 500 }); + } + + try { + switch (reqUrl.pathname) { + case '/get': + return Response.json({ result: getInstance(data.id) }, { status: 200 }); + case '/create': + return Response.json({ result: createInstance(data) }, { status: 201 }); + case '/createBatch': + return Response.json( + { result: createBatchInstances(data) }, + { status: 201 } + ); + case '/pause': + case '/resume': + case '/terminate': + case '/send-event': + return Response.json({ result: null }, { status: 200 }); + case '/restart': + restartBodies.set(data.id, data); + return Response.json({ result: null }, { status: 200 }); + case '/status': + return Response.json( + { result: instanceStatus(data.id, 'http') }, + { status: 200 } + ); + case '/last-restart': + return Response.json( + { result: restartBodies.get(data.id) ?? null }, + { status: 200 } + ); + default: + return Response.json({ success: false }, { status: 404 }); } - }, -}; + } catch (err) { + return Response.json({ error: { message: err.message } }, { status: 500 }); + } +} + +export default class WorkflowsMock extends WorkerEntrypoint { + async getInstance(id) { + return getInstance(id); + } + + async create(options) { + return createInstance(options); + } + + async createBatch(options) { + return createBatchInstances(options); + } + + async pause(_id) {} + + async resume(_id) {} + + async terminate(_id) {} + + async restart(id, options) { + restartBodies.set(id, { ...options, id }); + } + + async status(id) { + return instanceStatus(id, 'rpc'); + } + + async sendEvent(_id, _event) {} + + async lastRestart(id) { + return restartBodies.get(id) ?? null; + } + + async fetch(request) { + return handleHttp(request); + } +} diff --git a/src/cloudflare/internal/workflows-api.ts b/src/cloudflare/internal/workflows-api.ts index 5af8b0617cb..38b063c91e2 100644 --- a/src/cloudflare/internal/workflows-api.ts +++ b/src/cloudflare/internal/workflows-api.ts @@ -9,8 +9,26 @@ export class NonRetryableError extends Error { } } +const workflowsBindingsRpc = + !!Cloudflare.compatibilityFlags['workflows_bindings_rpc']; + interface Fetcher { fetch: typeof fetch; + getInstance(id: string): Promise<{ id: string }>; + create(options?: WorkflowInstanceCreateOptions): Promise<{ id: string }>; + createBatch( + options: WorkflowInstanceCreateOptions[] + ): Promise<{ id: string }[]>; + + pause(id: string): Promise; + resume(id: string): Promise; + terminate(id: string): Promise; + restart(id: string, options?: WorkflowInstanceRestartOptions): Promise; + status(id: string): Promise; + sendEvent( + id: string, + event: { type: string; payload: unknown } + ): Promise; } async function callFetcher( @@ -51,23 +69,39 @@ class InstanceImpl implements WorkflowInstance { } async pause(): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.pause(this.id); + return; + } await callFetcher(this.fetcher, '/pause', { id: this.id, }); } async resume(): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.resume(this.id); + return; + } await callFetcher(this.fetcher, '/resume', { id: this.id, }); } async terminate(): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.terminate(this.id); + return; + } await callFetcher(this.fetcher, '/terminate', { id: this.id, }); } async restart(options?: WorkflowInstanceRestartOptions): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.restart(this.id, options); + return; + } await callFetcher(this.fetcher, '/restart', { ...options, id: this.id, @@ -75,6 +109,9 @@ class InstanceImpl implements WorkflowInstance { } async status(): Promise { + if (workflowsBindingsRpc) { + return await this.fetcher.status(this.id); + } const result = await callFetcher(this.fetcher, '/status', { id: this.id, }); @@ -88,6 +125,10 @@ class InstanceImpl implements WorkflowInstance { type: string; payload: unknown; }): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.sendEvent(this.id, { type, payload }); + return; + } await callFetcher(this.fetcher, '/send-event', { type, payload, @@ -106,9 +147,12 @@ class WorkflowImpl { } async get(id: string): Promise { - const result = await callFetcher<{ - id: string; - }>(this.fetcher, '/get', { id }); + const result = workflowsBindingsRpc + // getInstance, not get: avoids colliding with the built-in Fetcher.get(url). + ? await this.fetcher.getInstance(id) + : await callFetcher<{ + id: string; + }>(this.fetcher, '/get', { id }); return new InstanceImpl(result.id, this.fetcher); } @@ -116,9 +160,11 @@ class WorkflowImpl { async create( options?: WorkflowInstanceCreateOptions ): Promise { - const result = await callFetcher<{ - id: string; - }>(this.fetcher, '/create', options ?? {}); + const result = workflowsBindingsRpc + ? await this.fetcher.create(options) + : await callFetcher<{ + id: string; + }>(this.fetcher, '/create', options ?? {}); return new InstanceImpl(result.id, this.fetcher); } @@ -126,11 +172,13 @@ class WorkflowImpl { async createBatch( options: WorkflowInstanceCreateOptions[] ): Promise { - const results = await callFetcher< - { - id: string; - }[] - >(this.fetcher, '/createBatch', options); + const results = workflowsBindingsRpc + ? await this.fetcher.createBatch(options) + : await callFetcher< + { + id: string; + }[] + >(this.fetcher, '/createBatch', options); return results.map((result) => new InstanceImpl(result.id, this.fetcher)); } diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index 56d8a6d08c8..a6bea6bf9e4 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -1540,4 +1540,12 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { # startup. This allows packages to extend `sys.path` declaratively (e.g. to # add subdirectories or register import hooks). Without this flag, `.pth` # files in `python_modules/` are ignored. + + workflowsBindingsRpc @177 :Bool + $compatEnableFlag("workflows_bindings_rpc") + $experimental; + # When enabled, the `env.WORKFLOW` binding (cloudflare-internal:workflows-api) + # dispatches its methods as JSRPC calls on the inner fetcher instead of HTTP + # requests against the binding-shim worker. Without the flag the legacy HTTP + # transport is used. }