diff --git a/.changeset/getwritable-share-pipe.md b/.changeset/getwritable-share-pipe.md new file mode 100644 index 0000000000..a5ca52e9ae --- /dev/null +++ b/.changeset/getwritable-share-pipe.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"workflow": patch +--- + +Fix `getWritable()` returning a new TransformStream per call, which caused racing pipes to reorder chunks when callers acquired a writer per write. Repeat calls within the same step now share a single pipe per `(runId, namespace)`. diff --git a/packages/core/src/step/context-storage.ts b/packages/core/src/step/context-storage.ts index 200925ea4b..88941d6ee3 100644 --- a/packages/core/src/step/context-storage.ts +++ b/packages/core/src/step/context-storage.ts @@ -1,14 +1,29 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import type { CryptoKey } from '../encryption.js'; +import type { FlushableStreamState } from '../flushable-stream.js'; import type { WorkflowMetadata } from '../workflow/get-workflow-metadata.js'; import type { StepMetadata } from './get-step-metadata.js'; +/** + * Per-step cache entry for a `(runId, namespace)` writable stream. + * + * Holds the user-facing `WritableStream` and the shared `FlushableStreamState` + * driving the background pipe to the workflow server. Re-used so repeat calls + * to `getWritable()` within the same step return the same handle instead of + * spawning racing pipes — see https://github.com/vercel/workflow/issues/2058. + */ +export interface CachedWritable { + writable: WritableStream; + state: FlushableStreamState; +} + export type StepContext = { stepMetadata: StepMetadata; workflowMetadata: WorkflowMetadata; ops: Promise[]; closureVars?: Record; encryptionKey?: CryptoKey; + writables?: Map; }; /** diff --git a/packages/core/src/step/writable-stream.test.ts b/packages/core/src/step/writable-stream.test.ts index 01e83a1b53..ac80d675a4 100644 --- a/packages/core/src/step/writable-stream.test.ts +++ b/packages/core/src/step/writable-stream.test.ts @@ -2,12 +2,46 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { LOCK_POLL_INTERVAL_MS } from '../flushable-stream.js'; import { setWorld } from '../runtime/world.js'; +// Captures every chunk written to `world.streams.write` / `writeMulti` +// in arrival order, so tests can assert the on-wire sequence after +// going through the (de)serialize transforms. +let writeCalls: Uint8Array[]; + +function makeStepCtx(): any { + return { + stepMetadata: { + stepName: 'test-step', + stepId: 'step_001', + stepStartedAt: new Date(), + attempt: 1, + }, + workflowMetadata: { + workflowName: 'test-workflow', + workflowRunId: 'wrun_test123', + workflowStartedAt: new Date(), + url: 'http://localhost:3000', + features: { encryption: false }, + }, + ops: [] as Promise[], + encryptionKey: undefined, + }; +} + describe('step-level getWritable', () => { beforeEach(() => { + writeCalls = []; const mockWorld = { streams: { - write: vi.fn().mockResolvedValue(undefined), - writeMulti: vi.fn().mockResolvedValue(undefined), + write: vi.fn( + async (_runId: string, _name: string, chunk: Uint8Array) => { + writeCalls.push(chunk); + } + ), + writeMulti: vi.fn( + async (_runId: string, _name: string, chunks: Uint8Array[]) => { + writeCalls.push(...chunks); + } + ), close: vi.fn().mockResolvedValue(undefined), }, }; @@ -23,24 +57,8 @@ describe('step-level getWritable', () => { it('ops promise should resolve when writer lock is released (without closing stream)', async () => { const { contextStorage } = await import('./context-storage.js'); - const ops: Promise[] = []; - const ctx = { - stepMetadata: { - stepName: 'test-step', - stepId: 'step_001', - stepStartedAt: new Date(), - attempt: 1, - }, - workflowMetadata: { - workflowName: 'test-workflow', - workflowRunId: 'wrun_test123', - workflowStartedAt: new Date(), - url: 'http://localhost:3000', - features: { encryption: false }, - }, - ops, - encryptionKey: undefined, - }; + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; const writable = await contextStorage.run(ctx, async () => { const { getWritable } = await import('./writable-stream.js'); @@ -71,24 +89,8 @@ describe('step-level getWritable', () => { it('ops promise should resolve when stream is explicitly closed', async () => { const { contextStorage } = await import('./context-storage.js'); - const ops: Promise[] = []; - const ctx = { - stepMetadata: { - stepName: 'test-step', - stepId: 'step_001', - stepStartedAt: new Date(), - attempt: 1, - }, - workflowMetadata: { - workflowName: 'test-workflow', - workflowRunId: 'wrun_test123', - workflowStartedAt: new Date(), - url: 'http://localhost:3000', - features: { encryption: false }, - }, - ops, - encryptionKey: undefined, - }; + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; const writable = await contextStorage.run(ctx, async () => { const { getWritable } = await import('./writable-stream.js'); @@ -111,4 +113,110 @@ describe('step-level getWritable', () => { ]) ).resolves.not.toThrow(); }); + + // Regression for https://github.com/vercel/workflow/issues/2058. + // Repeat calls to `getWritable()` from the same step previously spawned + // independent TransformStream + pipe pairs that all flushed to the same + // (runId, name). On world-vercel the 50-100ms HTTP write latency turned + // that race window into deterministic reordering; locally it was + // invisible. We now memoize per (runId, namespace) so a single serial + // sink is shared across calls. + it('returns the same writable for repeat calls with the same namespace', async () => { + const { contextStorage } = await import('./context-storage.js'); + const ctx = makeStepCtx(); + + const [a, b] = await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + return [getWritable(), getWritable()] as const; + }); + + expect(a).toBe(b); + + // Different namespaces still get distinct writables. + const [c, d] = await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + return [ + getWritable({ namespace: 'left' }), + getWritable({ namespace: 'right' }), + ] as const; + }); + + expect(c).not.toBe(d); + expect(c).not.toBe(a); + }); + + it('preserves chunk order across per-write getWritable() calls in a loop', async () => { + const { contextStorage } = await import('./context-storage.js'); + const { getDeserializeStream } = await import('../serialization.js'); + + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; + + // Repro of the user-reported pattern: acquire a fresh writer per chunk + // and release between writes. With the pre-fix per-call pipe, these + // chunks could land out of order on the server. + const chunks = ['nov', 'o', ' e', '2', 'e', ' ok']; + await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + for (const chunk of chunks) { + const writer = getWritable().getWriter(); + try { + await writer.write(chunk); + } finally { + writer.releaseLock(); + } + } + }); + + // Wait for all pending writes to flush through the shared pipe. + await Promise.race([ + Promise.all(ops), + new Promise((_, r) => + setTimeout( + () => r(new Error('ops did not resolve')), + LOCK_POLL_INTERVAL_MS * 20 + 500 + ) + ), + ]); + + // Decode the recorded server writes via the matching deserialize + // stream and confirm chunks arrived in the order we wrote them. + const deserialize = getDeserializeStream({}, undefined); + const decoded: string[] = []; + const reader = deserialize.readable.getReader(); + const drain = (async () => { + while (true) { + const r = await reader.read(); + if (r.done) return; + decoded.push(r.value); + } + })(); + + const writer = deserialize.writable.getWriter(); + for (const buf of writeCalls) { + await writer.write(buf); + } + await writer.close(); + await drain; + + expect(decoded).toEqual(chunks); + }); + + it('registers exactly one pipe per (runId, namespace), regardless of call count', async () => { + const { contextStorage } = await import('./context-storage.js'); + + const ctx = makeStepCtx(); + const ops = ctx.ops as Promise[]; + + await contextStorage.run(ctx, async () => { + const { getWritable } = await import('./writable-stream.js'); + getWritable(); + getWritable(); + getWritable(); + // A distinct namespace gets its own pipe. + getWritable({ namespace: 'other' }); + }); + + expect(ops).toHaveLength(2); + }); }); diff --git a/packages/core/src/step/writable-stream.ts b/packages/core/src/step/writable-stream.ts index f81349242e..99865b93fa 100644 --- a/packages/core/src/step/writable-stream.ts +++ b/packages/core/src/step/writable-stream.ts @@ -11,7 +11,7 @@ import { } from '../serialization.js'; import { STREAM_NAME_SYMBOL, STREAM_SERVER_RUN_ID_SYMBOL } from '../symbols.js'; import { getWorkflowRunStreamId } from '../util.js'; -import { contextStorage } from './context-storage.js'; +import { type CachedWritable, contextStorage } from './context-storage.js'; /** * The options for {@link getWritable}. @@ -50,7 +50,25 @@ export function getWritable( const runId = ctx.workflowMetadata.workflowRunId; const name = getWorkflowRunStreamId(runId, namespace); - // Create a transform stream that serializes chunks and pipes to the workflow server + // Cache the writable per (runId, namespace) within the step context. + // + // The previous behavior — constructing a fresh TransformStream and + // background pipe on every call — produced non-deterministic chunk + // ordering when callers acquired a new writer per write (e.g. a + // per-chunk loop). Each pipe flushed to the same (runId, name) server + // stream independently, and on Vercel the 50-100ms HTTP latency + // turned the race window from microseconds into something prod-visible. + // + // Sharing a single TransformStream + pipe across calls makes the + // unsafe pattern correct: writes go through one serial sink in the + // order the user wrote them. See + // https://github.com/vercel/workflow/issues/2058. + const cache = (ctx.writables ??= new Map()); + const cached = cache.get(name); + if (cached) { + return cached.writable as WritableStream; + } + const serialize = getSerializeStream( getExternalReducers(globalThis, ctx.ops, runId, ctx.encryptionKey), ctx.encryptionKey @@ -86,6 +104,7 @@ export function getWritable( writable: false, }); - // Return the writable side of the transform stream - return serialize.writable; + cache.set(name, { writable: serialize.writable, state }); + + return serialize.writable as WritableStream; }