Skip to content

Commit 7aa871f

Browse files
authored
feat(webapp): plan-aware compute migration (#3957)
Adds an opt-in mechanism to route a configurable percentage of organizations onto the compute (MicroVM) backing of their region at trigger time, without changing their stored region settings. Routing is gated by three global feature flags - `computeMigrationEnabled`, `computeMigrationFreePercentage`, `computeMigrationPaidPercentage` - plus a per-org `computeMigrationEnabled` override that wins in both directions. A region's compute backing is resolved from a new `WorkerInstanceGroup.region` column: a container group and its MicroVM group share one geo `region`, so the migration swaps the resolved worker queue to the backing group's queue. Orgs are bucketed deterministically by id, so ramping a percentage down keeps a strict subset rather than reshuffling, and a region with no compute backing is never touched. Everything is off by default - behaviour is unchanged unless the flags are set. The flags and the worker-region groups are read on the trigger hot path from in-memory snapshots rather than the database: a small `createReloadingRegistry` helper loads each at startup and refreshes them on an interval, so no per-trigger query is added and a percentage or kill-switch change propagates within the reload interval. A cold replica whose snapshot hasn't loaded yet reads as not-migrated (the container path) and self-corrects on the next load - the same cold-start contract as the datastore / LLM-pricing registries, with a `reloading_registry_loaded` metric so a never-loaded registry is alertable. The same migration decision is consulted at deploy-time template creation so a migrated org gets a compute template built ahead of its first run. This runs in shadow mode (best-effort, never fails the deploy) by default, or - when the `computeMigrationRequireTemplate` flag is on - in required mode, built synchronously at deploy so the first run never builds on-demand and template errors surface at deploy time. 
To let operators keep track of which runs ran where while customers see only geography, the run's actual worker queue is stored raw and the geo region is stamped separately on `TaskRun.region` (and a new ClickHouse `region` column) at trigger time. Read surfaces - the dashboard, the API, and the Query/Logs page - show the geo region, falling back to the region derived from the worker queue for runs written before the column existed. Minor follow-ups left out of scope: the percentage flags render as text inputs on the admin flags page (the catalog UI has no numeric control type yet), and `createReloadingRegistry` could later gain pub/sub for sub-second cross-replica propagation if the reload interval proves too slow.
1 parent 0c839e8 commit 7aa871f

35 files changed

Lines changed: 785 additions & 22 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Gradually roll out a new run execution backend to a configurable percentage of organizations.

apps/webapp/app/entry.server.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ import { registerRunChangeNotifierHandlers } from "./services/realtime/runChange
4343
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
4444
(globalThis as Record<string, unknown>).__sessionsReplicationInstance =
4545
sessionsReplicationInstance;
46+
import { globalFlagsRegistry } from "./v3/globalFlagsRegistry.server";
47+
(globalThis as Record<string, unknown>).__globalFlagsRegistry = globalFlagsRegistry;
48+
import { workerRegionRegistry } from "./v3/workerRegions.server";
49+
(globalThis as Record<string, unknown>).__workerRegionRegistry = workerRegionRegistry;
4650

4751
const ABORT_DELAY = 30000;
4852

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ const EnvironmentSchema = z
158158
WORKER_SCHEMA: z.string().default("graphile_worker"),
159159
WORKER_CONCURRENCY: z.coerce.number().int().default(10),
160160
WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
161+
// How often each replica reloads the global flags snapshot from the DB.
162+
// Sets kill/ramp propagation latency.
163+
GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().min(1000).default(5000),
161164
WORKER_ENABLED: z.string().default("true"),
162165
GRACEFUL_SHUTDOWN_TIMEOUT: z.coerce.number().int().default(60000),
163166
DISABLE_SSE: z.string().optional(),

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa
1515
import assertNever from "assert-never";
1616
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1717
import { $replica, prisma } from "~/db.server";
18-
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
18+
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
1919
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2020
import {
2121
findRunByIdWithMollifierFallback,
@@ -49,6 +49,7 @@ const commonRunSelect = {
4949
depth: true,
5050
scheduleId: true,
5151
workerQueue: true,
52+
region: true,
5253
lockedToVersion: {
5354
select: {
5455
version: true,
@@ -520,7 +521,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
520521
triggerFunction: resolveTriggerFunction(run),
521522
batchId: run.batch?.friendlyId,
522523
metadata,
523-
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
524+
region: regionForDisplay(run.region, run.workerQueue),
524525
};
525526
}
526527

@@ -684,6 +685,7 @@ export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
684685
// API response's `region` to undefined instead of advertising a
685686
// misleading "main" region for a not-yet-assigned buffered run).
686687
workerQueue: buffered.workerQueue ?? "",
688+
region: buffered.region ?? "",
687689
parentTaskRun: null,
688690
rootTaskRun: null,
689691
childRuns: [],

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
1111
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
1212
import { getTaskIdentifiers } from "~/models/task.server";
1313
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
14-
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
14+
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
1515
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1616
import { ServiceValidationError } from "~/v3/services/baseService.server";
1717
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
@@ -260,7 +260,7 @@ export class NextRunListPresenter {
260260
name: run.queue.replace("task/", ""),
261261
type: run.queue.startsWith("task/") ? "task" : "custom",
262262
},
263-
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
263+
region: regionForDisplay(run.region, run.workerQueue),
264264
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
265265
};
266266
}),

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,11 +303,17 @@ export class SpanPresenter extends BasePresenter {
303303
location: true,
304304
},
305305
where: {
306+
// masterQueue is unique and IS the run's backing queue, so this finds
307+
// the group the run actually ran on.
306308
masterQueue: baseWorkerQueue(run.workerQueue),
307309
},
308310
});
309311

310-
region = workerGroup ?? null;
312+
// Show the stamped geo region as the name so a migrated run never reveals
313+
// its compute backing; fall back to the group name for unstamped runs.
314+
region = workerGroup
315+
? { name: run.region ?? workerGroup.name, location: workerGroup.location }
316+
: null;
311317
}
312318

313319
// Only AGENT-tagged runs (chat.agent and friends) can be session-bound,
@@ -513,6 +519,7 @@ export class SpanPresenter extends BasePresenter {
513519
},
514520
engine: true,
515521
workerQueue: true,
522+
region: true,
516523
error: true,
517524
output: true,
518525
outputType: true,

apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import {
1818
type SyntheticReplayTaskRun,
1919
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
2020
import parseDuration from "parse-duration";
21-
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
21+
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
2222
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
2323
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
2424
import { ReplayRunData } from "~/v3/replayTask";
@@ -52,6 +52,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
5252
maxDurationInSeconds: true,
5353
machinePreset: true,
5454
workerQueue: true,
55+
region: true,
5556
ttl: true,
5657
idempotencyKey: true,
5758
runTags: true,
@@ -163,6 +164,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
163164
maxDurationInSeconds: buffered.maxDurationInSeconds ?? null,
164165
machinePreset: buffered.machinePreset ?? null,
165166
workerQueue: buffered.workerQueue ?? null,
167+
region: buffered.region ?? null,
166168
ttl: buffered.ttl ?? null,
167169
idempotencyKey: buffered.idempotencyKey ?? null,
168170
runTags: buffered.runTags,
@@ -210,7 +212,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
210212
maxAttempts: run.maxAttempts,
211213
maxDurationSeconds: run.maxDurationInSeconds,
212214
machinePreset: run.machinePreset,
213-
region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue),
215+
region:
216+
environment.type === "DEVELOPMENT"
217+
? undefined
218+
: regionForDisplay(run.region, run.workerQueue),
214219
regions: regionsResult.regions,
215220
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
216221
idempotencyKey: run.idempotencyKey,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { hashBucket } from "~/utils/computeBucket";
2+
3+
/**
 * The slice of the global flags snapshot that the compute-migration resolver
 * reads. Every field is optional: `isOrgMigrated` treats a missing
 * `computeMigrationEnabled` as `false` and a missing percentage as `0`.
 */
export type ComputeMigrationFlags = Partial<{
  /** Global switch; percentage-based enrollment is only evaluated when this is set. */
  computeMigrationEnabled: boolean;
  /** Threshold compared against the org's hash bucket when the plan type is `"free"`. */
  computeMigrationFreePercentage: number;
  /** Threshold compared against the org's hash bucket when the plan type is `"paid"`. */
  computeMigrationPaidPercentage: number;
}>;
9+
10+
/** Everything needed to decide whether a single organization is enrolled. */
type MigrationDecisionInput = {
  /** Plan type of the org; only `"free"` and `"paid"` map to a percentage flag. */
  planType: string | undefined;
  /** Organization id, fed to `hashBucket` for percentage enrollment. */
  orgId: string;
  /** Per-org feature flags; a boolean `computeMigrationEnabled` entry is an override. */
  orgFeatureFlags: Record<string, unknown> | null | undefined;
  /** Global flags snapshot; `undefined` behaves like "migration disabled". */
  flags: ComputeMigrationFlags | undefined;
};

/**
 * Decides whether an org should run on the compute backing. The trigger-time
 * transform and the deploy-time template decision both call this, so a migrated
 * org always gets a compute template.
 *
 * Order of precedence:
 *  1. A boolean per-org `computeMigrationEnabled` override is returned as-is, in
 *     either direction. `false` is the sole opt-out.
 *  2. Otherwise the global `computeMigrationEnabled` flag must be on.
 *  3. Otherwise the org's hash bucket must fall below its plan's percentage.
 *     Plans other than `"free"` / `"paid"` (enterprise, unknown) get a threshold
 *     of 0, so they can only be enrolled through the override.
 *
 * @returns `true` when the org is enrolled in the migration.
 */
export function isOrgMigrated(input: MigrationDecisionInput): boolean {
  const { planType, orgId, orgFeatureFlags, flags } = input;

  // Only a real boolean counts as an override; anything else falls through.
  const orgOverride = orgFeatureFlags?.["computeMigrationEnabled"];
  if (typeof orgOverride === "boolean") {
    return orgOverride;
  }

  const globallyEnabled = flags?.computeMigrationEnabled ?? false;
  if (!globallyEnabled) {
    return false;
  }

  let percentage: number;
  switch (planType) {
    case "free":
      percentage = flags?.computeMigrationFreePercentage ?? 0;
      break;
    case "paid":
      percentage = flags?.computeMigrationPaidPercentage ?? 0;
      break;
    default:
      // enterprise / undefined
      percentage = 0;
      break;
  }

  return hashBucket(orgId) < percentage;
}
46+
47+
/** Migration decision inputs plus the already-resolved placement of the run. */
type ResolveInput = MigrationDecisionInput & {
  /** Worker queue resolved before migration; `undefined` is passed through untouched. */
  baseWorkerQueue: string | undefined;
  /** Fast-path setting that goes with the base worker queue. */
  baseEnableFastPath: boolean;
  /** Geo of the base queue (same whether migrated or not). */
  region: string | undefined;
  /** Queue and fast-path setting of the compute backing group, if there is one. */
  backing: { workerQueue: string; enableFastPath: boolean } | undefined;
  /** Environment type; `"DEVELOPMENT"` environments are never rewritten. */
  envType: string;
};

/**
 * Builds the target descriptor `{ workerQueue, region, enableFastPath }` for a
 * run. If the org is migrated and a compute backing is supplied, the queue and
 * fast-path setting are taken from the MICROVM backing group; otherwise the base
 * values are returned unchanged. `region` is the geo in both cases.
 *
 * This is a same-geo swap (us-east-1 -> us-east-1-next): any explicit placement
 * is a geography preference, honored by staying in-region. Applied after region
 * resolution, mirroring the scheduled-split.
 *
 * @returns the backing group's queue and fast-path setting when the swap applies,
 *   else the base values; `region` is always the input `region`.
 */
export function resolveComputeMigration(input: ResolveInput): {
  workerQueue: string | undefined;
  region: string | undefined;
  enableFastPath: boolean;
} {
  const { baseWorkerQueue, baseEnableFastPath, region, backing, envType, ...decision } = input;

  // Short-circuits in the same order as the individual guards would: no queue,
  // then development environments, then the enrollment decision.
  const enrolled =
    baseWorkerQueue !== undefined && envType !== "DEVELOPMENT" && isOrgMigrated(decision);

  if (enrolled && backing) {
    return {
      workerQueue: backing.workerQueue,
      region,
      enableFastPath: backing.enableFastPath,
    };
  }

  return {
    workerQueue: baseWorkerQueue,
    region,
    enableFastPath: baseEnableFastPath,
  };
}

apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ export function baseWorkerQueue(workerQueue: string | null | undefined): string
3333
return colon === -1 ? workerQueue : workerQueue.slice(0, colon);
3434
}
3535

36+
/**
 * User-facing region for read surfaces: the explicit geo region if set, else the
 * region derived from the worker queue, else undefined. Use everywhere a run's
 * region is displayed so an empty value never surfaces as `""` and all surfaces
 * agree. Not for query keys — those want the raw worker queue, not this fallback.
 *
 * @param region - Stamped geo region of the run; `null`, `undefined` and `""` all
 *   count as "not set".
 * @param workerQueue - Raw worker queue of the run, used only as a fallback.
 * @returns A non-empty region string, or `undefined` when none can be determined.
 */
export function regionForDisplay(
  region: string | null | undefined,
  workerQueue: string | null | undefined
): string | undefined {
  if (region) return region;
  if (!workerQueue) return undefined;

  // The base of a queue that has nothing before its first ":" is an empty string;
  // report that as "no region" instead of leaking `""` to the caller.
  return baseWorkerQueue(workerQueue) || undefined;
}
48+
3649
/** `TriggerSource` value used for runs originating from a schedule. */
3750
const SCHEDULE_TRIGGER_SOURCE = "schedule";
3851

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ import {
3838
resolveScheduledQueueSplitEnabled,
3939
workerQueueForRun,
4040
} from "../concerns/workerQueueSplit.server";
41+
import { resolveComputeMigration } from "../concerns/computeMigration.server";
42+
import { workerRegionRegistry, backingForQueue, regionForQueue } from "~/v3/workerRegions.server";
43+
import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server";
4144
import {
4245
publishClaim as publishMollifierClaim,
4346
releaseClaim as releaseMollifierClaim,
@@ -358,6 +361,24 @@ export class RunEngineTriggerTaskService {
358361
const baseWorkerQueue = workerQueueResult?.masterQueue;
359362
const enableFastPath = workerQueueResult?.enableFastPath ?? false;
360363

364+
// Rewrite the region to its compute backing for migration-enrolled orgs,
365+
// from the in-memory snapshots (no DB query). A cold read (registry not yet
366+
// loaded) returns undefined/[] and the resolver falls back to not-migrated.
367+
const workerGroups = workerRegionRegistry.current() ?? [];
368+
const region = baseWorkerQueue ? regionForQueue(baseWorkerQueue, workerGroups) : undefined;
369+
const backing = baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined;
370+
const migrated = resolveComputeMigration({
371+
baseWorkerQueue,
372+
baseEnableFastPath: enableFastPath,
373+
region,
374+
backing,
375+
planType,
376+
orgId: environment.organization.id,
377+
orgFeatureFlags: environment.organization.featureFlags as Record<string, unknown> | null,
378+
flags: globalFlagsRegistry.current(),
379+
envType: environment.type,
380+
});
381+
361382
// Build annotations for this run
362383
const triggerSource = options.triggerSource ?? "api";
363384
const triggerAction = options.triggerAction ?? "trigger";
@@ -386,13 +407,13 @@ export class RunEngineTriggerTaskService {
386407
globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1",
387408
});
388409
const workerQueue =
389-
baseWorkerQueue !== undefined
410+
migrated.workerQueue !== undefined
390411
? workerQueueForRun({
391-
workerQueue: baseWorkerQueue,
412+
workerQueue: migrated.workerQueue,
392413
rootTriggerSource: annotations.rootTriggerSource,
393414
splitEnabled: scheduledQueueSplitEnabled,
394415
})
395-
: baseWorkerQueue;
416+
: migrated.workerQueue;
396417

397418
try {
398419
return await this.traceEventConcern.traceRun(
@@ -491,7 +512,8 @@ export class RunEngineTriggerTaskService {
491512
queueName,
492513
lockedQueueId,
493514
workerQueue,
494-
enableFastPath,
515+
region: migrated.region,
516+
enableFastPath: migrated.enableFastPath,
495517
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
496518
delayUntil,
497519
ttl,
@@ -569,7 +591,8 @@ export class RunEngineTriggerTaskService {
569591
queueName,
570592
lockedQueueId,
571593
workerQueue,
572-
enableFastPath,
594+
region: migrated.region,
595+
enableFastPath: migrated.enableFastPath,
573596
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
574597
delayUntil,
575598
ttl,
@@ -718,6 +741,7 @@ export class RunEngineTriggerTaskService {
718741
queueName: string;
719742
lockedQueueId?: string;
720743
workerQueue?: string;
744+
region?: string;
721745
enableFastPath: boolean;
722746
lockedToBackgroundWorker?: { id: string; version: string; sdkVersion: string; cliVersion: string };
723747
delayUntil?: Date;
@@ -771,6 +795,7 @@ export class RunEngineTriggerTaskService {
771795
queue: args.queueName,
772796
lockedQueueId: args.lockedQueueId,
773797
workerQueue: args.workerQueue,
798+
region: args.region,
774799
enableFastPath: args.enableFastPath,
775800
isTest: args.body.options?.test ?? false,
776801
delayUntil: args.delayUntil,

0 commit comments

Comments
 (0)