Skip to content

Commit 8812c1c

Browse files
committed
feat(webapp): route migrated orgs to the compute backing at trigger
1 parent fd74e7d commit 8812c1c

1 file changed

Lines changed: 27 additions & 3 deletions

File tree

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ import {
3838
resolveScheduledQueueSplitEnabled,
3939
workerQueueForRun,
4040
} from "../concerns/workerQueueSplit.server";
41+
import {
42+
parseComputeBackingMap,
43+
resolveComputeMigration,
44+
} from "../concerns/computeMigration.server";
45+
import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server";
4146
import {
4247
publishClaim as publishMollifierClaim,
4348
releaseClaim as releaseMollifierClaim,
@@ -65,6 +70,8 @@ import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server";
6570
import { type MollifierBuffer } from "@trigger.dev/redis-worker";
6671
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";
6772

73+
const COMPUTE_BACKING_MAP = parseComputeBackingMap(env.COMPUTE_BACKING_MAP);
74+
6875
class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
6976
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
7077
return;
@@ -358,6 +365,23 @@ export class RunEngineTriggerTaskService {
358365
const baseWorkerQueue = workerQueueResult?.masterQueue;
359366
const enableFastPath = workerQueueResult?.enableFastPath ?? false;
360367

368+
// Plan-aware compute migration: rewrite the resolved region to its
369+
// compute backing for enrolled orgs. Reads the in-memory global-flags
370+
// snapshot (no DB query). Gate the first read on the registry so a cold
371+
// replica never serves a default over a real flag value.
372+
if (!globalFlagsRegistry.isLoaded) {
373+
await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
374+
}
375+
const migratedWorkerQueue = resolveComputeMigration({
376+
baseWorkerQueue,
377+
planType,
378+
orgId: environment.organization.id,
379+
orgFeatureFlags: environment.organization.featureFlags as Record<string, unknown> | null,
380+
flags: globalFlagsRegistry.current(),
381+
envType: environment.type,
382+
backingMap: COMPUTE_BACKING_MAP,
383+
});
384+
361385
// Build annotations for this run
362386
const triggerSource = options.triggerSource ?? "api";
363387
const triggerAction = options.triggerAction ?? "trigger";
@@ -386,13 +410,13 @@ export class RunEngineTriggerTaskService {
386410
globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1",
387411
});
388412
const workerQueue =
389-
baseWorkerQueue !== undefined
413+
migratedWorkerQueue !== undefined
390414
? workerQueueForRun({
391-
workerQueue: baseWorkerQueue,
415+
workerQueue: migratedWorkerQueue,
392416
rootTriggerSource: annotations.rootTriggerSource,
393417
splitEnabled: scheduledQueueSplitEnabled,
394418
})
395-
: baseWorkerQueue;
419+
: migratedWorkerQueue;
396420

397421
try {
398422
return await this.traceEventConcern.traceRun(

0 commit comments

Comments
 (0)