Skip to content

Commit 109c6a7

Browse files
committed
refactor(run-engine): route checkpoint, delayed, pending-version, and debounce writes through RunStore
1 parent 4ec5aab commit 109c6a7

4 files changed

Lines changed: 45 additions & 65 deletions

File tree

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,20 @@ export class CheckpointSystem {
115115
}
116116

117117
// Get the run and update the status
118-
const run = await this.$.prisma.taskRun.update({
119-
where: {
120-
id: runId,
121-
},
122-
data: {
123-
status: "WAITING_TO_RESUME",
124-
},
125-
include: {
126-
runtimeEnvironment: {
127-
include: {
128-
project: true,
129-
organization: true,
118+
const run = await this.$.runStore.suspendForCheckpoint(
119+
runId,
120+
{
121+
include: {
122+
runtimeEnvironment: {
123+
include: {
124+
project: true,
125+
organization: true,
126+
},
130127
},
131128
},
132129
},
133-
});
130+
this.$.prisma
131+
);
134132

135133
if (!run) {
136134
this.$.logger.error("Run not found for createCheckpoint", {
@@ -294,26 +292,24 @@ export class CheckpointSystem {
294292
}
295293

296294
// Get the run and update the status
297-
const run = await this.$.prisma.taskRun.update({
298-
where: {
299-
id: runId,
300-
},
301-
data: {
302-
status: "EXECUTING",
303-
},
304-
select: {
305-
id: true,
306-
status: true,
307-
attemptNumber: true,
308-
organizationId: true,
309-
runtimeEnvironmentId: true,
310-
projectId: true,
311-
updatedAt: true,
312-
createdAt: true,
313-
runTags: true,
314-
batchId: true,
295+
const run = await this.$.runStore.resumeFromCheckpoint(
296+
runId,
297+
{
298+
select: {
299+
id: true,
300+
status: true,
301+
attemptNumber: true,
302+
organizationId: true,
303+
runtimeEnvironmentId: true,
304+
projectId: true,
305+
updatedAt: true,
306+
createdAt: true,
307+
runTags: true,
308+
batchId: true,
309+
},
315310
},
316-
});
311+
this.$.prisma
312+
);
317313

318314
if (!run) {
319315
this.$.logger.error("Run not found for createCheckpoint", {

internal-packages/run-engine/src/engine/systems/debounceSystem.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,13 +1160,7 @@ return 0
11601160
updatePayload.runTags = updateData.tags;
11611161
}
11621162

1163-
const updatedRun = await prisma.taskRun.update({
1164-
where: { id: runId },
1165-
data: updatePayload,
1166-
include: {
1167-
associatedWaitpoint: true,
1168-
},
1169-
});
1163+
const updatedRun = await this.$.runStore.rewriteDebouncedRun(runId, updatePayload, prisma);
11701164

11711165
return updatedRun;
11721166
}

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,26 +48,19 @@ export class DelayedRunSystem {
4848
throw new ServiceValidationError("Cannot reschedule a run that is not delayed");
4949
}
5050

51-
const updatedRun = await prisma.taskRun.update({
52-
where: {
53-
id: runId,
54-
},
55-
data: {
51+
const updatedRun = await this.$.runStore.rescheduleRun(
52+
runId,
53+
{
5654
delayUntil: delayUntil,
57-
executionSnapshots: {
58-
create: {
59-
engine: "V2",
60-
executionStatus: "DELAYED",
61-
description: "Delayed run was rescheduled to a future date",
62-
runStatus: "DELAYED",
63-
environmentId: snapshot.environmentId,
64-
environmentType: snapshot.environmentType,
65-
projectId: snapshot.projectId,
66-
organizationId: snapshot.organizationId,
67-
},
55+
snapshot: {
56+
environmentId: snapshot.environmentId,
57+
environmentType: snapshot.environmentType,
58+
projectId: snapshot.projectId,
59+
organizationId: snapshot.organizationId,
6860
},
6961
},
70-
});
62+
prisma
63+
);
7164

7265
await this.$.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil);
7366

@@ -178,13 +171,13 @@ export class DelayedRunSystem {
178171

179172
const queuedAt = new Date();
180173

181-
const updatedRun = await this.$.prisma.taskRun.update({
182-
where: { id: runId },
183-
data: {
184-
status: "PENDING",
174+
const updatedRun = await this.$.runStore.enqueueDelayedRun(
175+
runId,
176+
{
185177
queuedAt,
186178
},
187-
});
179+
this.$.prisma
180+
);
188181

189182
this.$.eventBus.emit("runEnqueuedAfterDelay", {
190183
time: new Date(),

internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,7 @@ export class PendingVersionSystem {
129129
// Idempotency guard: only flips PENDING_VERSION → PENDING. If another
130130
// worker already promoted this run between our findMany and the
131131
// update, count is 0 and we skip the enqueue.
132-
const updateResult = await tx.taskRun.updateMany({
133-
where: { id: run.id, status: "PENDING_VERSION" },
134-
data: { status: "PENDING" },
135-
});
132+
const updateResult = await this.$.runStore.promotePendingVersionRuns(run.id, tx);
136133

137134
if (updateResult.count === 0) {
138135
return false;

0 commit comments

Comments
 (0)