Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/event-write-occ-fence.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/world-vercel": patch
---

Add an optimistic-concurrency fence to event writes that talk to workflow-server.

- The elapsed-wait scan now passes `lastKnownEventId` snapshotted from the loaded events when committing `wait_completed`, so a stale-snapshot tick can't slip a sleep-branch event past a freshly-committed `hook_received`.
- `resumeHook` sends `asOfTimestamp` with the new `hook_received` event so the server-side fence is anchored at the resume call's wall-clock without paying for a client-side event pre-read.
- The `CreateEventParams` shape on `@workflow/world` grows two optional fields (`lastKnownEventId`, `asOfTimestamp`) that worlds may forward as-is.

Conflict surfaces as the existing `EntityConflictError`, which the runtime already reloads-and-continues on.
107 changes: 95 additions & 12 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -796,23 +796,106 @@ export function workflowEntrypoint(
},
}));

// Snapshot the loaded events' tail eventId as the OCC
// fence. If a concurrent writer (e.g. `resumeHook`)
// committed something between our load and this write,
// the server's CAS rejects and we retry *in-place*
// with a freshly-loaded fence rather than throwing
// the whole tick away. Falling back to queue
// redelivery thunder-herds — every redelivery spawns
// another concurrent tick which fences-conflicts
// again, and workflows stall in `running`.
let fenceEventId: string | undefined =
events.length > 0
? events[events.length - 1].eventId
: undefined;
const MAX_FENCE_RETRIES = 5;
for (const waitEvent of waitsToComplete) {
try {
await world.events.create(runId, waitEvent, {
requestId,
});
} catch (err) {
if (EntityConflictError.is(err)) {
runtimeLogger.info(
'Wait already completed, skipping',
let attempts = 0;
let written = false;
while (!written) {
try {
const result = await world.events.create(
runId,
waitEvent,
{
workflowRunId: runId,
correlationId: waitEvent.correlationId,
requestId,
...(fenceEventId
? { lastKnownEventId: fenceEventId }
: {}),
}
);
continue;
if (result.event) {
fenceEventId = result.event.eventId;
}
written = true;
} catch (err) {
if (!EntityConflictError.is(err)) {
throw err;
}
// Fence conflicts surface a specific error
// message from workflow-server. Anything
// else (workflow-server "Workflow wait …",
// world-local 'Wait "…" already completed',
// and any other world's duplicate-wait
// shape) is the existing
// wait-already-completed conflict — skip
// and continue, matching pre-OCC behavior
// across worlds.
const isFenceConflict = /fence conflict/i.test(
err.message
);
if (!isFenceConflict) {
runtimeLogger.info(
'Wait already completed, skipping',
{
workflowRunId: runId,
correlationId: waitEvent.correlationId,
}
);
break;
}
attempts += 1;
if (attempts > MAX_FENCE_RETRIES) {
runtimeLogger.warn(
'Wait completion gave up after fence retries; falling back to queue redelivery',
{
workflowRunId: runId,
correlationId: waitEvent.correlationId,
attempts,
}
);
throw err;
}
const loaded = eventsCursor
? await loadWorkflowRunEvents(runId, eventsCursor)
: await loadWorkflowRunEvents(runId);
if (eventsCursor) {
for (const e of loaded.events) {
if (
!events.some((x) => x.eventId === e.eventId)
) {
events.push(e);
}
}
eventsCursor = loaded.cursor ?? eventsCursor;
} else {
events = loaded.events;
eventsCursor = loaded.cursor;
}
const alreadyCompleted = events.some(
(e) =>
e.eventType === 'wait_completed' &&
e.correlationId === waitEvent.correlationId
);
if (alreadyCompleted) {
break;
}
fenceEventId = events[events.length - 1]?.eventId;
await new Promise((r) =>
setTimeout(r, 25 * attempts)
);
}
throw err;
}
}

Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,12 @@ export async function resumeHook<T = any>(
})
);

// Create a hook_received event with the payload
// Append `hook_received` unconditionally — ULID ordering already
// places this write after anything committed before us. We do
// NOT send `lastKnownEventId` here: a fence would only ever
// reject the hook in favor of an unrelated concurrent write,
// which would lose the user's hook signal. Stale-snapshot
// protection lives on the *tick* writes that consume hooks.
await world.events.create(
hook.runId,
{
Expand Down
6 changes: 6 additions & 0 deletions packages/world-vercel/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ async function createWorkflowRunEventInner(
...data,
remoteRefBehavior,
...(params?.requestId ? { vercelId: params.requestId } : {}),
...(params?.lastKnownEventId
? { lastKnownEventId: params.lastKnownEventId }
: {}),
...(params?.asOfTimestamp !== undefined
? { asOfTimestamp: params.asOfTimestamp }
: {}),
},
config,
schema: EventResultResolveWireSchema,
Expand Down
14 changes: 14 additions & 0 deletions packages/world/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,20 @@ export interface CreateEventParams {
resolveData?: ResolveData;
/** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */
requestId?: string;
/**
* OCC fence: when set, the event write is rejected with a conflict
* unless the run's materialized `lastKnownEventId` equals this value.
* Lets the runtime stop a stale-snapshot tick from advancing the log.
*/
lastKnownEventId?: string;
/**
* OCC fence (alternative form): unix-ms cutoff. Server resolves to the
* highest eventId strictly before this timestamp and uses that as the
* expected fence. Lets `resumeHook` fence `hook_received` after anything
* the caller could have observed without paying for a separate read.
* Ignored when `lastKnownEventId` is also set.
*/
asOfTimestamp?: number;
}

/**
Expand Down
Loading