Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-api-rpc-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import * as assert from 'node:assert';

async function lastRestartBody(env, id) {
return env.mock.lastRestart(id);
}

export const tests = {
async test(_, env) {
{
const instance = await env.workflow.create({
id: 'foo',
payload: { bar: 'baz' },
});
assert.deepStrictEqual(instance.id, 'foo');
}

{
const instance = await env.workflow.get('bar');
assert.deepStrictEqual(instance.id, 'bar');
}

{
const instances = await env.workflow.createBatch([
{ id: 'foo', payload: { bar: 'baz' } },
{ id: 'bar', payload: { bar: 'baz' } },
]);
assert.deepStrictEqual(instances[0].id, 'foo');
assert.deepStrictEqual(instances[1].id, 'bar');
}

{
const instance = await env.workflow.get('inst');
await instance.pause();
await instance.resume();
await instance.terminate();
await instance.sendEvent({
type: 'my-event',
payload: { hello: 'world' },
});
}

{
const instance = await env.workflow.get('status-rpc');
const status = await instance.status();
assert.deepStrictEqual(status.status, 'running');
assert.strictEqual(status.transport, 'rpc');
assert.strictEqual(status.output, 'status-rpc');
}

{
for (const method of ['get', 'create', 'createBatch']) {
assert.strictEqual(typeof env.workflow[method], 'function');
}

const fromGet = await env.workflow.get('a');
const fromCreate = await env.workflow.create({ id: 'b' });
const [fromBatch] = await env.workflow.createBatch([{ id: 'c' }]);

const proto = Object.getPrototypeOf(fromGet);
assert.strictEqual(Object.getPrototypeOf(fromCreate), proto);
assert.strictEqual(Object.getPrototypeOf(fromBatch), proto);

for (const method of [
'pause',
'resume',
'terminate',
'restart',
'status',
'sendEvent',
]) {
assert.strictEqual(typeof fromGet[method], 'function');
}
}

{
await assert.rejects(env.workflow.get('throw'), {
message: 'workflow instance not found',
});
}
},

async testRestartNoOptions(_, env) {
const instance = await env.workflow.get('rpc-restart-basic');
await instance.restart();

const body = await lastRestartBody(env, 'rpc-restart-basic');
assert.deepStrictEqual(body.id, 'rpc-restart-basic');
assert.strictEqual(body.from, undefined);
},

async testRestartFromStepNameOnly(_, env) {
const instance = await env.workflow.get('rpc-restart-step');
await instance.restart({ from: { name: 'fetch data' } });

const body = await lastRestartBody(env, 'rpc-restart-step');
assert.deepStrictEqual(body.id, 'rpc-restart-step');
assert.deepStrictEqual(body.from, { name: 'fetch data' });
},

async testRestartFromStepAllOptions(_, env) {
const instance = await env.workflow.get('rpc-restart-full');
await instance.restart({
from: { name: 'process item', count: 3, type: 'do' },
});

const body = await lastRestartBody(env, 'rpc-restart-full');
assert.deepStrictEqual(body.id, 'rpc-restart-full');
assert.deepStrictEqual(body.from, {
name: 'process item',
count: 3,
type: 'do',
});
},
};
32 changes: 32 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-api-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,38 @@ export const tests = {
assert.deepStrictEqual(instances[0].id, 'foo');
assert.deepStrictEqual(instances[1].id, 'bar');
}

{
const instance = await env.workflow.get('status-http');
const status = await instance.status();
assert.deepStrictEqual(status.status, 'running');
assert.strictEqual(status.transport, 'http');
}

{
for (const method of ['get', 'create', 'createBatch']) {
assert.strictEqual(typeof env.workflow[method], 'function');
}

const fromGet = await env.workflow.get('a');
const fromCreate = await env.workflow.create({ id: 'b' });
const [fromBatch] = await env.workflow.createBatch([{ id: 'c' }]);

const proto = Object.getPrototypeOf(fromGet);
assert.strictEqual(Object.getPrototypeOf(fromCreate), proto);
assert.strictEqual(Object.getPrototypeOf(fromBatch), proto);

for (const method of [
'pause',
'resume',
'terminate',
'restart',
'status',
'sendEvent',
]) {
assert.strictEqual(typeof fromGet[method], 'function');
}
}
},

async testRestartNoOptions(_, env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,35 @@ const unitTests :Workerd.Config = (
],
)
),
( name = "workflows-api-rpc-test",
worker = (
modules = [
(name = "worker", esModule = embed "workflows-api-rpc-test.js")
],
compatibilityFlags = ["nodejs_compat", "workflows_bindings_rpc",
"experimental", "service_binding_extra_handlers",
"rpc"],
bindings = [
(
name = "workflow",
wrapped = (
moduleName = "cloudflare-internal:workflows-api",
innerBindings = [(
name = "fetcher",
service = "workflows-mock"
)],
)
),
(
name = "mock",
service = "workflows-mock"
)
],
)
),
( name = "workflows-mock",
worker = (
compatibilityFlags = ["nodejs_compat"],
compatibilityFlags = ["experimental", "nodejs_compat"],
modules = [
(name = "worker", esModule = embed "workflows-mock.js")
],
Expand Down
160 changes: 100 additions & 60 deletions src/cloudflare/internal/test/workflows/workflows-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,110 @@
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { WorkerEntrypoint } from 'cloudflare:workers';

const restartBodies = new Map();

export default {
async fetch(request, env, ctx) {
const data = await request.json();
const reqUrl = new URL(request.url);

if (reqUrl.pathname === '/get' && request.method === 'POST') {
return Response.json(
{
result: {
id: data.id,
},
},
{
status: 200,
headers: {
'content-type': 'application/json',
},
}
);
}
const THROW_ID = 'throw';

if (reqUrl.pathname === '/create' && request.method === 'POST') {
return Response.json(
{
result: {
id: data.id,
},
},
{
status: 201,
headers: {
'content-type': 'application/json',
},
}
);
}
function getInstance(id) {
if (id === THROW_ID) {
throw new Error('workflow instance not found');
}
return { id };
}

if (reqUrl.pathname === '/createBatch' && request.method === 'POST') {
return Response.json(
{
result: data.map((val) => ({ id: val.id })),
},
{
status: 201,
headers: {
'content-type': 'application/json',
},
}
);
}
function createInstance(options) {
return { id: options?.id };
}

if (reqUrl.pathname === '/restart' && request.method === 'POST') {
restartBodies.set(data.id, data);
return Response.json({}, { status: 200 });
}
function createBatchInstances(options) {
return options.map((val) => ({ id: val.id }));
}

// Test-only: returns the body from the last /restart call for a given id
if (reqUrl.pathname === '/last-restart' && request.method === 'POST') {
return Response.json(
{ result: restartBodies.get(data.id) ?? null },
{ status: 200 }
);
function instanceStatus(id, transport) {
return { status: 'running', output: id, transport };
}

async function handleHttp(request) {
const data = await request.json();
const reqUrl = new URL(request.url);

if (request.method !== 'POST') {
return Response.json({ success: false }, { status: 500 });
}

try {
switch (reqUrl.pathname) {
case '/get':
return Response.json({ result: getInstance(data.id) }, { status: 200 });
case '/create':
return Response.json({ result: createInstance(data) }, { status: 201 });
case '/createBatch':
return Response.json(
{ result: createBatchInstances(data) },
{ status: 201 }
);
case '/pause':
case '/resume':
case '/terminate':
case '/send-event':
return Response.json({ result: null }, { status: 200 });
case '/restart':
restartBodies.set(data.id, data);
return Response.json({ result: null }, { status: 200 });
case '/status':
return Response.json(
{ result: instanceStatus(data.id, 'http') },
{ status: 200 }
);
case '/last-restart':
return Response.json(
{ result: restartBodies.get(data.id) ?? null },
{ status: 200 }
);
default:
return Response.json({ success: false }, { status: 404 });
}
},
};
} catch (err) {
return Response.json({ error: { message: err.message } }, { status: 500 });
}
}

export default class WorkflowsMock extends WorkerEntrypoint {
async getInstance(id) {
return getInstance(id);
}

async create(options) {
return createInstance(options);
}

async createBatch(options) {
return createBatchInstances(options);
}

async pause(_id) {}

async resume(_id) {}

async terminate(_id) {}

async restart(id, options) {
restartBodies.set(id, { ...options, id });
}

async status(id) {
return instanceStatus(id, 'rpc');
}

async sendEvent(_id, _event) {}

async lastRestart(id) {
return restartBodies.get(id) ?? null;
}

async fetch(request) {
return handleHttp(request);
}
}
Loading
Loading