Skip to content

Commit d530eb1

Browse files
committed
refactor(run-engine): route expiry and dequeue-lock writes through RunStore
1 parent 8650e40 commit d530eb1

2 files changed

Lines changed: 82 additions & 102 deletions

File tree

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 43 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -419,17 +419,14 @@ export class DequeueSystem {
419419
// Pre-generate snapshot ID so we can construct the result without an extra read
420420
const snapshotId = generateInternalId();
421421

422-
const lockedTaskRun = await prisma.taskRun.update({
423-
where: {
424-
id: runId,
425-
},
426-
data: {
422+
const lockedTaskRun = await this.$.runStore.lockRunToWorker(
423+
runId,
424+
{
427425
lockedAt,
428426
lockedById: result.task.id,
429427
lockedToVersionId: result.worker.id,
430428
lockedQueueId: result.queue.id,
431429
lockedRetryConfig: lockedRetryConfig ?? undefined,
432-
status: "DEQUEUED",
433430
startedAt,
434431
baseCostInCents: this.options.machines.baseCostInCents,
435432
machinePreset: machinePreset.name,
@@ -438,38 +435,27 @@ export class DequeueSystem {
438435
cliVersion: result.worker.cliVersion,
439436
maxDurationInSeconds,
440437
maxAttempts: maxAttempts ?? undefined,
441-
executionSnapshots: {
442-
create: {
443-
id: snapshotId,
444-
engine: "V2",
445-
executionStatus: "PENDING_EXECUTING",
446-
description: "Run was dequeued for execution",
447-
// Map DEQUEUED -> PENDING for backwards compatibility with older runners
448-
runStatus: "PENDING",
449-
attemptNumber: result.run.attemptNumber ?? undefined,
450-
previousSnapshotId: snapshot.id,
451-
environmentId: snapshot.environmentId,
452-
environmentType: snapshot.environmentType,
453-
projectId: snapshot.projectId,
454-
organizationId: snapshot.organizationId,
455-
checkpointId: snapshot.checkpointId ?? undefined,
456-
batchId: snapshot.batchId ?? undefined,
457-
completedWaitpoints: {
458-
connect: snapshot.completedWaitpoints.map((w) => ({ id: w.id })),
459-
},
460-
completedWaitpointOrder: snapshot.completedWaitpoints
461-
.filter((c) => c.index !== undefined)
462-
.sort((a, b) => a.index! - b.index!)
463-
.map((w) => w.id),
464-
workerId,
465-
runnerId,
466-
},
438+
snapshot: {
439+
id: snapshotId,
440+
previousSnapshotId: snapshot.id,
441+
attemptNumber: result.run.attemptNumber ?? undefined,
442+
environmentId: snapshot.environmentId,
443+
environmentType: snapshot.environmentType,
444+
projectId: snapshot.projectId,
445+
organizationId: snapshot.organizationId,
446+
checkpointId: snapshot.checkpointId ?? undefined,
447+
batchId: snapshot.batchId ?? undefined,
448+
completedWaitpointIds: snapshot.completedWaitpoints.map((w) => w.id),
449+
completedWaitpointOrder: snapshot.completedWaitpoints
450+
.filter((c) => c.index !== undefined)
451+
.sort((a, b) => a.index! - b.index!)
452+
.map((w) => w.id),
453+
workerId,
454+
runnerId,
467455
},
468456
},
469-
include: {
470-
runtimeEnvironment: true,
471-
},
472-
});
457+
prisma
458+
);
473459

474460
this.$.eventBus.emit("runLocked", {
475461
time: new Date(),
@@ -741,30 +727,32 @@ export class DequeueSystem {
741727
});
742728

743729
//mark run as waiting for deploy
744-
const run = await prisma.taskRun.update({
745-
where: { id: runId },
746-
data: {
747-
status: "PENDING_VERSION",
730+
const run = await this.$.runStore.parkPendingVersion(
731+
runId,
732+
{
748733
statusReason,
749734
},
750-
select: {
751-
id: true,
752-
status: true,
753-
attemptNumber: true,
754-
updatedAt: true,
755-
createdAt: true,
756-
runTags: true,
757-
batchId: true,
758-
runtimeEnvironment: {
759-
select: {
760-
id: true,
761-
type: true,
762-
projectId: true,
763-
project: { select: { id: true, organizationId: true } },
735+
{
736+
select: {
737+
id: true,
738+
status: true,
739+
attemptNumber: true,
740+
updatedAt: true,
741+
createdAt: true,
742+
runTags: true,
743+
batchId: true,
744+
runtimeEnvironment: {
745+
select: {
746+
id: true,
747+
type: true,
748+
projectId: true,
749+
project: { select: { id: true, organizationId: true } },
750+
},
764751
},
765752
},
766753
},
767-
});
754+
prisma
755+
);
768756

769757
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
770758
runId,

internal-packages/run-engine/src/engine/systems/ttlSystem.ts

Lines changed: 39 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
22
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
3-
import { Prisma, PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database";
3+
import { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database";
44
import { isExecuting } from "../statuses.js";
55
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
66
import { SystemResources } from "./systems.js";
@@ -61,51 +61,51 @@ export class TtlSystem {
6161
raw: `Run expired because the TTL (${run.ttl}) was reached`,
6262
};
6363

64-
const updatedRun = await prisma.taskRun.update({
65-
where: { id: runId },
66-
data: {
67-
status: "EXPIRED",
64+
const updatedRun = await this.$.runStore.expireRun(
65+
runId,
66+
{
67+
error,
6868
completedAt: new Date(),
6969
expiredAt: new Date(),
70-
error,
71-
executionSnapshots: {
72-
create: {
73-
engine: "V2",
74-
executionStatus: "FINISHED",
75-
description: "Run was expired because the TTL was reached",
76-
runStatus: "EXPIRED",
77-
environmentId: snapshot.environmentId,
78-
environmentType: snapshot.environmentType,
79-
projectId: snapshot.projectId,
80-
organizationId: snapshot.organizationId,
81-
},
70+
snapshot: {
71+
engine: "V2",
72+
executionStatus: "FINISHED",
73+
description: "Run was expired because the TTL was reached",
74+
runStatus: "EXPIRED",
75+
environmentId: snapshot.environmentId,
76+
environmentType: snapshot.environmentType,
77+
projectId: snapshot.projectId,
78+
organizationId: snapshot.organizationId,
8279
},
8380
},
84-
select: {
85-
id: true,
86-
spanId: true,
87-
ttl: true,
88-
updatedAt: true,
89-
associatedWaitpoint: {
90-
select: {
91-
id: true,
81+
{
82+
select: {
83+
id: true,
84+
spanId: true,
85+
ttl: true,
86+
updatedAt: true,
87+
associatedWaitpoint: {
88+
select: {
89+
id: true,
90+
},
9291
},
93-
},
94-
runtimeEnvironment: {
95-
select: {
96-
organizationId: true,
97-
projectId: true,
98-
id: true,
92+
runtimeEnvironment: {
93+
select: {
94+
organizationId: true,
95+
projectId: true,
96+
id: true,
97+
},
9998
},
99+
createdAt: true,
100+
completedAt: true,
101+
taskEventStore: true,
102+
parentTaskRunId: true,
103+
expiredAt: true,
104+
status: true,
100105
},
101-
createdAt: true,
102-
completedAt: true,
103-
taskEventStore: true,
104-
parentTaskRunId: true,
105-
expiredAt: true,
106-
status: true,
107106
},
108-
});
107+
prisma
108+
);
109109

110110
await this.$.runQueue.acknowledgeMessage(
111111
updatedRun.runtimeEnvironment.organizationId,
@@ -228,15 +228,7 @@ export class TtlSystem {
228228
raw: "Run expired because the TTL was reached",
229229
};
230230

231-
await this.$.prisma.$executeRaw`
232-
UPDATE "TaskRun"
233-
SET "status" = 'EXPIRED'::"TaskRunStatus",
234-
"completedAt" = ${now},
235-
"expiredAt" = ${now},
236-
"updatedAt" = ${now},
237-
"error" = ${JSON.stringify(error)}::jsonb
238-
WHERE "id" IN (${Prisma.join(runIdsToExpire)})
239-
`;
231+
await this.$.runStore.expireRunsBatch(runIdsToExpire, { error, now }, this.$.prisma);
240232

241233
// Process each run: enqueue waitpoint completion jobs and emit events
242234
await pMap(

0 commit comments

Comments
 (0)