From 1d3959eaa8db5866d08ad3970324c1b5dae73f7b Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 21 May 2026 13:09:13 -0700 Subject: [PATCH] Capture world contract failures as fatal (#2060) --- .../runtime-schema-validation-failure.md | 7 + packages/core/src/classify-error.test.ts | 22 ++ packages/core/src/classify-error.ts | 28 ++ packages/core/src/describe-error.test.ts | 30 ++ packages/core/src/describe-error.ts | 17 + packages/core/src/runtime.test.ts | 261 +++++++++++++- packages/core/src/runtime.ts | 332 +++++++++++------- packages/errors/src/error-codes.ts | 2 + packages/world-vercel/src/utils.ts | 4 +- 9 files changed, 574 insertions(+), 129 deletions(-) create mode 100644 .changeset/runtime-schema-validation-failure.md diff --git a/.changeset/runtime-schema-validation-failure.md b/.changeset/runtime-schema-validation-failure.md new file mode 100644 index 0000000000..1bf919a153 --- /dev/null +++ b/.changeset/runtime-schema-validation-failure.md @@ -0,0 +1,7 @@ +--- +"@workflow/core": patch +"@workflow/errors": patch +"@workflow/world-vercel": patch +--- + +Record fatal world response contract failures as non-retryable workflow errors. diff --git a/packages/core/src/classify-error.test.ts b/packages/core/src/classify-error.test.ts index 1007a2fa86..89bc6acb83 100644 --- a/packages/core/src/classify-error.test.ts +++ b/packages/core/src/classify-error.test.ts @@ -48,6 +48,28 @@ describe('classifyRunError', () => { ).toBe(RUN_ERROR_CODES.USER_ERROR); }); + it('classifies world schema validation failures as WORLD_CONTRACT_ERROR', () => { + expect( + classifyRunError( + new WorkflowWorldError( + 'Schema validation failed for POST /v3/runs/wrun/events', + { code: 'SCHEMA_VALIDATION' } + ) + ) + ).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR); + }); + + it('classifies world response parse failures as WORLD_CONTRACT_ERROR', () => { + expect( + classifyRunError( + new WorkflowWorldError( + 'Failed to parse response body for GET /v3/runs/wrun/events', + { code: 'PARSE_ERROR' } + ) + ) + ).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR); + }); + it('classifies string throw as USER_ERROR', () => { expect(classifyRunError('string error')).toBe(RUN_ERROR_CODES.USER_ERROR); }); diff --git a/packages/core/src/classify-error.ts b/packages/core/src/classify-error.ts index f8c0d069d0..6d1da5031a 100644 --- a/packages/core/src/classify-error.ts +++ b/packages/core/src/classify-error.ts @@ -5,8 +5,15 @@ import { StepNotRegisteredError, WorkflowNotRegisteredError, WorkflowRuntimeError, + WorkflowWorldError, } from '@workflow/errors'; +const WORLD_CONTRACT_ERROR_CODES = new Set([ + 'PARSE_ERROR', + 'SCHEMA_VALIDATION', + RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, +]); + /** * Set of error names that should classify as generic `RUNTIME_ERROR`. Each * `*.is()` static does a name-based duck check, so subclassing alone is @@ -36,11 +43,32 @@ const RUNTIME_ERROR_CHECKS = [ * thrown inside the workflow VM and we'd misclassify genuine runtime * errors as user errors. */ +export function isWorldContractError(err: unknown): err is WorkflowWorldError { + if (!WorkflowWorldError.is(err) || err.status !== undefined) { + return false; + } + + const cause = 'cause' in err ? err.cause : undefined; + return ( + (err.code !== undefined && WORLD_CONTRACT_ERROR_CODES.has(err.code)) || + err.message.startsWith('Failed to parse response body for ') || + err.message.startsWith('Schema validation failed for ') || + (typeof cause === 'object' && + cause !== null && + 'name' in cause && + cause.name === 'ZodError') + ); +} + export function classifyRunError(err: unknown): RunErrorCode { if (CorruptedEventLogError.is(err)) { return RUN_ERROR_CODES.CORRUPTED_EVENT_LOG; } + if (isWorldContractError(err)) { + return RUN_ERROR_CODES.WORLD_CONTRACT_ERROR; + } + for (const isMatch of RUNTIME_ERROR_CHECKS) { if (isMatch(err)) { return RUN_ERROR_CODES.RUNTIME_ERROR; diff --git a/packages/core/src/describe-error.test.ts b/packages/core/src/describe-error.test.ts index 8b4e21d65d..627aeb1f42 100644 --- a/packages/core/src/describe-error.test.ts +++ b/packages/core/src/describe-error.test.ts @@ -96,6 +96,16 @@ describe('describeError', () => { expect(result.hint).toContain('max-delivery budget'); }); + test('WORLD_CONTRACT_ERROR via precomputed errorCode is attributed to the SDK', () => { + const result = describeError( + undefined, + RUN_ERROR_CODES.WORLD_CONTRACT_ERROR + ); + expect(result.attribution).toBe('sdk'); + expect(result.errorCode).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR); + expect(result.hint).toContain('SDK contract'); + }); + test('precomputed errorCode wins over classifyRunError when both are provided', () => { // A plain Error would classify as USER_ERROR, but passing REPLAY_TIMEOUT // explicitly overrides that — useful for callers that know the failure @@ -179,6 +189,14 @@ describe('describeRunError', () => { expect(result.hint).toContain('max-delivery budget'); }); + test('WORLD_CONTRACT_ERROR errorCode is attributed to the SDK', () => { + const result = describeRunError({ + errorCode: RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, + }); + expect(result.attribution).toBe('sdk'); + expect(result.hint).toContain('SDK contract'); + }); + test('RUNTIME_ERROR code without errorName still lands as SDK', () => { const result = describeRunError({ errorCode: RUN_ERROR_CODES.RUNTIME_ERROR, @@ -288,4 +306,16 @@ describe('describeError — payload shape snapshots', () => { } `); }); + + test('WORLD_CONTRACT_ERROR via precomputed errorCode payload', () => { + expect( + describeError(undefined, RUN_ERROR_CODES.WORLD_CONTRACT_ERROR) + ).toMatchInlineSnapshot(` + { + "attribution": "sdk", + "errorCode": "WORLD_CONTRACT_ERROR", + "hint": "The workflow backend returned data that violated the SDK contract. This is not retryable; please report it with the stack trace and runId.", + } + `); + }); }); diff --git a/packages/core/src/describe-error.ts b/packages/core/src/describe-error.ts index 45a9b61720..91cedfe5c2 100644 --- a/packages/core/src/describe-error.ts +++ b/packages/core/src/describe-error.ts @@ -83,6 +83,8 @@ const REPLAY_TIMEOUT_HINT = 'The workflow replay took too long. This usually means the event log is unusually large or the workflow function is doing heavy synchronous work between step boundaries.'; const MAX_DELIVERIES_HINT = 'The workflow queue exceeded its max-delivery budget. This usually indicates a persistent runtime failure — check the most recent stack traces for the underlying cause.'; +const WORLD_CONTRACT_HINT = + 'The workflow backend returned data that violated the SDK contract. This is not retryable; please report it with the stack trace and runId.'; function normalizeErrorCode(code: string | undefined): RunErrorCode { // Values read back from persisted events are `string | undefined` — we @@ -135,6 +137,13 @@ export function describeRunError( hint: CORRUPTED_EVENT_LOG_HINT, }; } + if (errorCode === RUN_ERROR_CODES.WORLD_CONTRACT_ERROR) { + return { + attribution: 'sdk', + errorCode, + hint: WORLD_CONTRACT_HINT, + }; + } if (name === 'WorkflowRuntimeError' || name === 'StepNotRegisteredError') { return { attribution: 'sdk', errorCode, hint: RUNTIME_ERROR_HINT }; } @@ -227,5 +236,13 @@ export function describeError( }; } + if (effectiveCode === RUN_ERROR_CODES.WORLD_CONTRACT_ERROR) { + return { + attribution: 'sdk', + errorCode: effectiveCode, + hint: WORLD_CONTRACT_HINT, + }; + } + return { attribution: 'user', errorCode: effectiveCode }; } diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index 4c950026ce..e110ec6869 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -1,4 +1,4 @@ -import { RUN_ERROR_CODES } from '@workflow/errors'; +import { RUN_ERROR_CODES, WorkflowWorldError } from '@workflow/errors'; import { type Event, SPEC_VERSION_CURRENT, @@ -97,6 +97,265 @@ describe('workflowEntrypoint replay guards', () => { `;globalThis.__private_workflows = new Map(); globalThis.__private_workflows.set(${JSON.stringify(workflowName)}, ${workflowName});`; + it('records run_failed when run_started response schema validation fails', async () => { + const createdEvents: unknown[] = []; + const schemaError = new WorkflowWorldError( + 'Schema validation failed for POST /v3/runs/wrun_schema_validation/events:\n' + + ' run.output: Invalid input: expected nonoptional, received undefined\n' + + ' run.error: Invalid input: expected nonoptional, received undefined\n' + + ' run.completedAt: Invalid input: expected nonoptional, received undefined', + { code: 'SCHEMA_VALIDATION' } + ); + const eventsCreate = vi.fn(async (_runId: string, data: any) => { + if (data.eventType === 'run_started') { + throw schemaError; + } + + createdEvents.push(data); + return { + event: { + eventId: `event-${createdEvents.length}`, + runId: 'wrun_schema_validation', + createdAt: new Date(), + ...data, + }, + }; + }); + + setWorld({ + specVersion: SPEC_VERSION_CURRENT, + createQueueHandler: vi.fn( + ( + _prefix: string, + handler: (message: unknown, metadata: unknown) => Promise + ) => { + return async () => { + await handler( + { + runId: 'wrun_schema_validation', + requestedAt: new Date('2024-01-01T00:00:00.000Z'), + }, + { + requestId: 'req_test', + attempt: 1, + queueName: '__wkf_workflow_workflow', + messageId: 'msg_test', + } + ); + return new Response(null, { status: 204 }); + }; + } + ), + events: { + create: eventsCreate, + list: vi.fn(async () => ({ + data: [], + hasMore: false, + cursor: 'cursor_test', + })), + }, + runs: { + get: vi.fn(), + }, + queue: vi.fn(), + getEncryptionKeyForRun: vi.fn(async () => undefined), + } as any); + + const handler = workflowEntrypoint( + `async function workflow() { + return 'done'; + }${getWorkflowTransformCode('workflow')}` + ); + + const response = await handler(new Request('https://example.test')); + + expect(response.status).toBe(204); + expect(createdEvents).toContainEqual( + expect.objectContaining({ + eventType: 'run_failed', + eventData: expect.objectContaining({ + errorCode: RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, + }), + }) + ); + }); + + it('records run_failed when event listing response schema validation fails', async () => { + const createdEvents: unknown[] = []; + const workflowRun: WorkflowRun = { + runId: 'wrun_events_schema_validation', + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + [], + 'wrun_events_schema_validation', + undefined, + [] + ), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + const schemaError = new WorkflowWorldError( + 'Schema validation failed for GET /v3/runs/wrun_events_schema_validation/events:\n' + + ' data.0.eventData: Invalid input', + { code: 'SCHEMA_VALIDATION' } + ); + + const eventsCreate = vi.fn(async (_runId: string, data: any) => { + if (data.eventType !== 'run_started') { + createdEvents.push(data); + } + + return data.eventType === 'run_started' + ? { run: workflowRun } + : { + event: { + eventId: `event-${createdEvents.length}`, + runId: workflowRun.runId, + createdAt: new Date(), + ...data, + }, + }; + }); + + setWorld({ + specVersion: SPEC_VERSION_CURRENT, + createQueueHandler: vi.fn( + ( + _prefix: string, + handler: (message: unknown, metadata: unknown) => Promise + ) => { + return async () => { + await handler( + { + runId: workflowRun.runId, + requestedAt: new Date('2024-01-01T00:00:00.000Z'), + }, + { + requestId: 'req_test', + attempt: 1, + queueName: '__wkf_workflow_workflow', + messageId: 'msg_test', + } + ); + return new Response(null, { status: 204 }); + }; + } + ), + events: { + create: eventsCreate, + list: vi.fn(async () => { + throw schemaError; + }), + }, + runs: { + get: vi.fn(async () => workflowRun), + }, + queue: vi.fn(), + getEncryptionKeyForRun: vi.fn(async () => undefined), + } as any); + + const handler = workflowEntrypoint( + `async function workflow() { + return 'done'; + }${getWorkflowTransformCode('workflow')}` + ); + + const response = await handler(new Request('https://example.test')); + + expect(response.status).toBe(204); + expect(createdEvents).toContainEqual( + expect.objectContaining({ + eventType: 'run_failed', + eventData: expect.objectContaining({ + errorCode: RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, + }), + }) + ); + }); + + it('records run_failed when run_started response parsing fails', async () => { + const createdEvents: unknown[] = []; + const parseError = new WorkflowWorldError( + 'Failed to parse response body for POST /v3/runs/wrun_parse/events (Content-Type: application/cbor):\n\nError: unexpected end of file', + { code: 'PARSE_ERROR' } + ); + const eventsCreate = vi.fn(async (_runId: string, data: any) => { + if (data.eventType === 'run_started') { + throw parseError; + } + + createdEvents.push(data); + return { + event: { + eventId: `event-${createdEvents.length}`, + runId: 'wrun_parse', + createdAt: new Date(), + ...data, + }, + }; + }); + + setWorld({ + specVersion: SPEC_VERSION_CURRENT, + createQueueHandler: vi.fn( + ( + _prefix: string, + handler: (message: unknown, metadata: unknown) => Promise + ) => { + return async () => { + await handler( + { + runId: 'wrun_parse', + requestedAt: new Date('2024-01-01T00:00:00.000Z'), + }, + { + requestId: 'req_test', + attempt: 1, + queueName: '__wkf_workflow_workflow', + messageId: 'msg_test', + } + ); + return new Response(null, { status: 204 }); + }; + } + ), + events: { + create: eventsCreate, + list: vi.fn(async () => ({ + data: [], + hasMore: false, + cursor: 'cursor_test', + })), + }, + runs: { + get: vi.fn(), + }, + queue: vi.fn(), + getEncryptionKeyForRun: vi.fn(async () => undefined), + } as any); + + const handler = workflowEntrypoint( + `async function workflow() { + return 'done'; + }${getWorkflowTransformCode('workflow')}` + ); + + const response = await handler(new Request('https://example.test')); + + expect(response.status).toBe(204); + expect(createdEvents).toContainEqual( + expect.objectContaining({ + eventType: 'run_failed', + eventData: expect.objectContaining({ + errorCode: RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, + }), + }) + ); + }); + it('records run_failed when a committed wait_completed targets the wrong wait', async () => { const ops: Promise[] = []; const workflowRun: WorkflowRun = { diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index c17d66265e..f4cd051592 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -3,6 +3,7 @@ import { EntityConflictError, FatalError, RUN_ERROR_CODES, + type RunErrorCode, RunExpiredError, WorkflowRuntimeError, } from '@workflow/errors'; @@ -12,8 +13,9 @@ import { SPEC_VERSION_CURRENT, WorkflowInvokePayloadSchema, type WorkflowRun, + type World, } from '@workflow/world'; -import { classifyRunError } from './classify-error.js'; +import { classifyRunError, isWorldContractError } from './classify-error.js'; import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; @@ -102,6 +104,74 @@ export { setWorld, } from './runtime/world.js'; +function getWorkflowSetupErrorCode(err: unknown): RunErrorCode | null { + if (WorkflowRuntimeError.is(err)) { + return RUN_ERROR_CODES.RUNTIME_ERROR; + } + + if (isWorldContractError(err)) { + return RUN_ERROR_CODES.WORLD_CONTRACT_ERROR; + } + + return null; +} + +async function recordFatalRunError({ + world, + workflowRun, + runId, + requestId, + err, + errorCode, + logMessage, +}: { + world: World; + workflowRun: WorkflowRun | undefined; + runId: string; + requestId: string | undefined; + err: unknown; + errorCode: RunErrorCode; + logMessage: string; +}) { + runtimeLogger.error(logMessage, { + workflowRunId: runId, + errorCode, + error: err instanceof Error ? err.message : String(err), + }); + + try { + const getEncryptionKey = memoizeEncryptionKey(world, workflowRun ?? runId); + await world.events.create( + runId, + { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: await dehydrateRunError(err, runId, await getEncryptionKey()), + errorCode, + }, + }, + { requestId } + ); + } catch (failErr) { + if (EntityConflictError.is(failErr) || RunExpiredError.is(failErr)) { + return; + } + if (isWorldContractError(failErr)) { + runtimeLogger.error( + 'Fatal world contract error while recording workflow failure', + { + workflowRunId: runId, + errorCode: RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, + error: failErr instanceof Error ? failErr.message : String(failErr), + } + ); + return; + } + throw failErr; + } +} + /** * Creates a single route which handles workflow execution requests, * executing steps inline when possible to reduce function invocations @@ -336,105 +406,123 @@ export function workflowEntrypoint( // roundtrip). If not, return — the last handler to complete // will pick up the replay. if (incomingStepId && incomingStepName) { - const bgRun = await world.runs.get(runId); - if (bgRun.status !== 'running') { - runtimeLogger.debug( - 'Run already finished, skipping background step', - { workflowRunId: runId, status: bgRun.status } - ); - return; - } - const bgStartedAt = bgRun.startedAt - ? +bgRun.startedAt - : Date.now(); - const stepResult = await executeStep({ - world, - workflowRunId: runId, - workflowName, - workflowStartedAt: bgStartedAt, - stepId: incomingStepId, - stepName: incomingStepName, - }); - if (stepResult.type === 'retry') { - return { timeoutSeconds: stepResult.timeoutSeconds }; - } - if (stepResult.type === 'throttled') { - return { timeoutSeconds: stepResult.timeoutSeconds }; - } - - // If step had pending ops (stream writes), break and let - // waitUntil flush them — can't continue inline. - if ( - stepResult.type === 'completed' && - stepResult.hasPendingOps - ) { - await queueMessage( + try { + const bgRun = await world.runs.get(runId); + if (bgRun.status !== 'running') { + runtimeLogger.debug( + 'Run already finished, skipping background step', + { workflowRunId: runId, status: bgRun.status } + ); + return; + } + const bgStartedAt = bgRun.startedAt + ? +bgRun.startedAt + : Date.now(); + const stepResult = await executeStep({ world, - getWorkflowQueueName(workflowName), - { - runId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - } - ); - return; - } + workflowRunId: runId, + workflowName, + workflowStartedAt: bgStartedAt, + stepId: incomingStepId, + stepName: incomingStepName, + }); + if (stepResult.type === 'retry') { + return { timeoutSeconds: stepResult.timeoutSeconds }; + } + if (stepResult.type === 'throttled') { + return { timeoutSeconds: stepResult.timeoutSeconds }; + } - if ( - stepResult.type === 'completed' || - stepResult.type === 'failed' || - stepResult.type === 'skipped' - ) { - // Load events to check if all parallel steps are done. - // Use cursor-based loading so the main loop can continue - // incrementally from here. - const loaded = await loadWorkflowRunEvents(runId); - cachedEvents = loaded.events; - eventsCursor = loaded.cursor; - - // Check for pending steps: any step_created without - // a matching step_completed or step_failed. - const stepCreatedIds = new Set(); - const stepTerminalIds = new Set(); - for (const e of cachedEvents) { - if (e.eventType === 'step_created') { - stepCreatedIds.add(e.correlationId); - } else if ( - e.eventType === 'step_completed' || - e.eventType === 'step_failed' - ) { - stepTerminalIds.add(e.correlationId); - } + // If step had pending ops (stream writes), break and let + // waitUntil flush them — can't continue inline. + if ( + stepResult.type === 'completed' && + stepResult.hasPendingOps + ) { + await queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + } + ); + return; } - let hasPendingSteps = false; - for (const id of stepCreatedIds) { - if (!stepTerminalIds.has(id)) { - hasPendingSteps = true; - break; + + if ( + stepResult.type === 'completed' || + stepResult.type === 'failed' || + stepResult.type === 'skipped' + ) { + // Load events to check if all parallel steps are done. + // Use cursor-based loading so the main loop can continue + // incrementally from here. + const loaded = await loadWorkflowRunEvents(runId); + cachedEvents = loaded.events; + eventsCursor = loaded.cursor; + + // Check for pending steps: any step_created without + // a matching step_completed or step_failed. + const stepCreatedIds = new Set(); + const stepTerminalIds = new Set(); + for (const e of cachedEvents) { + if (e.eventType === 'step_created') { + stepCreatedIds.add(e.correlationId); + } else if ( + e.eventType === 'step_completed' || + e.eventType === 'step_failed' + ) { + stepTerminalIds.add(e.correlationId); + } + } + let hasPendingSteps = false; + for (const id of stepCreatedIds) { + if (!stepTerminalIds.has(id)) { + hasPendingSteps = true; + break; + } } - } - if (hasPendingSteps) { - // Other steps still in progress. Return without - // queuing — the last handler to complete will see - // all steps done and replay inline. + if (hasPendingSteps) { + // Other steps still in progress. Return without + // queuing — the last handler to complete will see + // all steps done and replay inline. + runtimeLogger.debug( + 'Background step done but other steps pending, returning', + { workflowRunId: runId } + ); + return; + } + + // All steps done — fall through to the main replay loop. + // Set up shared state so the loop can continue. runtimeLogger.debug( - 'Background step done but other steps pending, returning', + 'All parallel steps done, replaying inline after background step', { workflowRunId: runId } ); + workflowRun = bgRun; + workflowStartedAt = bgStartedAt; + // cachedEvents and eventsCursor already set from load above + } else { return; } - - // All steps done — fall through to the main replay loop. - // Set up shared state so the loop can continue. - runtimeLogger.debug( - 'All parallel steps done, replaying inline after background step', - { workflowRunId: runId } - ); - workflowRun = bgRun; - workflowStartedAt = bgStartedAt; - // cachedEvents and eventsCursor already set from load above - } else { + } catch (err) { + const errorCode = getWorkflowSetupErrorCode(err); + if (!errorCode) { + throw err; + } + await recordFatalRunError({ + world, + workflowRun, + runId, + requestId, + err, + errorCode, + logMessage: + 'Fatal error while preparing background workflow step', + }); return; } } @@ -514,44 +602,22 @@ export function workflowEntrypoint( { workflowRunId: runId, message: err.message } ); return; - } else if (err instanceof WorkflowRuntimeError) { - runtimeLogger.error( - 'Fatal runtime error during workflow setup', - { workflowRunId: runId, error: err.message } - ); - try { - const getEncryptionKey = memoizeEncryptionKey( - world, - runId - ); - await world.events.create( - runId, - { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: await dehydrateRunError( - err, - runId, - await getEncryptionKey() - ), - errorCode: RUN_ERROR_CODES.RUNTIME_ERROR, - }, - }, - { requestId } - ); - } catch (failErr) { - if ( - EntityConflictError.is(failErr) || - RunExpiredError.is(failErr) - ) { - return; - } - throw failErr; + } else { + const errorCode = getWorkflowSetupErrorCode(err); + if (!errorCode) { + throw err; } + await recordFatalRunError({ + world, + workflowRun, + runId, + requestId, + err, + errorCode, + logMessage: + 'Fatal runtime error during workflow setup', + }); return; - } else { - throw err; } } @@ -1176,6 +1242,20 @@ export function workflowEntrypoint( ); return; } + if (isWorldContractError(failErr)) { + runtimeLogger.error( + 'Fatal world contract error while recording workflow failure', + { + workflowRunId: runId, + errorCode: RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, + error: + failErr instanceof Error + ? failErr.message + : String(failErr), + } + ); + return; + } throw failErr; } diff --git a/packages/errors/src/error-codes.ts b/packages/errors/src/error-codes.ts index fadbdbf32d..c28695f36d 100644 --- a/packages/errors/src/error-codes.ts +++ b/packages/errors/src/error-codes.ts @@ -14,6 +14,8 @@ export const RUN_ERROR_CODES = { MAX_DELIVERIES_EXCEEDED: 'MAX_DELIVERIES_EXCEEDED', /** Workflow replay exceeded the maximum allowed duration */ REPLAY_TIMEOUT: 'REPLAY_TIMEOUT', + /** World response violated the SDK contract and cannot be retried safely */ + WORLD_CONTRACT_ERROR: 'WORLD_CONTRACT_ERROR', } as const; export type RunErrorCode = diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index c6eb7825eb..8acefbaa69 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -423,7 +423,7 @@ export async function makeRequest({ const contentType = response.headers.get('Content-Type') || 'unknown'; throw new WorkflowWorldError( `Failed to parse response body for ${request.method} ${endpoint} (Content-Type: ${contentType}):\n\n${error}`, - { url, cause: error } + { url, code: 'PARSE_ERROR', cause: error } ); } @@ -442,7 +442,7 @@ export async function makeRequest({ : ''; throw new WorkflowWorldError( `Schema validation failed for ${method} ${endpoint}:\n${issues}${debugContext}`, - { url, cause: validationResult.error } + { url, code: 'SCHEMA_VALIDATION', cause: validationResult.error } ); } return validationResult.data;