Skip to content

Commit cfa9052

Browse files
committed
refactor(run-engine): route TaskRun reads through the run store
Relocate the direct TaskRun reads in the engine and its systems to the RunStore read methods, preserving the exact client (writer, replica, or transaction) at each site. Behavior-preserving; the engine test suite is unchanged.
1 parent 13d5364 commit cfa9052

10 files changed

Lines changed: 278 additions & 221 deletions

File tree

internal-packages/run-engine/src/engine/index.ts

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ export class RunEngine {
650650
"createCancelledRun: row already exists, returning existing (idempotent)",
651651
{ friendlyId: snapshot.friendlyId },
652652
);
653-
const existing = await prisma.taskRun.findFirst({ where: { id } });
653+
const existing = await this.runStore.findRun({ id }, prisma);
654654
if (existing) {
655655
// Only treat the conflict as idempotent when the existing
656656
// row is ALREADY canceled. If a non-canceled row landed
@@ -2325,16 +2325,19 @@ export class RunEngine {
23252325
});
23262326

23272327
//the run didn't start executing, we need to requeue it
2328-
const run = await prisma.taskRun.findFirst({
2329-
where: { id: runId },
2330-
include: {
2331-
runtimeEnvironment: {
2332-
include: {
2333-
organization: true,
2328+
const run = await this.runStore.findRun(
2329+
{ id: runId },
2330+
{
2331+
include: {
2332+
runtimeEnvironment: {
2333+
include: {
2334+
organization: true,
2335+
},
23342336
},
23352337
},
23362338
},
2337-
});
2339+
prisma
2340+
);
23382341

23392342
if (!run) {
23402343
this.logger.error(
@@ -2629,12 +2632,15 @@ export class RunEngine {
26292632
snapshotId,
26302633
});
26312634

2632-
const taskRun = await this.prisma.taskRun.findFirst({
2633-
where: { id: runId },
2634-
select: {
2635-
queue: true,
2635+
const taskRun = await this.runStore.findRun(
2636+
{ id: runId },
2637+
{
2638+
select: {
2639+
queue: true,
2640+
},
26362641
},
2637-
});
2642+
this.prisma
2643+
);
26382644

26392645
if (!taskRun) {
26402646
this.logger.error(
@@ -2708,7 +2714,7 @@ export class RunEngine {
27082714
runIds: string[],
27092715
completedAtOffsetMs: number = 1000 * 60 * 10
27102716
): Promise<Array<{ id: string; orgId: string }>> {
2711-
const runs = await this.readOnlyPrisma.taskRun.findMany({
2717+
const runs = await this.runStore.findRuns({
27122718
where: {
27132719
id: { in: runIds },
27142720
completedAt: {

internal-packages/run-engine/src/engine/retrying.ts

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
TaskRunExecutionRetry,
1111
} from "@trigger.dev/core/v3";
1212
import { PrismaClientOrTransaction } from "@trigger.dev/database";
13+
import { RunStore } from "@internal/run-store";
1314
import { MAX_TASK_RUN_ATTEMPTS } from "./consts.js";
1415
import { ServiceValidationError } from "./errors.js";
1516

@@ -45,6 +46,7 @@ export type RetryOutcome =
4546

4647
export async function retryOutcomeFromCompletion(
4748
prisma: PrismaClientOrTransaction,
49+
runStore: RunStore,
4850
{ runId, attemptNumber, error, retryUsingQueue, retrySettings }: Params
4951
): Promise<RetryOutcome> {
5052
// Canceled
@@ -56,7 +58,7 @@ export async function retryOutcomeFromCompletion(
5658

5759
// OOM error (retry on a larger machine or fail)
5860
if (isOOMRunError(error)) {
59-
const oomResult = await retryOOMOnMachine(prisma, runId);
61+
const oomResult = await retryOOMOnMachine(prisma, runStore, runId);
6062
if (!oomResult) {
6163
return { outcome: "fail_run", sanitizedError, wasOOMError: true };
6264
}
@@ -95,18 +97,21 @@ export async function retryOutcomeFromCompletion(
9597
}
9698

9799
// Get the run settings and current usage values
98-
const run = await prisma.taskRun.findFirst({
99-
where: {
100+
const run = await runStore.findRun(
101+
{
100102
id: runId,
101103
},
102-
select: {
103-
maxAttempts: true,
104-
lockedRetryConfig: true,
105-
usageDurationMs: true,
106-
costInCents: true,
107-
machinePreset: true,
104+
{
105+
select: {
106+
maxAttempts: true,
107+
lockedRetryConfig: true,
108+
usageDurationMs: true,
109+
costInCents: true,
110+
machinePreset: true,
111+
},
108112
},
109-
});
113+
prisma
114+
);
110115

111116
if (!run) {
112117
throw new ServiceValidationError("Run not found", 404);
@@ -179,6 +184,7 @@ export async function retryOutcomeFromCompletion(
179184

180185
async function retryOOMOnMachine(
181186
prisma: PrismaClientOrTransaction,
187+
runStore: RunStore,
182188
runId: string
183189
): Promise<{
184190
machine: string;
@@ -188,17 +194,20 @@ async function retryOOMOnMachine(
188194
machinePreset: string | null;
189195
} | undefined> {
190196
try {
191-
const run = await prisma.taskRun.findFirst({
192-
where: {
197+
const run = await runStore.findRun(
198+
{
193199
id: runId,
194200
},
195-
select: {
196-
machinePreset: true,
197-
lockedRetryConfig: true,
198-
usageDurationMs: true,
199-
costInCents: true,
201+
{
202+
select: {
203+
machinePreset: true,
204+
lockedRetryConfig: true,
205+
usageDurationMs: true,
206+
costInCents: true,
207+
},
200208
},
201-
});
209+
prisma
210+
);
202211

203212
if (!run || !run.lockedRetryConfig || !run.machinePreset) {
204213
return;

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,19 @@ export class BatchSystem {
8787
return;
8888
}
8989

90-
const runs = await this.$.prisma.taskRun.findMany({
91-
select: {
92-
id: true,
93-
status: true,
94-
},
95-
where: {
96-
batchId,
97-
runtimeEnvironmentId: batch.runtimeEnvironmentId,
90+
const runs = await this.$.runStore.findRuns(
91+
{
92+
select: {
93+
id: true,
94+
status: true,
95+
},
96+
where: {
97+
batchId,
98+
runtimeEnvironmentId: batch.runtimeEnvironmentId,
99+
},
98100
},
99-
});
101+
this.$.prisma
102+
);
100103

101104
if (runs.every((r) => isFinalRunStatus(r.status))) {
102105
this.$.logger.debug("#tryCompleteBatch: All runs are completed", { batchId });

internal-packages/run-engine/src/engine/systems/debounceSystem.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -606,10 +606,11 @@ return 0
606606
return null;
607607
}
608608

609-
const probe = await prisma.taskRun.findFirst({
610-
where: { id: existingRunId },
611-
select: { status: true, delayUntil: true, createdAt: true },
612-
});
609+
const probe = await this.$.runStore.findRun(
610+
{ id: existingRunId },
611+
{ select: { status: true, delayUntil: true, createdAt: true } },
612+
prisma
613+
);
613614
if (!probe || probe.status !== "DELAYED" || !probe.delayUntil) {
614615
return null;
615616
}
@@ -632,10 +633,11 @@ return 0
632633
return null;
633634
}
634635

635-
const fullRun = await prisma.taskRun.findFirst({
636-
where: { id: existingRunId },
637-
include: { associatedWaitpoint: true },
638-
});
636+
const fullRun = await this.$.runStore.findRun(
637+
{ id: existingRunId },
638+
{ include: { associatedWaitpoint: true } },
639+
prisma
640+
);
639641
if (!fullRun || fullRun.status !== "DELAYED") {
640642
return null;
641643
}
@@ -665,10 +667,11 @@ return 0
665667
error: unknown;
666668
prisma: PrismaClientOrTransaction;
667669
}): Promise<DebounceResult> {
668-
const fullRun = await prisma.taskRun.findFirst({
669-
where: { id: existingRunId },
670-
include: { associatedWaitpoint: true },
671-
});
670+
const fullRun = await this.$.runStore.findRun(
671+
{ id: existingRunId },
672+
{ include: { associatedWaitpoint: true } },
673+
prisma
674+
);
672675

673676
if (!fullRun || fullRun.status !== "DELAYED") {
674677
// The run is no longer in a state we can safely return as "existing" -
@@ -775,12 +778,15 @@ return 0
775778
}
776779

777780
// Get the run to check debounce metadata and createdAt
778-
const existingRun = await prisma.taskRun.findFirst({
779-
where: { id: existingRunId },
780-
include: {
781-
associatedWaitpoint: true,
781+
const existingRun = await this.$.runStore.findRun(
782+
{ id: existingRunId },
783+
{
784+
include: {
785+
associatedWaitpoint: true,
786+
},
782787
},
783-
});
788+
prisma
789+
);
784790

785791
if (!existingRun) {
786792
this.$.logger.debug("handleExistingRun: existing run not found in database", {

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,20 @@ export class DelayedRunSystem {
110110
return;
111111
}
112112

113-
const run = await this.$.prisma.taskRun.findFirst({
114-
where: { id: runId },
115-
include: {
116-
runtimeEnvironment: {
117-
include: {
118-
project: true,
119-
organization: true,
113+
const run = await this.$.runStore.findRun(
114+
{ id: runId },
115+
{
116+
include: {
117+
runtimeEnvironment: {
118+
include: {
119+
project: true,
120+
organization: true,
121+
},
120122
},
121123
},
122124
},
123-
});
125+
this.$.prisma
126+
);
124127

125128
if (!run) {
126129
throw new Error(`#enqueueDelayedRun: run not found: ${runId}`);

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -641,12 +641,15 @@ export class DequeueSystem {
641641

642642
// Wrap the Prisma call with tryCatch - if DB is unavailable, we still want to nack via Redis
643643
const [findError, run] = await tryCatch(
644-
prisma.taskRun.findFirst({
645-
where: { id: runId },
646-
include: {
647-
runtimeEnvironment: true,
644+
this.$.runStore.findRun(
645+
{ id: runId },
646+
{
647+
include: {
648+
runtimeEnvironment: true,
649+
},
648650
},
649-
})
651+
prisma
652+
)
650653
);
651654

652655
// If DB is unavailable or run not found, just nack directly via Redis
@@ -808,26 +811,29 @@ export class DequeueSystem {
808811
return startSpan(this.$.tracer, "getRunWithBackgroundWorkerTasks", async (span) => {
809812
span.setAttribute("run_id", runId);
810813

811-
const run = await prisma.taskRun.findFirst({
812-
where: {
814+
const run = await this.$.runStore.findRun(
815+
{
813816
id: runId,
814817
},
815-
include: {
816-
runtimeEnvironment: {
817-
select: {
818-
id: true,
819-
type: true,
820-
archivedAt: true,
818+
{
819+
include: {
820+
runtimeEnvironment: {
821+
select: {
822+
id: true,
823+
type: true,
824+
archivedAt: true,
825+
},
821826
},
822-
},
823-
lockedToVersion: {
824-
include: {
825-
deployment: true,
826-
tasks: true,
827+
lockedToVersion: {
828+
include: {
829+
deployment: true,
830+
tasks: true,
831+
},
827832
},
828833
},
829834
},
830-
});
835+
prisma
836+
);
831837

832838
if (!run) {
833839
span.setAttribute("result", "NO_RUN");

internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,18 @@ export class PendingVersionSystem {
9393
// is dropped. The planner uses the PK for `id IN (…)`; the status
9494
// predicate is a residual filter and does NOT require the status
9595
// index.
96-
const pendingRuns = await this.$.prisma.taskRun.findMany({
97-
where: {
98-
id: { in: candidateIds },
99-
status: "PENDING_VERSION",
100-
},
101-
orderBy: {
102-
createdAt: "asc",
96+
const pendingRuns = await this.$.runStore.findRuns(
97+
{
98+
where: {
99+
id: { in: candidateIds },
100+
status: "PENDING_VERSION",
101+
},
102+
orderBy: {
103+
createdAt: "asc",
104+
},
103105
},
104-
});
106+
this.$.prisma
107+
);
105108

106109
if (!pendingRuns.length) {
107110
// CH returned candidates but all of them have already moved past
@@ -135,7 +138,7 @@ export class PendingVersionSystem {
135138
return false;
136139
}
137140

138-
const updatedRun = await tx.taskRun.findFirstOrThrow({ where: { id: run.id } });
141+
const updatedRun = await this.$.runStore.findRunOrThrow({ id: run.id }, tx);
139142

140143
await this.enqueueSystem.enqueueRun({
141144
run: updatedRun,

0 commit comments

Comments
 (0)