diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md new file mode 100644 index 0000000000..ab4b7decc1 --- /dev/null +++ b/.changeset/event-write-occ-fence.md @@ -0,0 +1,13 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +"@workflow/world-vercel": patch +--- + +Add an optimistic-concurrency fence to event writes that talk to workflow-server. + +- The elapsed-wait scan now passes `lastKnownEventId`, snapshotted from the loaded events, when committing `wait_completed`, so a stale-snapshot tick can't slip a sleep-branch event past a freshly committed `hook_received`. +- `resumeHook` still appends `hook_received` unconditionally (no fence), so an unrelated concurrent write can never cause the user's hook signal to be rejected; stale-snapshot protection lives on the tick writes that consume hooks. +- The `CreateEventParams` shape on `@workflow/world` grows two optional fields (`lastKnownEventId`, `asOfTimestamp`) that worlds may forward as-is; `@workflow/world-vercel` forwards both to workflow-server. + +A fence conflict surfaces as the existing `EntityConflictError`; the elapsed-wait scan retries in place with a freshly loaded fence (up to 5 attempts, with a short backoff) before falling back to queue redelivery. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 1aa36e32ad..69f0d8db14 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -796,23 +796,106 @@ export function workflowEntrypoint( }, })); + // Snapshot the loaded events' tail eventId as the OCC + // fence. If a concurrent writer (e.g. `resumeHook`) + // committed something between our load and this write, + // the server's CAS rejects and we retry *in-place* + // with a freshly-loaded fence rather than throwing + // the whole tick away. Falling back to queue + // redelivery thunder-herds — every redelivery spawns + // another concurrent tick which fences-conflicts + // again, and workflows stall in `running`. + let fenceEventId: string | undefined = + events.length > 0 + ? 
events[events.length - 1].eventId + : undefined; + const MAX_FENCE_RETRIES = 5; for (const waitEvent of waitsToComplete) { - try { - await world.events.create(runId, waitEvent, { - requestId, - }); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info( - 'Wait already completed, skipping', + let attempts = 0; + let written = false; + while (!written) { + try { + const result = await world.events.create( + runId, + waitEvent, { - workflowRunId: runId, - correlationId: waitEvent.correlationId, + requestId, + ...(fenceEventId + ? { lastKnownEventId: fenceEventId } + : {}), } ); - continue; + if (result.event) { + fenceEventId = result.event.eventId; + } + written = true; + } catch (err) { + if (!EntityConflictError.is(err)) { + throw err; + } + // Fence conflicts surface a specific error + // message from workflow-server. Anything + // else (workflow-server "Workflow wait …", + // world-local 'Wait "…" already completed', + // and any other world's duplicate-wait + // shape) is the existing + // wait-already-completed conflict — skip + // and continue, matching pre-OCC behavior + // across worlds. + const isFenceConflict = /fence conflict/i.test( + err.message + ); + if (!isFenceConflict) { + runtimeLogger.info( + 'Wait already completed, skipping', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + } + ); + break; + } + attempts += 1; + if (attempts > MAX_FENCE_RETRIES) { + runtimeLogger.warn( + 'Wait completion gave up after fence retries; falling back to queue redelivery', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + attempts, + } + ); + throw err; + } + const loaded = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (eventsCursor) { + for (const e of loaded.events) { + if ( + !events.some((x) => x.eventId === e.eventId) + ) { + events.push(e); + } + } + eventsCursor = loaded.cursor ?? 
eventsCursor; + } else { + events = loaded.events; + eventsCursor = loaded.cursor; + } + const alreadyCompleted = events.some( + (e) => + e.eventType === 'wait_completed' && + e.correlationId === waitEvent.correlationId + ); + if (alreadyCompleted) { + break; + } + fenceEventId = events[events.length - 1]?.eventId; + await new Promise((r) => + setTimeout(r, 25 * attempts) + ); } - throw err; } } diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0787d4d4e5..c89aec4621 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,7 +156,12 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload + // Append `hook_received` unconditionally — ULID ordering already + // places this write after anything committed before us. We do + // NOT send `lastKnownEventId` here: a fence would only ever + // reject the hook in favor of an unrelated concurrent write, + // which would lose the user's hook signal. Stale-snapshot + // protection lives on the *tick* writes that consume hooks. await world.events.create( hook.runId, { diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..a896e04330 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -457,6 +457,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? 
{ asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultResolveWireSchema, diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index a541ac0563..3a33e2bfc0 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -391,6 +391,20 @@ export interface CreateEventParams { resolveData?: ResolveData; /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; + /** + * OCC fence: when set, the event write is rejected with a conflict + * unless the run's materialized `lastKnownEventId` equals this value. + * Lets the runtime stop a stale-snapshot tick from advancing the log. + */ + lastKnownEventId?: string; + /** + * OCC fence (alternative form): unix-ms cutoff. Server resolves to the + * highest eventId strictly before this timestamp and uses that as the + * expected fence. Lets `resumeHook` fence `hook_received` after anything + * the caller could have observed without paying for a separate read. + * Ignored when `lastKnownEventId` is also set. + */ + asOfTimestamp?: number; } /**