Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/tired-pigs-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/core": minor
"workflow": minor
---
Comment thread
TooTallNate marked this conversation as resolved.

A `WritableStream` from a workflow's `getWritable()` can now be passed as an argument to a child workflow via `start()`; the child's writes land on the parent run's stream directly for the full lifetime of the child run.
44 changes: 44 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,50 @@ describe('e2e', () => {
expect(await run.returnValue).toEqual('done');
});

// A WritableStream passed as a workflow argument to start() should
// land raw bytes on the parent's output stream when the child step
// writes to it. Covered for both:
// - `writableForwardedFromWorkflowWorkflow`: parent calls
// `getWritable()` in workflow context (fake handle revived in the
// intermediary step).
// - `writableForwardedFromStepWorkflow`: parent calls `getWritable()`
// in step context (real `serialize.writable` passed straight to
// `start()`).
test.each([
'writableForwardedFromWorkflowWorkflow',
'writableForwardedFromStepWorkflow',
] as const)('%s', { timeout: 120_000 }, async (workflowName) => {
const payload = `hello-from-child-${Date.now()}\n`;
const run = await start(await e2e(workflowName), [payload]);

const reader = run.getReadable().getReader();
// `fatal: true` makes the decoder throw on any invalid UTF-8
// sequence, so a successful decode is itself a round-trip
// assertion that the bytes survived intact.
const decoder = new TextDecoder('utf-8', { fatal: true });

// The child step performs exactly one write of `payload` as
// UTF-8 bytes, so we should receive a single chunk containing
// exactly those bytes before the stream closes.
const { value, done } = await reader.read();
expect(done).toBeFalsy();
assert(value);
assert(value instanceof Uint8Array);

const expectedBytes = new TextEncoder().encode(payload);
expect(value.byteLength).toBe(expectedBytes.byteLength);
expect(decoder.decode(value)).toBe(payload);

// Default stream should close cleanly after the parent closes its
// writable.
expect((await reader.read()).done).toBe(true);

const returnValue = await run.returnValue;
expect(returnValue).toMatchObject({
childRunId: expect.stringMatching(/^wrun_/),
});
});

test('fetchWorkflow', { timeout: 60_000 }, async () => {
const run = await start(await e2e('fetchWorkflow'), []);
const returnValue = await run.returnValue;
Expand Down
25 changes: 20 additions & 5 deletions packages/core/src/encryption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,34 @@ const KEY_LENGTH = 32; // bytes (AES-256)
* Callers should call this once per run (after `getEncryptionKeyForRun()`)
* and pass the resulting `CryptoKey` to all subsequent encrypt/decrypt calls.
*
* Pass `usages: ['encrypt']` (or `['decrypt']`) for cross-run scenarios
* where the caller should not be able to perform the inverse operation
* with the key — for example a child workflow writing into a parent
* run's forwarded WritableStream only needs to encrypt, never decrypt.
*
* @param raw - Raw 32-byte AES-256 key (from World.getEncryptionKeyForRun)
* @param usages - Key usages. Defaults to `['encrypt', 'decrypt']`.
* @returns CryptoKey ready for AES-GCM operations
*/
export async function importKey(raw: Uint8Array) {
export async function importKey(
raw: Uint8Array,
usages: ReadonlyArray<'encrypt' | 'decrypt'> = ['encrypt', 'decrypt']
) {
if (raw.byteLength !== KEY_LENGTH) {
throw new WorkflowRuntimeError(
`Encryption key must be exactly ${KEY_LENGTH} bytes, got ${raw.byteLength}`
);
}
return globalThis.crypto.subtle.importKey('raw', raw, 'AES-GCM', false, [
'encrypt',
'decrypt',
]);
return globalThis.crypto.subtle.importKey(
'raw',
raw,
'AES-GCM',
false,
// `KeyUsage` is a DOM-lib type that's not in scope under `es2022`.
// The `ReadonlyArray<'encrypt' | 'decrypt'>` parameter type matches
// a strict subset of `KeyUsage[]`, so this cast is sound.
usages as ('encrypt' | 'decrypt')[]
);
}

/**
Expand Down
39 changes: 39 additions & 0 deletions packages/core/src/serialization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
ABORT_STREAM_NAME,
STABLE_ULID,
STREAM_NAME_SYMBOL,
STREAM_SERVER_RUN_ID_SYMBOL,
} from './symbols.js';
import { createContext } from './vm/index.js';

Expand Down Expand Up @@ -498,6 +499,44 @@ describe('workflow arguments', () => {
expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/);
});

// When a user writable is already backed by a workflow server
// stream (because it was hydrated by a step-side reviver or created
// via step-context `getWritable()`), forwarding it across a
// `start()` boundary must emit the original `(runId, name)` in the
// dehydrated descriptor and MUST NOT install any pipe through the
// user's writable. The child run's step-side reviver then opens a
// server writable against the original `(runId, name)` directly,
// so writes survive for the full lifetime of the child run — not
// just for the dehydrating step's process.
it('forwards original (runId, name) for a tagged WritableStream', async () => {
const userWritable = new WritableStream();
Object.defineProperty(userWritable, STREAM_NAME_SYMBOL, {
value: 'strm_parentstreamname',
writable: false,
});
Object.defineProperty(userWritable, STREAM_SERVER_RUN_ID_SYMBOL, {
value: 'wrun_parent',
writable: false,
});

expect(userWritable.locked).toBe(false);
const serialized = await dehydrateWorkflowArguments(
userWritable,
'wrun_child',
noEncryptionKey,
[]
);
// If the reducer had piped through the user's writable, the lock
// would be acquired here.
expect(userWritable.locked).toBe(false);
// The dehydrated descriptor should carry both the original name
// and the original runId so the child's reviver can open the
// writable against the parent's server stream directly.
const text = new TextDecoder().decode(serialized as Uint8Array);
expect(text).toContain('strm_parentstreamname');
expect(text).toContain('wrun_parent');
});

it('should work with ReadableStream', async () => {
const stream = new ReadableStream();
const serialized = await dehydrateWorkflowArguments(
Expand Down
119 changes: 108 additions & 11 deletions packages/core/src/serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
decrypt as aesGcmDecrypt,
encrypt as aesGcmEncrypt,
type CryptoKey,
importKey,
} from './encryption.js';
import {
createFlushableState,
Expand Down Expand Up @@ -62,6 +63,7 @@ import {
BODY_INIT_SYMBOL,
STABLE_ULID,
STREAM_NAME_SYMBOL,
STREAM_SERVER_RUN_ID_SYMBOL,
STREAM_TYPE_SYMBOL,
WEBHOOK_RESPONSE_WRITABLE,
} from './symbols.js';
Expand Down Expand Up @@ -775,9 +777,26 @@ export function getExternalReducers(
WritableStream: (value) => {
if (!(value instanceof global.WritableStream)) return false;

// Fast path: when the writable is already backed by a workflow
// server stream (e.g. it came from a step-context `getWritable()`
// or was hydrated from a workflow input by `getStepRevivers`),
// forward its underlying `(runId, name)` to the receiving run.
// The receiving run's step-side reviver opens a server writable
// against the original `(runId, name)` and resolves that run's
// encryption key directly, so writes land on the original stream
// for the full lifetime of the receiving run — no in-process
// bridge tied to the dehydrating step's lifetime.
const existingName = (value as any)[STREAM_NAME_SYMBOL];
const existingRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL];
if (
typeof existingName === 'string' &&
typeof existingRunId === 'string'
) {
return { name: existingName, runId: existingRunId };
}

const streamId = ((global as any)[STABLE_ULID] || defaultUlid)();
const name = `strm_${streamId}`;

const readable = new WorkflowServerReadableStream(runId, name);
ops.push(readable.pipeTo(value));

Expand Down Expand Up @@ -861,7 +880,13 @@ export function getWorkflowReducers(
if (!name) {
throw new WorkflowRuntimeError('WritableStream `name` is not set');
}
return { name };
const s: SerializableSpecial['WritableStream'] = { name };
// When the handle was forwarded from another run (parent → child
// via `start()`), preserve the foreign runId so the step-side
// reviver opens the writable against the original stream.
const foreignRunId = value[STREAM_SERVER_RUN_ID_SYMBOL];
if (typeof foreignRunId === 'string') s.runId = foreignRunId;
return s;
},

// AbortController/AbortSignal in workflow context — just read symbols (handles).
Expand Down Expand Up @@ -961,6 +986,7 @@ function getStepReducers(
if (!(value instanceof global.WritableStream)) return false;

let name = value[STREAM_NAME_SYMBOL];
const foreignRunId = (value as any)[STREAM_SERVER_RUN_ID_SYMBOL];
if (!name) {
const streamId = ((global as any)[STABLE_ULID] || defaultUlid)();
name = `strm_${streamId}`;
Expand All @@ -976,7 +1002,9 @@ function getStepReducers(
);
}

return { name };
const s: SerializableSpecial['WritableStream'] = { name };
if (typeof foreignRunId === 'string') s.runId = foreignRunId;
return s;
},

AbortController: (value) => {
Expand Down Expand Up @@ -1414,12 +1442,25 @@ export function getExternalRevivers(
}
},
WritableStream: (value) => {
// Same handling as `getStepRevivers.WritableStream` — see comments
// there for the cross-run case (writable carries `runId` from
// parent → child forwarding via `start()`).
const targetRunId = typeof value.runId === 'string' ? value.runId : runId;
const targetKey: EncryptionKeyParam =
targetRunId === runId
? cryptoKey
: (async () => {
const world = await getWorldLazy();
const rawKey = await world.getEncryptionKeyForRun?.(targetRunId);
return rawKey ? await importKey(rawKey, ['encrypt']) : undefined;
})();

const serialize = getSerializeStream(
getExternalReducers(global, ops, runId, cryptoKey),
cryptoKey
getExternalReducers(global, ops, targetRunId, targetKey),
targetKey
);
const serverWritable = new WorkflowServerWritableStream(
runId,
targetRunId,
value.name
);

Expand All @@ -1435,6 +1476,15 @@ export function getExternalRevivers(
// Start polling to detect when user releases lock
pollWritableLock(serialize.writable, state);

Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, {
value: value.name,
writable: false,
});
Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, {
value: targetRunId,
writable: false,
});

return serialize.writable;
},

Expand Down Expand Up @@ -1519,12 +1569,22 @@ export function getWorkflowRevivers(
});
},
WritableStream: (value) => {
return Object.create(global.WritableStream.prototype, {
const descriptor: PropertyDescriptorMap = {
[STREAM_NAME_SYMBOL]: {
value: value.name,
writable: false,
},
});
};
// Preserve the foreign runId, if present, so that when the
// handle is later passed to a step the workflow reducer can
// forward it through to the step reviver.
if (typeof value.runId === 'string') {
descriptor[STREAM_SERVER_RUN_ID_SYMBOL] = {
value: value.runId,
writable: false,
};
}
return Object.create(global.WritableStream.prototype, descriptor);
},

// AbortController/AbortSignal revived inside the workflow VM. Use the
Expand Down Expand Up @@ -1742,12 +1802,34 @@ function getStepRevivers(
}
},
WritableStream: (value) => {
// Same-run case: the writable belongs to the current run. Use the
// local cryptoKey and write to the local runId's server stream.
//
// Cross-run case (parent → child via `start()`): the descriptor
// carries the original `runId` and `name`. Open a server writable
// against the original `(runId, name)` and resolve THAT run's key
// for encryption. The resolution is async but doesn't need to
// block reviver return — `getSerializeStream` accepts the
// `Promise<CryptoKey | undefined>` directly and awaits it lazily
// on the first chunk written. The key is imported encrypt-only
// so the receiving run can never decrypt anything else on the
// owning run's stream — it can only contribute new writes.
const targetRunId = typeof value.runId === 'string' ? value.runId : runId;
const targetKey: EncryptionKeyParam =
targetRunId === runId
? cryptoKey
: (async () => {
const world = await getWorldLazy();
const rawKey = await world.getEncryptionKeyForRun?.(targetRunId);
return rawKey ? await importKey(rawKey, ['encrypt']) : undefined;
Comment thread
TooTallNate marked this conversation as resolved.
})();

const serialize = getSerializeStream(
getStepReducers(global, ops, runId, cryptoKey),
cryptoKey
getStepReducers(global, ops, targetRunId, targetKey),
targetKey
);
const serverWritable = new WorkflowServerWritableStream(
runId,
targetRunId,
value.name
);

Expand All @@ -1763,6 +1845,21 @@ function getStepRevivers(
// Start polling to detect when user releases lock
pollWritableLock(serialize.writable, state);

// Record the underlying `(runId, name)` so downstream reducers can
// recognize that this writable is already backed by a workflow
// server stream. When forwarded across `start()` again — e.g.
// the child passes this writable on to a grandchild — the
// external reducer needs both to emit the original `runId` in
// the descriptor.
Object.defineProperty(serialize.writable, STREAM_NAME_SYMBOL, {
value: value.name,
writable: false,
});
Object.defineProperty(serialize.writable, STREAM_SERVER_RUN_ID_SYMBOL, {
value: targetRunId,
writable: false,
});

return serialize.writable;
},

Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/serialization/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,16 @@ export interface SerializableSpecial {
cause?: unknown;
errors: unknown[];
};
WritableStream: { name: string };
WritableStream: {
name: string;
/**
* The runId of the workflow run that owns the underlying server
* stream. Present only when the writable was forwarded across a
* `start()` boundary (parent → child). When omitted, the writable
* belongs to the receiving run (the normal in-run case).
*/
runId?: string;
};
AbortController: {
streamName: string;
hookToken: string;
Expand Down
Loading
Loading