diff --git a/.changeset/core-run-id-input.md b/.changeset/core-run-id-input.md new file mode 100644 index 0000000000..79f60c54a2 --- /dev/null +++ b/.changeset/core-run-id-input.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": minor +"workflow": minor +--- + +`start()` now delegates run ID generation to `world.createRunId(options)` when the world provides it, falling back to a monotonic ULID otherwise. The full options bag is passed through so worlds can read whichever fields they recognise. Adds a new `region` option that worlds may consume — when set, it is also forwarded onto the queue options so the initial workflow message is routed to the matching region. diff --git a/.changeset/tagged-run-id.md b/.changeset/tagged-run-id.md new file mode 100644 index 0000000000..956ea45717 --- /dev/null +++ b/.changeset/tagged-run-id.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": minor +--- + +Add `@workflow/world-vercel/run-id` sub-export with `encode`/`decode` helpers that produce ULID-shaped workflow run IDs carrying a tag bit, a 5-bit version, and a 6-bit Vercel region ID. diff --git a/.changeset/world-create-run-id.md b/.changeset/world-create-run-id.md new file mode 100644 index 0000000000..929357986c --- /dev/null +++ b/.changeset/world-create-run-id.md @@ -0,0 +1,5 @@ +--- +"@workflow/world": minor +--- + +Add optional `createRunId(options?)` to the `World` interface and `region` to `QueueOptions`. Worlds can now mint custom run IDs (reading whichever start-option fields they recognise) and route messages to a specific region. diff --git a/.changeset/world-vercel-create-run-id.md b/.changeset/world-vercel-create-run-id.md new file mode 100644 index 0000000000..7b59054d2f --- /dev/null +++ b/.changeset/world-vercel-create-run-id.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": minor +--- + +Implement `World.createRunId` to mint region-tagged ULIDs, preferring an explicit `options.region` from `start()` and falling back to the `VERCEL_REGION` environment variable. 
The queue now routes each message to the region encoded in the payload's tagged run ID (or to an explicit `opts.region` override), instead of the previous hard-coded `iad1`. diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index c30a17da27..987fe60e0a 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -533,4 +533,139 @@ describe('start', () => { expectTypeOf().toMatchTypeOf(); }); }); + + describe('createRunId', () => { + let mockEventsCreate: ReturnType; + let mockQueue: ReturnType; + + beforeEach(() => { + mockEventsCreate = vi.fn().mockImplementation((runId) => { + return Promise.resolve({ + run: { runId: runId ?? 'wrun_test123', status: 'pending' }, + }); + }); + mockQueue = vi.fn().mockResolvedValue(undefined); + }); + + afterEach(() => { + setWorld(undefined); + vi.clearAllMocks(); + }); + + it('uses world.createRunId() when provided', async () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + const customId = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + const createRunId = vi.fn().mockReturnValue(customId); + + setWorld({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + createRunId, + } as any); + + await start(validWorkflow, []); + + expect(createRunId).toHaveBeenCalledTimes(1); + // No options were passed, so the world receives an empty object + // (the default value used internally). 
+ expect(createRunId).toHaveBeenCalledWith({}); + expect(mockEventsCreate).toHaveBeenCalledWith( + `wrun_${customId}`, + expect.objectContaining({ eventType: 'run_created' }), + expect.any(Object) + ); + }); + + it('passes the full options bag through to world.createRunId()', async () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + const customId = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + const createRunId = vi.fn().mockReturnValue(customId); + + setWorld({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + createRunId, + } as any); + + await start(validWorkflow, [], { + region: 'fra1', + specVersion: 3, + }); + + expect(createRunId).toHaveBeenCalledWith( + expect.objectContaining({ region: 'fra1', specVersion: 3 }) + ); + }); + + it('threads opts.region onto queue opts', async () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + const customId = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + + setWorld({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + createRunId: vi.fn().mockReturnValue(customId), + } as any); + + await start(validWorkflow, [], { region: 'fra1' }); + + expect(mockQueue).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ runId: `wrun_${customId}` }), + expect.objectContaining({ region: 'fra1' }) + ); + }); + + it('omits region from queue opts when opts.region is undefined', async () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + setWorld({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + createRunId: vi.fn().mockReturnValue('01ARZ3NDEKTSV4RRFFQ69G5FAV'), + } as any); + + await start(validWorkflow, []); + + const queueOpts = 
mockQueue.mock.calls[0][2]; + expect(queueOpts).not.toHaveProperty('region'); + }); + + it('falls back to a default monotonic ULID when world.createRunId is omitted', async () => { + const validWorkflow = Object.assign(() => Promise.resolve('result'), { + workflowId: 'test-workflow', + }); + + setWorld({ + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + + await start(validWorkflow, []); + + // ULIDs are 26 Crockford-Base32 chars; the runId becomes + // `wrun_` + 26 chars = 31 chars total. + expect(mockEventsCreate).toHaveBeenCalledWith( + expect.stringMatching(/^wrun_[0-9A-HJKMNP-TV-Z]{26}$/), + expect.objectContaining({ eventType: 'run_created' }), + expect.any(Object) + ); + }); + }); }); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 6f20e81c2a..3d552e1bc6 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -20,9 +20,9 @@ import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier, trace } from '../telemetry.js'; import { waitedUntil } from '../util.js'; import { version as workflowCoreVersion } from '../version.js'; +import { getWorldLazy } from './get-world-lazy.js'; import { getWorkflowQueueName } from './helpers.js'; import { Run } from './run.js'; -import { getWorldLazy } from './get-world-lazy.js'; /** ULID generator for client-side runId generation */ const ulid = monotonicFactory(); @@ -38,6 +38,18 @@ export interface StartOptionsBase { * The spec version to use for the workflow run. Defaults to the latest version. */ specVersion?: number; + + /** + * Optional region identifier for the new run. Currently consumed only + * by `@workflow/world-vercel`, which embeds the region into the tagged + * run ID and routes the initial workflow message to the matching + * regional queue. 
When omitted, the world falls back to its own + * default (for `world-vercel`: the `VERCEL_REGION` environment + * variable, then the `unknown` sentinel). + * + * Worlds without a regional dimension ignore this field. + */ + region?: string; } export interface StartOptionsWithDeploymentId extends StartOptionsBase { @@ -167,8 +179,17 @@ export async function start( const ops: Promise[] = []; // Generate runId client-side so we have it before serialization - // (required for future E2E encryption where runId is part of the encryption context) - const runId = `wrun_${ulid()}`; + // (required for future E2E encryption where runId is part of the + // encryption context). When the World provides a `createRunId()` + // implementation, use it so worlds can embed implementation-specific + // metadata (e.g., region) into the ID, forwarding the full options + // bag so worlds can read whichever fields they recognise; otherwise + // fall back to a standard monotonic ULID. + const runId = `wrun_${ + world.createRunId + ? world.createRunId(opts as Readonly>) + : ulid() + }`; // Serialize current trace context to propagate across queue boundary const traceCarrier = await serializeTraceCarrier(); @@ -251,6 +272,11 @@ export async function start( { deploymentId, specVersion, + // Forward any caller-supplied region hint so worlds with + // per-region queue routing (e.g. world-vercel) can target the + // matching queue. Worlds without a regional dimension ignore + // this field. + ...(opts.region !== undefined ? 
{ region: opts.region } : {}), } ), ]); diff --git a/packages/world-vercel/package.json b/packages/world-vercel/package.json index 70d45afec1..fc413b20ef 100644 --- a/packages/world-vercel/package.json +++ b/packages/world-vercel/package.json @@ -20,6 +20,10 @@ ".": { "types": "./dist/index.d.ts", "default": "./dist/index.js" + }, + "./run-id": { + "types": "./dist/run-id/index.d.ts", + "default": "./dist/run-id/index.js" } }, "scripts": { @@ -35,6 +39,7 @@ "@workflow/errors": "workspace:*", "@workflow/world": "workspace:*", "cbor-x": "1.6.0", + "ulid": "catalog:", "undici": "catalog:", "zod": "catalog:" }, diff --git a/packages/world-vercel/src/create-run-id.test.ts b/packages/world-vercel/src/create-run-id.test.ts new file mode 100644 index 0000000000..2fa474b136 --- /dev/null +++ b/packages/world-vercel/src/create-run-id.test.ts @@ -0,0 +1,148 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { createRunId } from './create-run-id.js'; +import { decode } from './run-id/index.js'; +import { REGION_IDS } from './run-id/regions.js'; + +describe('createRunId', () => { + const originalRegion = process.env.VERCEL_REGION; + + afterEach(() => { + if (originalRegion === undefined) delete process.env.VERCEL_REGION; + else process.env.VERCEL_REGION = originalRegion; + }); + + describe('when VERCEL_REGION is a known region', () => { + beforeEach(() => { + process.env.VERCEL_REGION = 'iad1'; + }); + + it('returns a 26-character tagged ULID', () => { + const id = createRunId(); + expect(id).toHaveLength(26); + const decoded = decode(id); + expect(decoded.tagged).toBe(true); + }); + + it('embeds the resolved region ID and current version', () => { + const id = createRunId(); + const decoded = decode(id); + expect(decoded.regionId).toBe(REGION_IDS.iad1); + expect(decoded.region).toBe('iad1'); + expect(decoded.version).toBe(1); + }); + + it('is monotonically increasing within a process', () => { + const ids = Array.from({ length: 16 }, () => 
createRunId()); + const sorted = [...ids].sort(); + expect(sorted).toEqual(ids); + // And all unique. + expect(new Set(ids).size).toBe(ids.length); + }); + + it('remains monotonic even when 2048+ IDs are minted in the same ms', () => { + // 2^11 = 2048: enough calls to roll the entire 11-bit metadata + // window over and exercise the fallback-bump path in createRunId. + const ids = Array.from({ length: 4096 }, () => createRunId()); + for (let i = 1; i < ids.length; i++) { + expect(ids[i] > ids[i - 1]).toBe(true); + } + expect(new Set(ids).size).toBe(ids.length); + }); + + it('reflects later updates to process.env.VERCEL_REGION', () => { + const iad = createRunId(); + expect(decode(iad).region).toBe('iad1'); + process.env.VERCEL_REGION = 'fra1'; + const fra = createRunId(); + expect(decode(fra).region).toBe('fra1'); + }); + }); + + describe('when VERCEL_REGION is missing or unrecognised', () => { + it('falls back to the "unknown" region (id 0) when unset', () => { + delete process.env.VERCEL_REGION; + const decoded = decode(createRunId()); + expect(decoded.tagged).toBe(true); + expect(decoded.regionId).toBe(0); + expect(decoded.region).toBeNull(); + }); + + it('falls back to "unknown" when the env var is empty', () => { + process.env.VERCEL_REGION = ''; + const decoded = decode(createRunId()); + expect(decoded.regionId).toBe(0); + }); + + it('falls back to "unknown" for an unrecognised region code', () => { + process.env.VERCEL_REGION = 'xyz9'; + const decoded = decode(createRunId()); + expect(decoded.regionId).toBe(0); + }); + + it('does not treat the literal string "unknown" as a region', () => { + // Defensive: the REGION_IDS table contains an `unknown` key but it is + // a sentinel, not an actual region name. The env var should not be + // matched against it. 
+ process.env.VERCEL_REGION = 'unknown'; + const decoded = decode(createRunId()); + expect(decoded.regionId).toBe(0); + expect(decoded.region).toBeNull(); + }); + }); + + describe('with an explicit `options.region`', () => { + it('prefers an explicit region over VERCEL_REGION', () => { + process.env.VERCEL_REGION = 'iad1'; + const decoded = decode(createRunId({ region: 'fra1' })); + expect(decoded.region).toBe('fra1'); + expect(decoded.regionId).toBe(REGION_IDS.fra1); + }); + + it('still falls back to VERCEL_REGION when options.region is missing', () => { + process.env.VERCEL_REGION = 'sfo1'; + const decoded = decode(createRunId({})); + expect(decoded.region).toBe('sfo1'); + }); + + it('falls back to VERCEL_REGION when options.region is an unrecognised string', () => { + process.env.VERCEL_REGION = 'sfo1'; + const decoded = decode(createRunId({ region: 'xyz9' })); + expect(decoded.region).toBe('sfo1'); + }); + + it('falls back to VERCEL_REGION when options.region is the empty string', () => { + process.env.VERCEL_REGION = 'sfo1'; + const decoded = decode(createRunId({ region: '' })); + expect(decoded.region).toBe('sfo1'); + }); + + it('ignores non-string region hints (no throw, fall back)', () => { + process.env.VERCEL_REGION = 'sfo1'; + const decoded = decode( + createRunId({ region: 42 as unknown as undefined }) + ); + expect(decoded.region).toBe('sfo1'); + }); + + it('ignores unrelated keys in the options bag', () => { + // start()'s opts object contains keys like `deploymentId`, + // `specVersion`, `world`, etc. — `createRunId` reads only the + // fields it recognises and ignores the rest. 
+ process.env.VERCEL_REGION = 'iad1'; + const decoded = decode( + createRunId({ + deploymentId: 'dpl_test', + specVersion: 3, + unrelated: 'value', + }) + ); + expect(decoded.region).toBe('iad1'); + }); + + it('accepts an undefined options bag (matching the World.createRunId signature)', () => { + process.env.VERCEL_REGION = 'iad1'; + const decoded = decode(createRunId(undefined)); + expect(decoded.region).toBe('iad1'); + }); + }); +}); diff --git a/packages/world-vercel/src/create-run-id.ts b/packages/world-vercel/src/create-run-id.ts new file mode 100644 index 0000000000..c936d257da --- /dev/null +++ b/packages/world-vercel/src/create-run-id.ts @@ -0,0 +1,116 @@ +import { monotonicFactory } from 'ulid'; +import { bytesToUlid, ulidToBytes } from './run-id/codec.js'; +import { encode } from './run-id/index.js'; +import { REGION_IDS, type RegionCode } from './run-id/regions.js'; + +/** + * Underlying monotonic ULID factory. We post-process its output through + * {@link encode}, which overwrites the bottom 11 bits of randomness — so + * within the same millisecond, the monotonic factory's bottom-bit + * increments would be destroyed if we relied on them naïvely. We layer + * our own per-process monotonicity check on top (see {@link createRunId}). + */ +const ulid = monotonicFactory(); + +/** + * Last emitted run ID (the encoded/tagged form), used to enforce strict + * lexicographic monotonicity across calls within a single process even + * when many IDs are minted in the same millisecond. + */ +let lastRunId: string | undefined; + +/** + * Add `1 << 11` to the integer value of a 26-char tagged ULID — i.e. + * increment the bit immediately above the 11-bit metadata window. This + * lets us produce a strictly-larger ULID without disturbing the + * region/version metadata that lives in the bottom 11 bits. + * + * Throws if the ULID is at its maximum value (timestamp would overflow). 
+ */ +function bumpAboveMetadata(ulidStr: string): string { + const bytes = ulidToBytes(ulidStr); + // 11-bit metadata occupies the low 3 bits of bytes[14] + all of bytes[15]. + // The next bit above is bit 3 of bytes[14]; adding 1 << 3 = 8 to bytes[14] + // and propagating the carry upward gives us the desired increment. + let i = 14; + let carry = 0x08; + while (i >= 0 && carry > 0) { + const sum = bytes[i] + carry; + bytes[i] = sum & 0xff; + carry = sum >> 8; + i--; + } + if (carry > 0) { + // 128-bit ULID space exhausted — astronomically unlikely. + throw new Error('ULID space exhausted'); + } + return bytesToUlid(bytes); +} + +/** + * Coerce an arbitrary value into a known {@link RegionCode}, returning + * `null` for anything that isn't a string matching a real region entry in + * {@link REGION_IDS} (the `'unknown'` sentinel is explicitly excluded so + * callers can't accidentally supply it). + */ +function coerceRegion(value: unknown): RegionCode | null { + if (typeof value !== 'string' || value === '' || value === 'unknown') { + return null; + } + return Object.hasOwn(REGION_IDS, value) ? (value as RegionCode) : null; +} + +/** + * Resolve the effective region for a run, preferring an explicit value + * supplied via the `start()` options bag over the `VERCEL_REGION` + * environment variable. Returns `null` when neither source yields a + * recognised region, in which case the run ID falls back to the `unknown` + * (0) region tag. + */ +function resolveRegion( + options: Readonly> | undefined +): RegionCode | null { + return ( + coerceRegion(options?.region) ?? coerceRegion(process.env.VERCEL_REGION) + ); +} + +/** + * `World.createRunId` implementation that mints region-tagged ULIDs. + * + * Region resolution order (first recognised region wins): + * 1. `options.region` — explicit caller-supplied region forwarded by + * `start({ region })`. + * 2. `process.env.VERCEL_REGION` — the region the current Vercel function + * is executing in. + * 3. 
Region ID 0 (`unknown`) — the resulting ULID is still tagged but + * does not claim a specific region. + * + * Monotonicity: because `encode` overwrites the bottom 11 bits of the + * ULID's randomness with region/version metadata, the underlying ULID + * factory's monotonic bottom-bit increments are destroyed within a single + * millisecond. We layer our own monotonicity guarantee on top by tracking + * the last emitted ID and bumping past it whenever a fresh candidate would + * not be strictly greater. + */ +export function createRunId( + options?: Readonly> +): string { + const region = resolveRegion(options); + const regionId = region == null ? REGION_IDS.unknown : REGION_IDS[region]; + let candidate = encode(ulid(), regionId); + // Same-ms calls share a timestamp and the underlying monotonic factory's + // bottom-bit increments fall inside the metadata window, so the freshly + // encoded `candidate` may be `<=` the previous emission for the same + // region (or smaller still when the previous emission belonged to a + // higher-numbered region). In that case step `lastRunId` up by one unit + // just above the metadata bits and re-stamp the requested region/version. + // One step suffices provided `encode` rewrites only the tag/metadata bits. 
+ if (lastRunId !== undefined) { + if (candidate <= lastRunId) { + candidate = encode(bumpAboveMetadata(lastRunId), regionId); + } + } + lastRunId = candidate; + return candidate; +} diff --git a/packages/world-vercel/src/index.ts b/packages/world-vercel/src/index.ts index f15c480869..663df3f21d 100644 --- a/packages/world-vercel/src/index.ts +++ b/packages/world-vercel/src/index.ts @@ -1,5 +1,6 @@ import type { World } from '@workflow/world'; import { SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT } from '@workflow/world'; +import { createRunId } from './create-run-id.js'; import { createGetEncryptionKeyForRun } from './encryption.js'; import { instrumentObject } from './instrumentObject.js'; import { createQueue } from './queue.js'; @@ -8,6 +9,7 @@ import { createStorage } from './storage.js'; import { createStreamer } from './streamer.js'; import type { APIConfig } from './utils.js'; +export { createRunId } from './create-run-id.js'; export { createGetEncryptionKeyForRun, deriveRunKey, @@ -29,6 +31,7 @@ export function createVercelWorld(config?: APIConfig): World { ...createQueue(config), ...createStorage(config), ...instrumentObject('world.streams', createStreamer(config)), + createRunId, getEncryptionKeyForRun: createGetEncryptionKeyForRun( projectId, config?.projectConfig?.teamId, diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index 78b3630652..82977ec557 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -17,6 +17,7 @@ const { const mockSend = vi.fn(); const mockHandleCallback = vi.fn(); + // biome-ignore lint/complexity/useArrowFunction: needs to be `new`-callable const MockQueueClient = vi.fn().mockImplementation(function () { return { send: mockSend, @@ -736,4 +737,148 @@ describe('createQueue', () => { } }); }); + + describe('region routing', () => { + const originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + const originalRegion = 
process.env.VERCEL_REGION; + + beforeEach(() => { + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + delete process.env.VERCEL_REGION; + mockSend.mockResolvedValue({ messageId: 'msg-123' }); + }); + + afterEach(() => { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + if (originalRegion !== undefined) { + process.env.VERCEL_REGION = originalRegion; + } else { + delete process.env.VERCEL_REGION; + } + }); + + it('uses an explicit `opts.region` override', async () => { + const queue = createQueue(); + await queue.queue( + '__wkf_workflow_test', + { runId: 'wrun_01ARZ3NDEKTSV4RRFFQ69G5FAV' }, + { region: 'fra1' } + ); + + // QueueClient is constructed twice: once at createQueue() time + // (without region, for the handler), and once per send (with region). + // The send-time construction is the one we care about. + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('fra1'); + }); + + it('extracts the region from a tagged workflow run ID payload', async () => { + // Build a tagged run ID for `sfo1` (regionId=2). We do this by + // calling encode() via the public sub-export so the test stays + // resilient to bit-layout changes. 
+ const { encode } = await import('./run-id/index.js'); + const runId = `wrun_${encode('01ARZ3NDEKTSV4RRFFQ69G5FAV', 'sfo1')}`; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId }); + + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('sfo1'); + }); + + it('extracts the region from a tagged step payload workflowRunId', async () => { + const { encode } = await import('./run-id/index.js'); + const workflowRunId = `wrun_${encode('01ARZ3NDEKTSV4RRFFQ69G5FAV', 'pdx1')}`; + + const queue = createQueue(); + await queue.queue('__wkf_step_test', { + workflowName: 'wf', + workflowRunId, + workflowStartedAt: Date.now(), + stepId: 'step-1', + }); + + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('pdx1'); + }); + + it('falls back to VERCEL_REGION for un-tagged run IDs', async () => { + process.env.VERCEL_REGION = 'cle1'; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId: 'wrun_untagged' }); + + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('cle1'); + }); + + it('falls back to iad1 when neither tagging nor VERCEL_REGION is available', async () => { + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId: 'wrun_untagged' }); + + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('iad1'); + }); + + 
it('falls back to iad1 for health-check payloads (no runId)', async () => { + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { + __healthCheck: true, + correlationId: 'health-1', + }); + + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('iad1'); + }); + + it('prefers `opts.region` over a payload-derived region', async () => { + const { encode } = await import('./run-id/index.js'); + const runId = `wrun_${encode('01ARZ3NDEKTSV4RRFFQ69G5FAV', 'sfo1')}`; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId }, { region: 'fra1' }); + + const ctorCalls = ( + MockQueueClient as unknown as { mock: { calls: unknown[][] } } + ).mock.calls; + const sendTimeCall = ctorCalls[ctorCalls.length - 1][0] as { + region?: string; + }; + expect(sendTimeCall.region).toBe('fra1'); + }); + }); }); diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 530062f677..160275e12e 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -11,9 +11,11 @@ import { SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, ValidQueueName, } from '@workflow/world'; -import { decode, encode } from 'cbor-x'; +import { decode as cborDecode, encode as cborEncode } from 'cbor-x'; import { z } from 'zod/v4'; import { getDispatcher } from './http-client.js'; +import { decode as decodeTaggedRunId } from './run-id/index.js'; +import { REGION_IDS } from './run-id/regions.js'; import { type APIConfig, getHeaders, getHttpUrl } from './utils.js'; /** @@ -27,7 +29,7 @@ class CborTransport implements Transport { readonly contentType = 'application/cbor'; serialize(value: unknown): Buffer { - return Buffer.from(encode(value)); + return Buffer.from(cborEncode(value)); } async deserialize(stream: ReadableStream): Promise { @@ -38,7 
+40,7 @@ class CborTransport implements Transport { if (done) break; if (value) chunks.push(value); } - return decode(Buffer.concat(chunks)); + return cborDecode(Buffer.concat(chunks)); } } @@ -74,7 +76,7 @@ class DualTransport implements Transport { readonly contentType = 'application/cbor'; serialize(value: unknown): Buffer { - return Buffer.from(encode(value)); + return Buffer.from(cborEncode(value)); } async deserialize(stream: ReadableStream): Promise { @@ -87,7 +89,7 @@ class DualTransport implements Transport { } const buffer = Buffer.concat(chunks); try { - return decode(buffer); + return cborDecode(buffer); } catch { return JSON.parse(buffer.toString()); } @@ -134,6 +136,71 @@ const MAX_DELAY_SECONDS = Number( process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit ); +/** + * Default region used when no explicit override, no tagged run ID, and no + * `VERCEL_REGION` env var are available. `iad1` preserves the historical + * behaviour from before per-message regional routing existed. + */ +const FALLBACK_REGION = 'iad1'; + +/** + * Extract the workflow run ID from a queue payload, returning `undefined` for + * payloads that don't carry one (e.g. health-check messages). + */ +function getRunIdFromPayload(payload: QueuePayload): string | undefined { + if ('runId' in payload && typeof payload.runId === 'string') { + return payload.runId; + } + if ('workflowRunId' in payload && typeof payload.workflowRunId === 'string') { + return payload.workflowRunId; + } + return undefined; +} + +/** + * Workflow run IDs are prefixed with `wrun_` before the underlying ULID. + * Strip that prefix so the payload can be fed to the tagged-ULID decoder. + */ +const RUN_ID_PREFIX = 'wrun_'; + +/** + * Decode the embedded region from a tagged workflow run ID, returning + * `undefined` if the value is not a tagged ULID or carries an unknown region. 
+ */ +function regionFromTaggedRunId(runId: string | undefined): string | undefined { + if (!runId) return undefined; + const ulid = runId.startsWith(RUN_ID_PREFIX) + ? runId.slice(RUN_ID_PREFIX.length) + : runId; + try { + const decoded = decodeTaggedRunId(ulid); + if (!decoded.tagged) return undefined; + if (decoded.regionId === REGION_IDS.unknown) return undefined; + return decoded.region ?? undefined; + } catch { + return undefined; + } +} + +/** + * Resolve the region the message should be sent to, in order of preference: + * 1. Explicit `opts.region` override. + * 2. Region embedded in the payload's tagged run ID. + * 3. `VERCEL_REGION` environment variable. + * 4. {@link FALLBACK_REGION} (preserves pre-regional behaviour). + */ +function resolveTargetRegion( + payload: QueuePayload, + opts?: QueueOptions +): string { + if (opts?.region) return opts.region; + const fromRunId = regionFromTaggedRunId(getRunIdFromPayload(payload)); + if (fromRunId) return fromRunId; + const fromEnv = process.env.VERCEL_REGION; + if (fromEnv) return fromEnv; + return FALLBACK_REGION; +} + /** * Extract known identifiers from a queue payload and return them as VQS headers. * This ensures observability headers are always set without relying on callers. @@ -166,14 +233,16 @@ export function createQueue(config?: APIConfig): Queue { const { baseUrl, usingProxy } = getHttpUrl(config); const headers = getHeaders(config, { usingProxy }); - const region = 'iad1'; - const cborTransport = new CborTransport(); const jsonTransport = new JsonTransport(); const dualTransport = new DualTransport(); + /** + * Options common to every `QueueClient` instantiation. `region` is + * intentionally omitted here and added per-call by both `queue()` and the + * handler, since the value depends on the payload being sent / received. 
+ */ const clientOptions = { - region, dispatcher: getDispatcher(), transport: dualTransport, ...(usingProxy && { @@ -207,8 +276,16 @@ export function createQueue(config?: APIConfig): Queue { SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT; const transport = useCbor ? cborTransport : jsonTransport; + // Resolve the destination region. Explicit `opts.region` wins, otherwise + // we decode it from the payload's tagged run ID so messages produced by + // `start()` land in the same region the run was created in. Falls back + // to the `VERCEL_REGION` env var, then `iad1` to preserve historical + // behaviour for legacy / untagged run IDs. + const region = resolveTargetRegion(payload, opts); + const client = new QueueClient({ ...clientOptions, + region, deploymentId, transport, }); diff --git a/packages/world-vercel/src/run-id/codec.test.ts b/packages/world-vercel/src/run-id/codec.test.ts new file mode 100644 index 0000000000..12e9333907 --- /dev/null +++ b/packages/world-vercel/src/run-id/codec.test.ts @@ -0,0 +1,157 @@ +import { describe, expect, it } from 'vitest'; +import { + bytesToUlid, + isTaggedString, + TAG_BIT_MASK, + ULID_BYTE_LENGTH, + ULID_LENGTH, + ulidToBytes, +} from './codec.js'; + +/** + * Reference ULID with all bytes = 0. Crockford encoding of 16 zero bytes is + * 26 '0' chars. + */ +const ZERO_ULID = '0'.repeat(ULID_LENGTH); + +/** + * Reference ULID with all bytes = 0xff. Crockford encoding of 16 0xff bytes + * is the 26-char value "7ZZZZZZZZZZZZZZZZZZZZZZZZZ" (the top char carries + * only 3 real bits, so its max is 7). 
+ */ +const MAX_ULID = '7ZZZZZZZZZZZZZZZZZZZZZZZZZ'; + +describe('codec / ulidToBytes & bytesToUlid', () => { + it('round-trips the all-zero ULID', () => { + const bytes = ulidToBytes(ZERO_ULID); + expect(bytes).toEqual(new Uint8Array(ULID_BYTE_LENGTH)); + expect(bytesToUlid(bytes)).toBe(ZERO_ULID); + }); + + it('round-trips the all-ones ULID', () => { + const bytes = ulidToBytes(MAX_ULID); + expect(bytes).toEqual(new Uint8Array(ULID_BYTE_LENGTH).fill(0xff)); + expect(bytesToUlid(bytes)).toBe(MAX_ULID); + }); + + it('round-trips a typical ULID-shaped value', () => { + const ulid = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + const bytes = ulidToBytes(ulid); + expect(bytes).toHaveLength(ULID_BYTE_LENGTH); + // Sanity-check the byte-level decoding of this ULID-spec example string. + expect(Array.from(bytes)).toEqual([ + 0x01, 0x56, 0x3e, 0x3a, 0xb5, 0xd3, 0xd6, 0x76, 0x4c, 0x61, 0xef, 0xb9, + 0x93, 0x02, 0xbd, 0x5b, + ]); + expect(bytesToUlid(bytes)).toBe(ulid); + }); + + it('decodes lowercase Crockford characters and emits uppercase', () => { + const ulid = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + expect(bytesToUlid(ulidToBytes(ulid.toLowerCase()))).toBe(ulid); + }); + + it('rejects strings of the wrong length', () => { + expect(() => ulidToBytes('')).toThrow(/Invalid ULID length/); + expect(() => ulidToBytes('0'.repeat(25))).toThrow(/Invalid ULID length/); + expect(() => ulidToBytes('0'.repeat(27))).toThrow(/Invalid ULID length/); + }); + + it('rejects strings with invalid Crockford characters', () => { + // 'U' is invalid in Crockford Base32. + const bad = `U${ZERO_ULID.slice(1)}`; + expect(() => ulidToBytes(bad)).toThrow( + /Invalid Crockford-Base32 character/ + ); + // 'L' is also invalid in Crockford (replaced by '1'). + const bad2 = `L${ZERO_ULID.slice(1)}`; + expect(() => ulidToBytes(bad2)).toThrow( + /Invalid Crockford-Base32 character/ + ); + // Non-ASCII. 
+ const bad3 = `\u00ff${ZERO_ULID.slice(1)}`; + expect(() => ulidToBytes(bad3)).toThrow( + /Invalid Crockford-Base32 character/ + ); + }); + + it("rejects ULIDs whose first character is > '7'", () => { + // '8' has Crockford value 8 = 0b01000: its 3 real bits are all zero, but + // the lower of the 2 pad bits is set. The codec checks the pad bits with + // (values[0] & 0x18), and 8 & 0x18 = 0x08, which is nonzero. + const bad = `8${'0'.repeat(25)}`; + expect(() => ulidToBytes(bad)).toThrow(/top 2 bits must be zero/); + // 'Z' = 31 = 0b11111 → top 2 pad bits both set. + const bad2 = `Z${'0'.repeat(25)}`; + expect(() => ulidToBytes(bad2)).toThrow(/top 2 bits must be zero/); + }); + + it('throws on non-string inputs', () => { + expect(() => ulidToBytes(undefined as unknown as string)).toThrow( + TypeError + ); + expect(() => ulidToBytes(null as unknown as string)).toThrow(TypeError); + expect(() => ulidToBytes(123 as unknown as string)).toThrow(TypeError); + }); + + it('rejects wrong-length byte arrays', () => { + expect(() => bytesToUlid(new Uint8Array(15))).toThrow( + /Invalid byte length/ + ); + expect(() => bytesToUlid(new Uint8Array(17))).toThrow( + /Invalid byte length/ + ); + }); +}); + +describe('codec / isTaggedString', () => { + it('returns false for the zero ULID', () => { + expect(isTaggedString(ZERO_ULID)).toBe(false); + }); + + it('returns true for a ULID with the tag bit manually set', () => { + const bytes = new Uint8Array(ULID_BYTE_LENGTH); + bytes[0] = TAG_BIT_MASK; + const tagged = bytesToUlid(bytes); + expect(tagged).toBe('40000000000000000000000000'); + expect(isTaggedString(tagged)).toBe(true); + // First char of a value with byte[0] = 0x80 should be '4' (0b100). 
+ expect(tagged[0]).toBe('4'); + }); + + it('returns true for any ULID whose first char is in [4..7]', () => { + expect(isTaggedString(`4${'0'.repeat(25)}`)).toBe(true); + expect(isTaggedString(`5${'0'.repeat(25)}`)).toBe(true); + expect(isTaggedString(`6${'0'.repeat(25)}`)).toBe(true); + expect(isTaggedString(`7${'Z'.repeat(25)}`)).toBe(true); + expect(isTaggedString(`0${'0'.repeat(25)}`)).toBe(false); + expect(isTaggedString(`3${'Z'.repeat(25)}`)).toBe(false); + }); + + it('returns false for non-strings, wrong lengths, and invalid chars', () => { + expect(isTaggedString('')).toBe(false); + expect(isTaggedString('0'.repeat(25))).toBe(false); + expect(isTaggedString(null)).toBe(false); + expect(isTaggedString(undefined)).toBe(false); + expect(isTaggedString(123)).toBe(false); + expect(isTaggedString({})).toBe(false); + // Invalid Crockford character at index 0. + expect(isTaggedString(`U${ZERO_ULID.slice(1)}`)).toBe(false); + }); + + it('rejects ULIDs with invalid Crockford characters after index 0', () => { + // First char '4' would otherwise set the tag bit, but the string is not + // a valid ULID because of the bad char further in. A naive + // implementation that only looked at the first char would incorrectly + // return true here. + expect(isTaggedString(`4${'U'.repeat(25)}`)).toBe(false); + expect(isTaggedString(`4${'0'.repeat(24)}L`)).toBe(false); + }); + + it("rejects ULIDs whose first char is > '7' (overflows 128 bits)", () => { + // First char '8'..'Z' has nonzero top 2 pad bits → not a valid ULID, + // regardless of whether the tag bit appears set. 
+ expect(isTaggedString(`8${'0'.repeat(25)}`)).toBe(false); + expect(isTaggedString(`Z${'0'.repeat(25)}`)).toBe(false); + }); +}); diff --git a/packages/world-vercel/src/run-id/codec.ts b/packages/world-vercel/src/run-id/codec.ts new file mode 100644 index 0000000000..500cabc164 --- /dev/null +++ b/packages/world-vercel/src/run-id/codec.ts @@ -0,0 +1,172 @@ +/** + * Low-level bit / Crockford-Base32 plumbing for tagged ULIDs. + * + * A ULID is a 128-bit value rendered as 26 Crockford-Base32 characters. Since + * 26 * 5 = 130 bits, the encoded representation has 2 leading zero pad bits + * — i.e. the top 2 bits of the first character must always be 0. This means + * the first character of any valid ULID lies in the range `0`..`7`. + * + * The tagged-ULID layout (see ./regions.ts and ./index.ts for context): + * + * byte[0] bit 7 TAG bit (1 = tagged run ID) + * byte[14] bits 0..2 high 3 bits of `version` (5-bit field) + * byte[15] bits 6..7 low 2 bits of `version` + * byte[15] bits 0..5 `regionId` (6-bit field) + * + * Encode sets the tag bit on byte[0] and overwrites the 11 metadata bits in + * bytes[14..15]. Decode reads + clears only the tag bit, leaving the metadata + * bits intact in the returned "untagged" ULID (the bottom 11 randomness bits + * are sacrificed by design — they are the metadata). + */ + +// Crockford Base32 alphabet (matches the `ulid` spec). +const ENCODING = '0123456789ABCDEFGHJKMNPQRSTVWXYZ'; + +// Decode table: ASCII char code -> 5-bit value, or -1 if invalid. +const DECODE_TABLE: Int8Array = (() => { + const table = new Int8Array(128).fill(-1); + for (let i = 0; i < ENCODING.length; i++) { + table[ENCODING.charCodeAt(i)] = i; + } + // Crockford-Base32 case-insensitivity: also accept lowercase. 
+ for (let i = 0; i < ENCODING.length; i++) { + const lower = ENCODING[i].toLowerCase(); + if (lower !== ENCODING[i]) { + table[lower.charCodeAt(0)] = i; + } + } + return table; +})(); + +export const ULID_LENGTH = 26; +export const ULID_BYTE_LENGTH = 16; + +/** Bit masks used by the tagged-ULID layout. */ +export const TAG_BIT_MASK = 0x80; // byte[0] bit 7 +export const REGION_MASK = 0x3f; // byte[15] bits 0..5 (6 bits) +export const VERSION_LOW_MASK = 0xc0; // byte[15] bits 6..7 (low 2 bits of version) +export const VERSION_HIGH_MASK = 0x07; // byte[14] bits 0..2 (high 3 bits of version) +export const VERSION_BIT_WIDTH = 5; +export const REGION_BIT_WIDTH = 6; +export const MAX_VERSION = (1 << VERSION_BIT_WIDTH) - 1; // 31 +export const MAX_REGION = (1 << REGION_BIT_WIDTH) - 1; // 63 + +/** + * Decode a 26-character Crockford-Base32 ULID string into 16 bytes. + * + * Throws if the string is not exactly 26 characters, contains an invalid + * Crockford character, or has nonzero top 2 pad bits (which would imply the + * value overflows 128 bits). + */ +export function ulidToBytes(ulid: string): Uint8Array { + if (typeof ulid !== 'string') { + throw new TypeError(`Expected ULID string, got ${typeof ulid}`); + } + if (ulid.length !== ULID_LENGTH) { + throw new Error( + `Invalid ULID length: expected ${ULID_LENGTH}, got ${ulid.length}` + ); + } + + // Validate and convert each char to its 5-bit value. + const values = new Uint8Array(ULID_LENGTH); + for (let i = 0; i < ULID_LENGTH; i++) { + const code = ulid.charCodeAt(i); + const v = code < 128 ? DECODE_TABLE[code] : -1; + if (v < 0) { + throw new Error( + `Invalid Crockford-Base32 character at index ${i}: ${JSON.stringify(ulid[i])}` + ); + } + values[i] = v; + } + + // The first character carries only 3 real bits (the top 2 must be zero pad). 
+ if ((values[0] & 0x18) !== 0) { + throw new Error( + `Invalid ULID: top 2 bits must be zero (first char > '7'): ${JSON.stringify(ulid[0])}` + ); + } + + // Pack 26 * 5 = 130 bits, with the top 2 bits being zero, into 16 bytes. + // Stream the values MSB-first into a bit buffer. + const out = new Uint8Array(ULID_BYTE_LENGTH); + // Seed the buffer with the 3 real bits of char 0, dropping its 2 pad bits. + let bitBuf = values[0] & 0x07; + let bitCount = 3; + let outIdx = 0; + for (let i = 1; i < ULID_LENGTH; i++) { + bitBuf = (bitBuf << 5) | values[i]; + bitCount += 5; + while (bitCount >= 8) { + bitCount -= 8; + out[outIdx++] = (bitBuf >> bitCount) & 0xff; + } + } + // After consuming all 26 chars (130 bits) starting from a 3-bit prefix, + // bitCount should be exactly 0 and outIdx should be 16. + /* c8 ignore next 3 */ + if (outIdx !== ULID_BYTE_LENGTH || bitCount !== 0) { + throw new Error('Internal error: ULID bit packing did not consume cleanly'); + } + return out; +} + +/** + * Encode 16 bytes as a 26-character Crockford-Base32 ULID string. The output + * is always uppercase. + * + * Throws if `bytes.length !== 16`. + */ +export function bytesToUlid(bytes: Uint8Array): string { + if (bytes.length !== ULID_BYTE_LENGTH) { + throw new Error( + `Invalid byte length: expected ${ULID_BYTE_LENGTH}, got ${bytes.length}` + ); + } + + // Emit 26 chars from 128 bits, MSB-first. The 2 leading zero pad bits are + // contributed by seeding the bit buffer with 2 zero bits (bitCount = 2, + // set below), so the first 5-bit chunk is those 2 pad bits followed by the + // top 3 bits of byte[0]. We then encode by appending bytes and pulling + // 5-bit groups off the top. + let bitBuf = 0; + let bitCount = 0; + // Pre-load the 2 zero pad bits: treat the value as a 130-bit number whose + // top 2 bits are 0 (2 pad bits + 128 data bits = 26 * 5). The first 5-bit + // chunk pulled out is therefore those 2 zeros + the top 3 bits of byte[0], + // which is why the first character of any ULID is always in '0'..'7'. 
+ bitBuf = 0; + bitCount = 2; // 2 zero pad bits already "in" the buffer at the top + let out = ''; + for (let i = 0; i < ULID_BYTE_LENGTH; i++) { + bitBuf = (bitBuf << 8) | bytes[i]; + bitCount += 8; + while (bitCount >= 5) { + bitCount -= 5; + out += ENCODING[(bitBuf >> bitCount) & 0x1f]; + } + } + /* c8 ignore next 3 */ + if (out.length !== ULID_LENGTH || bitCount !== 0) { + throw new Error('Internal error: ULID bit packing did not flush cleanly'); + } + return out; +} + +/** + * Test whether `s` is a fully valid 26-character Crockford-Base32 ULID with + * the tag bit set. Returns `false` for any input that is not a string, has + * the wrong length, contains an invalid character, or has nonzero top 2 + * padding bits. + */ +export function isTaggedString(s: unknown): boolean { + if (typeof s !== 'string' || s.length !== ULID_LENGTH) return false; + let bytes: Uint8Array; + try { + bytes = ulidToBytes(s); + } catch { + return false; + } + return (bytes[0] & TAG_BIT_MASK) !== 0; +} diff --git a/packages/world-vercel/src/run-id/index.test.ts b/packages/world-vercel/src/run-id/index.test.ts new file mode 100644 index 0000000000..8e2428b9f1 --- /dev/null +++ b/packages/world-vercel/src/run-id/index.test.ts @@ -0,0 +1,288 @@ +import { describe, expect, it } from 'vitest'; +import { bytesToUlid, ULID_BYTE_LENGTH, ulidToBytes } from './codec.js'; +import { + CURRENT_VERSION, + decode, + encode, + isTagged, + MAX_REGION_ID, + MAX_VERSION, + REGION_IDS, + type RegionCode, + type RegionKey, +} from './index.js'; + +const SAMPLE_ULID = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + +describe('encode / decode round-trip', () => { + it('encodes with default version=1 and the iad1 region code', () => { + const tagged = encode(SAMPLE_ULID, 'iad1'); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5E21'); + expect(tagged).toHaveLength(26); + expect(isTagged(tagged)).toBe(true); + + const decoded = decode(tagged); + expect(decoded).toEqual({ + tagged: true, + ulid: '01ARZ3NDEKTSV4RRFFQ69G5E21', + 
region: 'iad1', + regionId: REGION_IDS.iad1, + version: CURRENT_VERSION, + }); + }); + + it('accepts numeric region IDs', () => { + const tagged = encode(SAMPLE_ULID, 7); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5E27'); + const decoded = decode(tagged); + expect(decoded.regionId).toBe(7); + expect(decoded.region).toBe('dub1'); + expect(decoded.ulid).toBe('01ARZ3NDEKTSV4RRFFQ69G5E27'); + }); + + it('returns region: null for unknown but in-range region IDs', () => { + const tagged = encode(SAMPLE_ULID, 63); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5E3Z'); + const decoded = decode(tagged); + expect(decoded.regionId).toBe(63); + expect(decoded.region).toBeNull(); + }); + + it('encodes regionId=0 as the "unknown" sentinel', () => { + const tagged = encode(SAMPLE_ULID, 0); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5E20'); + const decoded = decode(tagged); + expect(decoded.regionId).toBe(0); + expect(decoded.region).toBeNull(); + }); + + it('accepts an explicit version override', () => { + const tagged = encode(SAMPLE_ULID, 'iad1', { version: 0 }); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5E01'); + expect(decode(tagged).version).toBe(0); + + const tagged2 = encode(SAMPLE_ULID, 'iad1', { version: MAX_VERSION }); + expect(tagged2).toBe('41ARZ3NDEKTSV4RRFFQ69G5FY1'); + expect(decode(tagged2).version).toBe(MAX_VERSION); + }); + + it('preserves all metadata bits across encode → decode → encode', () => { + for (const regionId of [0, 1, 17, 31, 32, 63]) { + for (const version of [0, 1, 7, 16, 31]) { + const tagged = encode(SAMPLE_ULID, regionId, { version }); + const decoded = decode(tagged); + expect(decoded.regionId).toBe(regionId); + expect(decoded.version).toBe(version); + // Re-encoding the cleared ULID with the same metadata must reproduce + // the same tagged string. 
+ const reTagged = encode(decoded.ulid, regionId, { version }); + expect(reTagged).toBe(tagged); + } + } + }); + + it('clears only the tag bit in the decoded ULID', () => { + const tagged = encode(SAMPLE_ULID, 'fra1', { version: 5 }); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5EAA'); + const decoded = decode(tagged); + expect(decoded.ulid).toBe('01ARZ3NDEKTSV4RRFFQ69G5EAA'); + + // The decoded ulid must NOT have the tag bit set. + expect(isTagged(decoded.ulid)).toBe(false); + + // The metadata bits in bytes 14..15 must be preserved (not zeroed). + const taggedBytes = ulidToBytes(tagged); + const decodedBytes = ulidToBytes(decoded.ulid); + expect(decodedBytes[14]).toBe(taggedBytes[14]); + expect(decodedBytes[15]).toBe(taggedBytes[15]); + + // And byte[0] differs only in the top bit. + expect(decodedBytes[0]).toBe(taggedBytes[0] & 0x7f); + }); + + it('overwrites the tag bit and metadata bits even if the input has them set', () => { + // Synthesize a ULID with byte[0] = 0x40 (some non-tag bits set) and + // garbage in the metadata bytes. + const bytes = new Uint8Array(ULID_BYTE_LENGTH); + bytes[0] = 0x40; + bytes[14] = 0xff; + bytes[15] = 0xff; + const dirty = bytesToUlid(bytes); + expect(dirty).toBe('20000000000000000000001ZZZ'); + + const tagged = encode(dirty, 'sfo1', { version: 3 }); + expect(tagged).toBe('60000000000000000000001Y62'); + const decoded = decode(tagged); + expect(decoded.region).toBe('sfo1'); + expect(decoded.regionId).toBe(REGION_IDS.sfo1); + expect(decoded.version).toBe(3); + expect(decoded.ulid).toBe('20000000000000000000001Y62'); + }); + + it('encode emits an uppercase result for lowercase Crockford input', () => { + const tagged = encode(SAMPLE_ULID.toLowerCase(), 'iad1'); + expect(tagged).toBe('41ARZ3NDEKTSV4RRFFQ69G5E21'); + expect(tagged).toBe(tagged.toUpperCase()); + }); + + it('encodes well-known boundary inputs to exact strings', () => { + // Zero ULID with zero metadata: only the tag bit is set, so byte[0] = 0x80. 
+ // 0x80 → first 5-bit chunk (0b00100) → '4'; rest are all zero. + expect(encode('0'.repeat(26), 0, { version: 0 })).toBe( + '40000000000000000000000000' + ); + // Zero ULID with region=1, version=1: byte[15] = 0b01_000001 = 0x41, + // which encodes the last two chars as '21'. + expect(encode('0'.repeat(26), 1, { version: 1 })).toBe( + '40000000000000000000000021' + ); + // Zero ULID with max region (63) and max version (31): the last 11 bits + // are all-ones, spilling into bits 0..2 of byte[14] as well. + expect(encode('0'.repeat(26), 63, { version: 31 })).toBe( + '400000000000000000000001ZZ' + ); + // Max ULID with zero metadata: the metadata bits are forced to 0 even + // though the source had them set, demonstrating overwrite semantics. + expect(encode('7ZZZZZZZZZZZZZZZZZZZZZZZZZ', 0, { version: 0 })).toBe( + '7ZZZZZZZZZZZZZZZZZZZZZZY00' + ); + }); +}); + +describe('decode on un-tagged input', () => { + it('returns tagged: false for a plain ULID', () => { + const decoded = decode(SAMPLE_ULID); + expect(decoded.tagged).toBe(false); + // Decoded ulid equals input (already had tag bit cleared). + expect(decoded.ulid).toBe(SAMPLE_ULID); + }); + + it('still extracts whatever bits are in the metadata positions', () => { + // Plain ULID metadata bits are essentially random — just verify they + // round-trip self-consistently. 
+ const decoded = decode(SAMPLE_ULID); + expect(decoded.regionId).toBeGreaterThanOrEqual(0); + expect(decoded.regionId).toBeLessThanOrEqual(MAX_REGION_ID); + expect(decoded.version).toBeGreaterThanOrEqual(0); + expect(decoded.version).toBeLessThanOrEqual(MAX_VERSION); + }); +}); + +describe('encode validation', () => { + it('rejects invalid ULID input', () => { + expect(() => encode('not-a-ulid', 'iad1')).toThrow(); + expect(() => encode('', 'iad1')).toThrow(/Invalid ULID length/); + expect(() => encode(SAMPLE_ULID.slice(1), 'iad1')).toThrow( + /Invalid ULID length/ + ); + }); + + it('rejects unknown region codes', () => { + expect(() => encode(SAMPLE_ULID, 'xxx1' as RegionCode)).toThrow( + /Unknown region/ + ); + }); + + it('rejects out-of-range numeric regions', () => { + expect(() => encode(SAMPLE_ULID, -1)).toThrow(RangeError); + expect(() => encode(SAMPLE_ULID, 64)).toThrow(RangeError); + expect(() => encode(SAMPLE_ULID, 1.5)).toThrow(RangeError); + expect(() => encode(SAMPLE_ULID, Number.NaN)).toThrow(RangeError); + }); + + it('rejects out-of-range versions', () => { + expect(() => encode(SAMPLE_ULID, 'iad1', { version: -1 })).toThrow( + RangeError + ); + expect(() => encode(SAMPLE_ULID, 'iad1', { version: 32 })).toThrow( + RangeError + ); + expect(() => encode(SAMPLE_ULID, 'iad1', { version: 1.5 })).toThrow( + RangeError + ); + }); +}); + +describe('region table coverage', () => { + it('covers all 21 known Vercel compute regions plus hel1/zrh1 + unknown', () => { + const expected: RegionKey[] = [ + 'unknown', + 'iad1', + 'sfo1', + 'pdx1', + 'cle1', + 'yul1', + 'gru1', + 'dub1', + 'lhr1', + 'cdg1', + 'fra1', + 'bru1', + 'arn1', + 'hel1', + 'zrh1', + 'cpt1', + 'dxb1', + 'bom1', + 'sin1', + 'hkg1', + 'hnd1', + 'icn1', + 'kix1', + 'syd1', + ]; + expect(Object.keys(REGION_IDS).sort()).toEqual([...expected].sort()); + }); + + it('assigns each region a unique ID in [0, 63]', () => { + const ids = Object.values(REGION_IDS); + expect(new 
Set(ids).size).toBe(ids.length); + for (const id of ids) { + expect(id).toBeGreaterThanOrEqual(0); + expect(id).toBeLessThanOrEqual(MAX_REGION_ID); + } + }); + + it('all known region codes round-trip through encode/decode', () => { + for (const key of Object.keys(REGION_IDS) as RegionKey[]) { + if (key === 'unknown') continue; + const code: RegionCode = key; + const tagged = encode(SAMPLE_ULID, code); + const decoded = decode(tagged); + expect(decoded.region).toBe(code); + expect(decoded.regionId).toBe(REGION_IDS[code]); + } + }); + + it('rejects the "unknown" sentinel string as a region code in encode', () => { + // encode(_, 'unknown') was previously silently accepted (resolving to + // regionId=0). It is now rejected at the type level and at runtime. + expect(() => encode(SAMPLE_ULID, 'unknown' as RegionCode)).toThrow( + /Unknown region/ + ); + }); +}); + +describe('lexicographic order', () => { + it('all tagged ULIDs sort above all untagged ULIDs', () => { + // Tag bit on byte[0] sets the first char to ≥ '4'. Plain ULIDs that + // haven't blown past year 2248 start with '0' or '1'. + const minTagged = encode('0'.repeat(26), 0, { version: 0 }); + expect(minTagged).toBe('40000000000000000000000000'); + expect(minTagged > '3'.repeat(26)).toBe(true); + }); + + it('two tagged ULIDs with the same metadata preserve input ordering when they differ above the metadata bits', () => { + // Pick two ULIDs differing in the timestamp (char[5]). The metadata bits + // (bottom 11 bits) get normalized to the same values, but earlier bits + // — including timestamp — are preserved verbatim apart from the tag bit. 
+ const a = '01ARZ3NDEKTSV4RRFFQ69G5FAV'; + const b = '01ARZ3NDEMTSV4RRFFQ69G5FAV'; + expect(a < b).toBe(true); + const ta = encode(a, 'iad1'); + const tb = encode(b, 'iad1'); + expect(ta).toBe('41ARZ3NDEKTSV4RRFFQ69G5E21'); + expect(tb).toBe('41ARZ3NDEMTSV4RRFFQ69G5E21'); + expect(ta < tb).toBe(true); + }); +}); diff --git a/packages/world-vercel/src/run-id/index.ts b/packages/world-vercel/src/run-id/index.ts new file mode 100644 index 0000000000..dc8773c59a --- /dev/null +++ b/packages/world-vercel/src/run-id/index.ts @@ -0,0 +1,218 @@ +/** + * Region-tagged ULID encoding for Vercel workflow run IDs. + * + * A "tagged" run ID is a regular 26-character Crockford-Base32 ULID with: + * + * - **Tag bit**: the MSB of byte 0 (the most-significant bit of the 48-bit + * timestamp) is set to 1, distinguishing this scheme from a plain ULID. + * This shifts the first character into the range `4`..`7`. + * - **Version** (5 bits, 0–31): encoded into the bottom 11 bits of the + * 80-bit randomness section (specifically: high 3 bits of `version` go + * into the low 3 bits of byte 14, low 2 bits of `version` go into the + * high 2 bits of byte 15). + * - **Region ID** (6 bits, 0–63): encoded into the bottom 6 bits of byte 15. + * Region IDs are assigned in {@link REGION_IDS}. + * + * Net effect: 80 bits of ULID randomness become 69 bits (still ~5.9 × 10²⁰ + * distinct values per millisecond), and the maximum representable timestamp + * drops from year ~10895 down to year ~6432 — neither limit is practically + * relevant. + * + * Tagged ULIDs remain valid ULIDs (lexicographically sortable, monotonic when + * generated with a monotonic factory), so they can flow through any system + * that accepts ULIDs. 
+ * + * @example + * ```ts + * import { monotonicFactory } from 'ulid'; + * import { encode, decode } from '@workflow/world-vercel/run-id'; + * + * const ulid = monotonicFactory(); + * const taggedRunId = encode(ulid(), 'iad1'); + * + * const { region, regionId, version } = decode(taggedRunId); + * // region === 'iad1', regionId === 1, version === 1, tagged === true + * ``` + * + * @packageDocumentation + */ + +import { + bytesToUlid, + isTaggedString, + MAX_REGION, + MAX_VERSION, + REGION_MASK, + TAG_BIT_MASK, + ulidToBytes, + VERSION_HIGH_MASK, + VERSION_LOW_MASK, +} from './codec.js'; +import { lookupRegion, REGION_IDS, type RegionCode } from './regions.js'; + +export { + lookupRegion, + REGION_IDS, + type RegionCode, + type RegionId, + type RegionKey, + regionIdFor, +} from './regions.js'; + +/** Encoding format version currently emitted by {@link encode}. */ +export const CURRENT_VERSION = 1; + +export interface EncodeOptions { + /** + * Encoding format version to embed. Must be in the range 0..31. Defaults to + * {@link CURRENT_VERSION} (1). Version 0 is reserved as a sentinel meaning + * "no metadata encoded" — callers should not normally emit it. + */ + version?: number; +} + +export interface DecodedRunId { + /** + * Whether the input had the tag bit set. If `false`, the {@link regionId} + * and {@link version} fields will still be populated by reading the same + * bit positions, but callers should generally ignore them as they will be + * meaningless for un-tagged ULIDs. + */ + tagged: boolean; + /** + * The input ULID with **only the tag bit cleared**. The 11 encoded bits in + * bytes 14–15 are preserved verbatim. For un-tagged input this equals the + * input string (uppercased). + */ + ulid: string; + /** Encoded format version (0..31). */ + version: number; + /** Encoded region ID (0..63). 0 represents "unknown". */ + regionId: number; + /** + * Region code (e.g. 
`'iad1'`) when {@link regionId} matches a known entry + * in {@link REGION_IDS}, else `null`. + */ + region: RegionCode | null; +} + +function isRegionCode(value: unknown): value is RegionCode { + return ( + typeof value === 'string' && + value !== 'unknown' && + Object.hasOwn(REGION_IDS, value) + ); +} + +/** + * Encode a region ID and version into a ULID, producing a 26-character + * "tagged" ULID. The input ULID's bottom 11 randomness bits and top + * (timestamp MSB) bit are overwritten. + * + * @param ulid - A valid 26-character Crockford-Base32 ULID. + * @param region - Either a numeric region ID (0..63) or a known + * {@link RegionCode} (e.g. `'iad1'`). + * @param options - See {@link EncodeOptions}. + * @returns The tagged ULID, always uppercase. + * + * @throws If `ulid` is not a valid ULID string, if `region` is an unknown + * region code, if a numeric `region` is outside 0..63, or if + * `options.version` is outside 0..31. + */ +export function encode( + ulid: string, + region: number | RegionCode, + options: EncodeOptions = {} +): string { + // Resolve region → numeric ID. + let regionId: number; + if (typeof region === 'number') { + if (!Number.isInteger(region) || region < 0 || region > MAX_REGION) { + throw new RangeError( + `regionId must be an integer in [0, ${MAX_REGION}]; got ${region}` + ); + } + regionId = region; + } else if (isRegionCode(region)) { + regionId = REGION_IDS[region]; + } else { + throw new Error(`Unknown region: ${String(region)}`); + } + + const version = options.version ?? CURRENT_VERSION; + if (!Number.isInteger(version) || version < 0 || version > MAX_VERSION) { + throw new RangeError( + `version must be an integer in [0, ${MAX_VERSION}]; got ${version}` + ); + } + + const bytes = ulidToBytes(ulid); + + // Set the tag bit. + bytes[0] = bytes[0] | TAG_BIT_MASK; + + // Pack version (5 bits): high 3 bits → byte[14] low 3 bits; + // low 2 bits → byte[15] high 2 bits. 
+ const versionHigh = (version >> 2) & VERSION_HIGH_MASK; // 3 bits + const versionLow = (version & 0x03) << 6; // 2 bits placed at bits 6..7 + + bytes[14] = (bytes[14] & ~VERSION_HIGH_MASK) | versionHigh; + bytes[15] = + (bytes[15] & ~(VERSION_LOW_MASK | REGION_MASK)) | + versionLow | + (regionId & REGION_MASK); + + return bytesToUlid(bytes); +} + +/** + * Decode a (possibly) tagged ULID. Always succeeds for any syntactically + * valid ULID; check {@link DecodedRunId.tagged} to determine whether the + * input was actually tagged by this scheme. + * + * The returned {@link DecodedRunId.ulid} has only the tag bit cleared — the + * 11 metadata bits remain in place, so `decode(encode(u, r)).ulid` is *not* + * byte-identical to `u` (the bottom 11 randomness bits of `u` were destroyed + * by `encode`), but `decode(encode(u, r)).ulid` is byte-identical to + * `decode(encode(decode(encode(u, r)).ulid, r)).ulid`. + * + * @throws If the input is not a syntactically valid 26-character + * Crockford-Base32 ULID. + */ +export function decode(taggedUlid: string): DecodedRunId { + const bytes = ulidToBytes(taggedUlid); + const tagged = (bytes[0] & TAG_BIT_MASK) !== 0; + + const regionId = bytes[15] & REGION_MASK; + const version = + ((bytes[14] & VERSION_HIGH_MASK) << 2) | + ((bytes[15] & VERSION_LOW_MASK) >> 6); + + // Clear the tag bit for the returned "untagged" ULID. + bytes[0] = bytes[0] & ~TAG_BIT_MASK; + const ulid = bytesToUlid(bytes); + + return { + tagged, + ulid, + version, + regionId, + region: lookupRegion(regionId), + }; +} + +/** + * Returns `true` if `value` is a 26-character Crockford-Base32 ULID with the + * tag bit set (i.e. was produced by {@link encode}). Returns `false` for any + * input that is not a syntactically valid ULID, including non-strings. + * + * The parameter is typed as `unknown` so this function can safely be used as + * a guard on untrusted input without requiring callers to cast. 
+ */ +export function isTagged(value: unknown): boolean { + return isTaggedString(value); +} + +// Re-export internal constants that may be useful for callers wanting to +// reason about the encoding's bit budget without importing from a deep path. +export { MAX_REGION as MAX_REGION_ID, MAX_VERSION } from './codec.js'; diff --git a/packages/world-vercel/src/run-id/regions.ts b/packages/world-vercel/src/run-id/regions.ts new file mode 100644 index 0000000000..5be8de74c9 --- /dev/null +++ b/packages/world-vercel/src/run-id/regions.ts @@ -0,0 +1,87 @@ +/** + * Stable mapping between Vercel compute region codes (e.g. `iad1`) and the + * 6-bit region IDs encoded into tagged workflow run IDs. + * + * **DO NOT REORDER OR REUSE IDS.** Once a region has been assigned an ID, that + * ID is part of the on-the-wire encoding of every run ID ever issued for that + * region. New regions must be appended with the next unused ID. + * + * `0` is reserved for "unknown" — encode functions may emit it when the + * caller's region cannot be determined, and decode will surface it as + * `region: null`. + * + * The list below covers the 21 currently-deployed Vercel compute regions plus + * `hel1` and `zrh1`, which are reserved for future rollout so they can be + * assigned without requiring a version bump. + */ +export const REGION_IDS = { + unknown: 0, + iad1: 1, + sfo1: 2, + pdx1: 3, + cle1: 4, + yul1: 5, + gru1: 6, + dub1: 7, + lhr1: 8, + cdg1: 9, + fra1: 10, + bru1: 11, + arn1: 12, + hel1: 13, + zrh1: 14, + cpt1: 15, + dxb1: 16, + bom1: 17, + sin1: 18, + hkg1: 19, + hnd1: 20, + icn1: 21, + kix1: 22, + syd1: 23, +} as const; + +/** + * Any key in {@link REGION_IDS}, including the `'unknown'` sentinel. Not + * usually what callers want — see {@link RegionCode} for the "known region" + * subset. + */ +export type RegionKey = keyof typeof REGION_IDS; + +/** + * A concrete Vercel compute region code (e.g. `'iad1'`, `'fra1'`). 
Excludes + * the `'unknown'` sentinel since it does not correspond to any real region. + */ +export type RegionCode = Exclude; + +export type RegionId = (typeof REGION_IDS)[RegionKey]; + +/** + * Reverse map: numeric region ID → region code. Only populated for known + * regions (i.e. excludes the `unknown`/0 sentinel); {@link lookupRegion} + * returns `null` for any ID not present in this map. + */ +const REGION_CODES_BY_ID: ReadonlyMap = new Map( + (Object.entries(REGION_IDS) as Array<[RegionKey, number]>) + .filter((entry): entry is [RegionCode, number] => entry[0] !== 'unknown') + .map(([code, id]) => [id, code]) +); + +/** + * Look up a region code by ID. Returns `null` for IDs not in {@link REGION_IDS} + * and for the `unknown`/0 sentinel. + */ +export function lookupRegion(regionId: number): RegionCode | null { + return REGION_CODES_BY_ID.get(regionId) ?? null; +} + +/** + * Look up a numeric region ID by code. Throws if the code is not recognized. + */ +export function regionIdFor(code: RegionCode): RegionId { + const id = REGION_IDS[code]; + if (id === undefined) { + throw new Error(`Unknown Vercel region code: ${String(code)}`); + } + return id; +} diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index ce8a5d2b94..47af4b510d 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -304,4 +304,26 @@ export interface World extends Queue, Streamer, Storage { runId: string, context?: Record ): Promise; + + /** + * Mint a new workflow run ID. + * + * Called by `start()` to generate the unique ID for a newly-created run. + * The returned value is the "bare" ID (without any `wrun_` prefix); the + * core attaches the prefix. + * + * Implementations are free to embed world-specific metadata in the ID + * (e.g., a region identifier) as long as the returned string remains a + * valid ULID. When omitted, `start()` falls back to generating a standard + * monotonic ULID. 
+ * + * @param options - The full options bag passed to `start()` (typed as + * `Record` here to avoid a circular dependency with + * `@workflow/core`). Worlds should read only the fields they + * recognise — for example, `@workflow/world-vercel` reads + * `options.region` to embed a region identifier. Unrecognised keys + * must be ignored. When `start()` was called with no options, this is + * `undefined`. + */ + createRunId?(options?: Readonly>): string; } diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 1637163200..68915c86d2 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -88,6 +88,17 @@ export interface QueueOptions { delaySeconds?: number; /** Spec version of the target run. Used to select the queue transport format. */ specVersion?: number; + /** + * World-specific routing hint identifying the region the message should + * be sent to (e.g. a Vercel compute region code such as `'iad1'`). + * + * Worlds that don't have a regional dimension ignore this field. For + * `@workflow/world-vercel`, this overrides the region the underlying + * `@vercel/queue` client uses to route the message; when omitted, the + * region is resolved from the payload's tagged run ID, then from the + * `VERCEL_REGION` environment variable. + */ + region?: string; } export interface Queue { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8640dfe50b..40b1ec878d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1482,6 +1482,9 @@ importers: cbor-x: specifier: 1.6.0 version: 1.6.0 + ulid: + specifier: 'catalog:' + version: 3.0.1 undici: specifier: 'catalog:' version: 7.22.0