Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/getwritable-share-pipe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
"workflow": patch
---

Fix `getWritable()` returning a new TransformStream per call, which caused racing pipes to reorder chunks when callers acquired a writer per write. Repeat calls within the same step now share a single pipe per `(runId, namespace)`.
15 changes: 15 additions & 0 deletions packages/core/src/step/context-storage.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import type { CryptoKey } from '../encryption.js';
import type { FlushableStreamState } from '../flushable-stream.js';
import type { WorkflowMetadata } from '../workflow/get-workflow-metadata.js';
import type { StepMetadata } from './get-step-metadata.js';

/**
* Per-step cache entry for a `(runId, namespace)` writable stream.
*
* Holds the user-facing `WritableStream` and the shared `FlushableStreamState`
* driving the background pipe to the workflow server. Re-used so repeat calls
* to `getWritable()` within the same step return the same handle instead of
* spawning racing pipes — see https://github.com/vercel/workflow/issues/2058.
*/
export interface CachedWritable {
writable: WritableStream<any>;
state: FlushableStreamState;
}

export type StepContext = {
stepMetadata: StepMetadata;
workflowMetadata: WorkflowMetadata;
ops: Promise<void>[];
closureVars?: Record<string, any>;
encryptionKey?: CryptoKey;
writables?: Map<string, CachedWritable>;
};

/**
Expand Down
184 changes: 146 additions & 38 deletions packages/core/src/step/writable-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,46 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { LOCK_POLL_INTERVAL_MS } from '../flushable-stream.js';
import { setWorld } from '../runtime/world.js';

// Captures every chunk written to `world.streams.write` / `writeMulti`
// in arrival order, so tests can assert the on-wire sequence after
// going through the (de)serialize transforms.
let writeCalls: Uint8Array[];

function makeStepCtx(): any {
return {
stepMetadata: {
stepName: 'test-step',
stepId: 'step_001',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'test-workflow',
workflowRunId: 'wrun_test123',
workflowStartedAt: new Date(),
url: 'http://localhost:3000',
features: { encryption: false },
},
ops: [] as Promise<void>[],
encryptionKey: undefined,
};
}

describe('step-level getWritable', () => {
beforeEach(() => {
writeCalls = [];
const mockWorld = {
streams: {
write: vi.fn().mockResolvedValue(undefined),
writeMulti: vi.fn().mockResolvedValue(undefined),
write: vi.fn(
async (_runId: string, _name: string, chunk: Uint8Array) => {
writeCalls.push(chunk);
}
),
writeMulti: vi.fn(
async (_runId: string, _name: string, chunks: Uint8Array[]) => {
writeCalls.push(...chunks);
}
),
close: vi.fn().mockResolvedValue(undefined),
},
};
Expand All @@ -23,24 +57,8 @@ describe('step-level getWritable', () => {
it('ops promise should resolve when writer lock is released (without closing stream)', async () => {
const { contextStorage } = await import('./context-storage.js');

const ops: Promise<void>[] = [];
const ctx = {
stepMetadata: {
stepName: 'test-step',
stepId: 'step_001',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'test-workflow',
workflowRunId: 'wrun_test123',
workflowStartedAt: new Date(),
url: 'http://localhost:3000',
features: { encryption: false },
},
ops,
encryptionKey: undefined,
};
const ctx = makeStepCtx();
const ops = ctx.ops as Promise<void>[];

const writable = await contextStorage.run(ctx, async () => {
const { getWritable } = await import('./writable-stream.js');
Expand Down Expand Up @@ -71,24 +89,8 @@ describe('step-level getWritable', () => {
it('ops promise should resolve when stream is explicitly closed', async () => {
const { contextStorage } = await import('./context-storage.js');

const ops: Promise<void>[] = [];
const ctx = {
stepMetadata: {
stepName: 'test-step',
stepId: 'step_001',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'test-workflow',
workflowRunId: 'wrun_test123',
workflowStartedAt: new Date(),
url: 'http://localhost:3000',
features: { encryption: false },
},
ops,
encryptionKey: undefined,
};
const ctx = makeStepCtx();
const ops = ctx.ops as Promise<void>[];

const writable = await contextStorage.run(ctx, async () => {
const { getWritable } = await import('./writable-stream.js');
Expand All @@ -111,4 +113,110 @@ describe('step-level getWritable', () => {
])
).resolves.not.toThrow();
});

// Regression for https://github.com/vercel/workflow/issues/2058.
// Repeat calls to `getWritable()` from the same step previously spawned
// independent TransformStream + pipe pairs that all flushed to the same
// (runId, name). On world-vercel the 50-100ms HTTP write latency turned
// that race window into deterministic reordering; locally it was
// invisible. We now memoize per (runId, namespace) so a single serial
// sink is shared across calls.
it('returns the same writable for repeat calls with the same namespace', async () => {
const { contextStorage } = await import('./context-storage.js');
const ctx = makeStepCtx();

const [a, b] = await contextStorage.run(ctx, async () => {
const { getWritable } = await import('./writable-stream.js');
return [getWritable<string>(), getWritable<string>()] as const;
});

expect(a).toBe(b);

// Different namespaces still get distinct writables.
const [c, d] = await contextStorage.run(ctx, async () => {
const { getWritable } = await import('./writable-stream.js');
return [
getWritable<string>({ namespace: 'left' }),
getWritable<string>({ namespace: 'right' }),
] as const;
});

expect(c).not.toBe(d);
expect(c).not.toBe(a);
});

it('preserves chunk order across per-write getWritable() calls in a loop', async () => {
const { contextStorage } = await import('./context-storage.js');
const { getDeserializeStream } = await import('../serialization.js');

const ctx = makeStepCtx();
const ops = ctx.ops as Promise<void>[];

// Repro of the user-reported pattern: acquire a fresh writer per chunk
// and release between writes. With the pre-fix per-call pipe, these
// chunks could land out of order on the server.
const chunks = ['nov', 'o', ' e', '2', 'e', ' ok'];
await contextStorage.run(ctx, async () => {
const { getWritable } = await import('./writable-stream.js');
for (const chunk of chunks) {
const writer = getWritable<string>().getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}
});

// Wait for all pending writes to flush through the shared pipe.
await Promise.race([
Promise.all(ops),
new Promise((_, r) =>
setTimeout(
() => r(new Error('ops did not resolve')),
LOCK_POLL_INTERVAL_MS * 20 + 500
)
),
]);

// Decode the recorded server writes via the matching deserialize
// stream and confirm chunks arrived in the order we wrote them.
const deserialize = getDeserializeStream({}, undefined);
const decoded: string[] = [];
const reader = deserialize.readable.getReader();
const drain = (async () => {
while (true) {
const r = await reader.read();
if (r.done) return;
decoded.push(r.value);
}
})();

const writer = deserialize.writable.getWriter();
for (const buf of writeCalls) {
await writer.write(buf);
}
await writer.close();
await drain;

expect(decoded).toEqual(chunks);
});

it('registers exactly one pipe per (runId, namespace), regardless of call count', async () => {
const { contextStorage } = await import('./context-storage.js');

const ctx = makeStepCtx();
const ops = ctx.ops as Promise<void>[];

await contextStorage.run(ctx, async () => {
const { getWritable } = await import('./writable-stream.js');
getWritable<string>();
getWritable<string>();
getWritable<string>();
// A distinct namespace gets its own pipe.
getWritable<string>({ namespace: 'other' });
});

expect(ops).toHaveLength(2);
});
});
27 changes: 23 additions & 4 deletions packages/core/src/step/writable-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from '../serialization.js';
import { STREAM_NAME_SYMBOL, STREAM_SERVER_RUN_ID_SYMBOL } from '../symbols.js';
import { getWorkflowRunStreamId } from '../util.js';
import { contextStorage } from './context-storage.js';
import { type CachedWritable, contextStorage } from './context-storage.js';

/**
* The options for {@link getWritable}.
Expand Down Expand Up @@ -50,7 +50,25 @@ export function getWritable<W = any>(
const runId = ctx.workflowMetadata.workflowRunId;
const name = getWorkflowRunStreamId(runId, namespace);

// Create a transform stream that serializes chunks and pipes to the workflow server
// Cache the writable per (runId, namespace) within the step context.
//
// The previous behavior — constructing a fresh TransformStream and
// background pipe on every call — produced non-deterministic chunk
// ordering when callers acquired a new writer per write (e.g. a
// per-chunk loop). Each pipe flushed to the same (runId, name) server
// stream independently, and on Vercel the 50-100ms HTTP latency
// turned the race window from microseconds into something prod-visible.
//
// Sharing a single TransformStream + pipe across calls makes the
// unsafe pattern correct: writes go through one serial sink in the
// order the user wrote them. See
// https://github.com/vercel/workflow/issues/2058.
const cache = (ctx.writables ??= new Map<string, CachedWritable>());
const cached = cache.get(name);
if (cached) {
return cached.writable as WritableStream<W>;
}

const serialize = getSerializeStream(
getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey),
ctx.encryptionKey
Expand Down Expand Up @@ -86,6 +104,7 @@ export function getWritable<W = any>(
writable: false,
});

// Return the writable side of the transform stream
return serialize.writable;
cache.set(name, { writable: serialize.writable, state });

return serialize.writable as WritableStream<W>;
}
Loading