diff --git a/src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js b/src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js new file mode 100644 index 00000000000..215d206afb8 --- /dev/null +++ b/src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js @@ -0,0 +1,118 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import * as assert from 'node:assert'; + +async function lastRestartBody(env, id) { + return env.mock.lastRestart(id); +} + +export const tests = { + async test(_, env) { + { + const instance = await env.workflow.create({ + id: 'foo', + payload: { bar: 'baz' }, + }); + assert.deepStrictEqual(instance.id, 'foo'); + } + + { + const instance = await env.workflow.get('bar'); + assert.deepStrictEqual(instance.id, 'bar'); + } + + { + const instances = await env.workflow.createBatch([ + { id: 'foo', payload: { bar: 'baz' } }, + { id: 'bar', payload: { bar: 'baz' } }, + ]); + assert.deepStrictEqual(instances[0].id, 'foo'); + assert.deepStrictEqual(instances[1].id, 'bar'); + } + + { + const instance = await env.workflow.get('inst'); + await instance.pause(); + await instance.resume(); + await instance.terminate(); + await instance.sendEvent({ + type: 'my-event', + payload: { hello: 'world' }, + }); + } + + { + const instance = await env.workflow.get('status-rpc'); + const status = await instance.status(); + assert.deepStrictEqual(status.status, 'running'); + assert.strictEqual(status.transport, 'rpc'); + assert.strictEqual(status.output, 'status-rpc'); + } + + { + for (const method of ['get', 'create', 'createBatch']) { + assert.strictEqual(typeof env.workflow[method], 'function'); + } + + const fromGet = await env.workflow.get('a'); + const fromCreate = await env.workflow.create({ id: 'b' }); + const [fromBatch] = await env.workflow.createBatch([{ id: 'c' }]); + + const proto = Object.getPrototypeOf(fromGet); + assert.strictEqual(Object.getPrototypeOf(fromCreate), proto); + assert.strictEqual(Object.getPrototypeOf(fromBatch), proto); + + for (const method of [ + 'pause', + 'resume', + 'terminate', + 'restart', + 'status', + 'sendEvent', + ]) { + assert.strictEqual(typeof fromGet[method], 'function'); + } + } + + { + await assert.rejects(env.workflow.get('throw'), { + message: 'workflow instance not found', + }); + } + }, + + async testRestartNoOptions(_, env) { + const instance = await env.workflow.get('rpc-restart-basic'); + await instance.restart(); + + const body = await lastRestartBody(env, 'rpc-restart-basic'); + assert.deepStrictEqual(body.id, 'rpc-restart-basic'); + assert.strictEqual(body.from, undefined); + }, + + async testRestartFromStepNameOnly(_, env) { + const instance = await env.workflow.get('rpc-restart-step'); + await instance.restart({ from: { name: 'fetch data' } }); + + const body = await lastRestartBody(env, 'rpc-restart-step'); + assert.deepStrictEqual(body.id, 'rpc-restart-step'); + assert.deepStrictEqual(body.from, { name: 'fetch data' }); + }, + + async testRestartFromStepAllOptions(_, env) { + const instance = await env.workflow.get('rpc-restart-full'); + await instance.restart({ + from: { name: 'process item', count: 3, type: 'do' }, + }); + + const body = await lastRestartBody(env, 'rpc-restart-full'); + assert.deepStrictEqual(body.id, 'rpc-restart-full'); + assert.deepStrictEqual(body.from, { + name: 'process item', + count: 3, + type: 'do', + }); + }, +}; diff --git a/src/cloudflare/internal/test/workflows/workflows-api-test.js b/src/cloudflare/internal/test/workflows/workflows-api-test.js index 662acf76b01..6cd31067478 100644 --- a/src/cloudflare/internal/test/workflows/workflows-api-test.js +++ b/src/cloudflare/internal/test/workflows/workflows-api-test.js @@ -44,6 +44,38 @@ export const tests = { assert.deepStrictEqual(instances[0].id, 'foo'); assert.deepStrictEqual(instances[1].id, 'bar'); } + + { + const instance = await env.workflow.get('status-http'); + const status = await instance.status(); + assert.deepStrictEqual(status.status, 'running'); + assert.strictEqual(status.transport, 'http'); + } + + { + for (const method of ['get', 'create', 'createBatch']) { + assert.strictEqual(typeof env.workflow[method], 'function'); + } + + const fromGet = await env.workflow.get('a'); + const fromCreate = await env.workflow.create({ id: 'b' }); + const [fromBatch] = await env.workflow.createBatch([{ id: 'c' }]); + + const proto = Object.getPrototypeOf(fromGet); + assert.strictEqual(Object.getPrototypeOf(fromCreate), proto); + assert.strictEqual(Object.getPrototypeOf(fromBatch), proto); + + for (const method of [ + 'pause', + 'resume', + 'terminate', + 'restart', + 'status', + 'sendEvent', + ]) { + assert.strictEqual(typeof fromGet[method], 'function'); + } + } }, async testRestartNoOptions(_, env) { diff --git a/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test b/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test index cf29c59e4d4..4b6b03854d3 100644 --- a/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test +++ b/src/cloudflare/internal/test/workflows/workflows-api-test.wd-test @@ -26,9 +26,35 @@ const unitTests :Workerd.Config = ( ], ) ), + ( name = "workflows-api-rpc-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "workflows-api-rpc-test.js") + ], + compatibilityFlags = ["nodejs_compat", "workflows_bindings_rpc", + "experimental", "service_binding_extra_handlers", + "rpc"], + bindings = [ + ( + name = "workflow", + wrapped = ( + moduleName = "cloudflare-internal:workflows-api", + innerBindings = [( + name = "fetcher", + service = "workflows-mock" + )], + ) + ), + ( + name = "mock", + service = "workflows-mock" + ) + ], + ) + ), ( name = "workflows-mock", worker = ( - compatibilityFlags = ["nodejs_compat"], + compatibilityFlags = ["experimental", "nodejs_compat"], modules = [ (name = "worker", esModule = embed "workflows-mock.js") ], diff --git a/src/cloudflare/internal/test/workflows/workflows-mock.js b/src/cloudflare/internal/test/workflows/workflows-mock.js index 055297c861d..7cd53df5514 100644 --- a/src/cloudflare/internal/test/workflows/workflows-mock.js +++ b/src/cloudflare/internal/test/workflows/workflows-mock.js @@ -2,70 +2,110 @@ // Licensed under the Apache 2.0 license found in the LICENSE file or at: // https://opensource.org/licenses/Apache-2.0 +import { WorkerEntrypoint } from 'cloudflare:workers'; + const restartBodies = new Map(); -export default { - async fetch(request, env, ctx) { - const data = await request.json(); - const reqUrl = new URL(request.url); - - if (reqUrl.pathname === '/get' && request.method === 'POST') { - return Response.json( - { - result: { - id: data.id, - }, - }, - { - status: 200, - headers: { - 'content-type': 'application/json', - }, - } - ); - } +const THROW_ID = 'throw'; - if (reqUrl.pathname === '/create' && request.method === 'POST') { - return Response.json( - { - result: { - id: data.id, - }, - }, - { - status: 201, - headers: { - 'content-type': 'application/json', - }, - } - ); - } +function getInstance(id) { + if (id === THROW_ID) { + throw new Error('workflow instance not found'); + } + return { id }; +} - if (reqUrl.pathname === '/createBatch' && request.method === 'POST') { - return Response.json( - { - result: data.map((val) => ({ id: val.id })), - }, - { - status: 201, - headers: { - 'content-type': 'application/json', - }, - } - ); - } +function createInstance(options) { + return { id: options?.id }; +} - if (reqUrl.pathname === '/restart' && request.method === 'POST') { - restartBodies.set(data.id, data); - return Response.json({}, { status: 200 }); - } +function createBatchInstances(options) { + return options.map((val) => ({ id: val.id })); +} - // Test-only: returns the body from the last /restart call for a given id - if (reqUrl.pathname === '/last-restart' && request.method === 'POST') { - return Response.json( - { result: restartBodies.get(data.id) ?? null }, - { status: 200 } - ); +function instanceStatus(id, transport) { + return { status: 'running', output: id, transport }; +} + +async function handleHttp(request) { + const data = await request.json(); + const reqUrl = new URL(request.url); + + if (request.method !== 'POST') { + return Response.json({ success: false }, { status: 500 }); + } + + try { + switch (reqUrl.pathname) { + case '/get': + return Response.json({ result: getInstance(data.id) }, { status: 200 }); + case '/create': + return Response.json({ result: createInstance(data) }, { status: 201 }); + case '/createBatch': + return Response.json( + { result: createBatchInstances(data) }, + { status: 201 } + ); + case '/pause': + case '/resume': + case '/terminate': + case '/send-event': + return Response.json({ result: null }, { status: 200 }); + case '/restart': + restartBodies.set(data.id, data); + return Response.json({ result: null }, { status: 200 }); + case '/status': + return Response.json( + { result: instanceStatus(data.id, 'http') }, + { status: 200 } + ); + case '/last-restart': + return Response.json( + { result: restartBodies.get(data.id) ?? null }, + { status: 200 } + ); + default: + return Response.json({ success: false }, { status: 404 }); } - }, -}; + } catch (err) { + return Response.json({ error: { message: err.message } }, { status: 500 }); + } +} + +export default class WorkflowsMock extends WorkerEntrypoint { + async getInstance(id) { + return getInstance(id); + } + + async create(options) { + return createInstance(options); + } + + async createBatch(options) { + return createBatchInstances(options); + } + + async pause(_id) {} + + async resume(_id) {} + + async terminate(_id) {} + + async restart(id, options) { + restartBodies.set(id, { ...options, id }); + } + + async status(id) { + return instanceStatus(id, 'rpc'); + } + + async sendEvent(_id, _event) {} + + async lastRestart(id) { + return restartBodies.get(id) ?? null; + } + + async fetch(request) { + return handleHttp(request); + } +} diff --git a/src/cloudflare/internal/workflows-api.ts b/src/cloudflare/internal/workflows-api.ts index 5af8b0617cb..38b063c91e2 100644 --- a/src/cloudflare/internal/workflows-api.ts +++ b/src/cloudflare/internal/workflows-api.ts @@ -9,8 +9,26 @@ export class NonRetryableError extends Error { } } +const workflowsBindingsRpc = + !!Cloudflare.compatibilityFlags['workflows_bindings_rpc']; + interface Fetcher { fetch: typeof fetch; + getInstance(id: string): Promise<{ id: string }>; + create(options?: WorkflowInstanceCreateOptions): Promise<{ id: string }>; + createBatch( + options: WorkflowInstanceCreateOptions[] + ): Promise<{ id: string }[]>; + + pause(id: string): Promise; + resume(id: string): Promise; + terminate(id: string): Promise; + restart(id: string, options?: WorkflowInstanceRestartOptions): Promise; + status(id: string): Promise; + sendEvent( + id: string, + event: { type: string; payload: unknown } + ): Promise; } async function callFetcher( @@ -51,23 +69,39 @@ class InstanceImpl implements WorkflowInstance { } async pause(): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.pause(this.id); + return; + } await callFetcher(this.fetcher, '/pause', { id: this.id, }); } async resume(): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.resume(this.id); + return; + } await callFetcher(this.fetcher, '/resume', { id: this.id, }); } async terminate(): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.terminate(this.id); + return; + } await callFetcher(this.fetcher, '/terminate', { id: this.id, }); } async restart(options?: WorkflowInstanceRestartOptions): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.restart(this.id, options); + return; + } await callFetcher(this.fetcher, '/restart', { ...options, id: this.id, @@ -75,6 +109,9 @@ class InstanceImpl implements WorkflowInstance { } async status(): Promise { + if (workflowsBindingsRpc) { + return await this.fetcher.status(this.id); + } const result = await callFetcher(this.fetcher, '/status', { id: this.id, }); @@ -88,6 +125,10 @@ class InstanceImpl implements WorkflowInstance { type: string; payload: unknown; }): Promise { + if (workflowsBindingsRpc) { + await this.fetcher.sendEvent(this.id, { type, payload }); + return; + } await callFetcher(this.fetcher, '/send-event', { type, payload, @@ -106,9 +147,12 @@ class WorkflowImpl { } async get(id: string): Promise { - const result = await callFetcher<{ - id: string; - }>(this.fetcher, '/get', { id }); + const result = workflowsBindingsRpc + // getInstance, not get: avoids colliding with the built-in Fetcher.get(url). + ? await this.fetcher.getInstance(id) + : await callFetcher<{ + id: string; + }>(this.fetcher, '/get', { id }); return new InstanceImpl(result.id, this.fetcher); } @@ -116,9 +160,11 @@ class WorkflowImpl { async create( options?: WorkflowInstanceCreateOptions ): Promise { - const result = await callFetcher<{ - id: string; - }>(this.fetcher, '/create', options ?? {}); + const result = workflowsBindingsRpc + ? await this.fetcher.create(options) + : await callFetcher<{ + id: string; + }>(this.fetcher, '/create', options ?? {}); return new InstanceImpl(result.id, this.fetcher); } @@ -126,11 +172,13 @@ class WorkflowImpl { async createBatch( options: WorkflowInstanceCreateOptions[] ): Promise { - const results = await callFetcher< - { - id: string; - }[] - >(this.fetcher, '/createBatch', options); + const results = workflowsBindingsRpc + ? await this.fetcher.createBatch(options) + : await callFetcher< + { + id: string; + }[] + >(this.fetcher, '/createBatch', options); return results.map((result) => new InstanceImpl(result.id, this.fetcher)); } diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index 56d8a6d08c8..a6bea6bf9e4 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -1540,4 +1540,12 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { # startup. This allows packages to extend `sys.path` declaratively (e.g. to # add subdirectories or register import hooks). Without this flag, `.pth` # files in `python_modules/` are ignored. + + workflowsBindingsRpc @177 :Bool + $compatEnableFlag("workflows_bindings_rpc") + $experimental; + # When enabled, the `env.WORKFLOW` binding (cloudflare-internal:workflows-api) + # dispatches its methods as JSRPC calls on the inner fetcher instead of HTTP + # requests against the binding-shim worker. Without the flag the legacy HTTP + # transport is used. }