Skip to content

Commit f1ab6ae

Browse files
committed
feat(run-store): implement expiry, dequeue-lock, version, and checkpoint methods
1 parent f8456c1 commit f1ab6ae

2 files changed

Lines changed: 521 additions & 37 deletions

File tree

internal-packages/run-store/src/PostgresRunStore.test.ts

Lines changed: 377 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,4 +551,381 @@ describe("PostgresRunStore", () => {
551551
expect(run.usageDurationMs).toBe(150);
552552
expect(run.costInCents).toBe(3);
553553
});
554+
555+
postgresTest(
556+
"expireRun sets status to EXPIRED with distinct completedAt/expiredAt, error set, and one FINISHED/EXPIRED snapshot",
557+
async ({ prisma }) => {
558+
const { organization, project, environment } = await seedEnvironment(prisma);
559+
560+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
561+
const runId = "run_expire_1";
562+
563+
await store.createRun(
564+
buildCreateRunInput({
565+
runId,
566+
organizationId: organization.id,
567+
projectId: project.id,
568+
runtimeEnvironmentId: environment.id,
569+
})
570+
);
571+
572+
const completedAt = new Date("2026-06-01T10:00:00.000Z");
573+
const expiredAt = new Date("2026-06-01T10:00:01.000Z");
574+
const error = { type: "STRING_ERROR" as const, raw: "Run expired because the TTL was reached" };
575+
576+
const run = await store.expireRun(
577+
runId,
578+
{
579+
error,
580+
completedAt,
581+
expiredAt,
582+
snapshot: {
583+
engine: "V2",
584+
executionStatus: "FINISHED",
585+
description: "Run was expired because the TTL was reached",
586+
runStatus: "EXPIRED",
587+
environmentId: environment.id,
588+
environmentType: "DEVELOPMENT",
589+
projectId: project.id,
590+
organizationId: organization.id,
591+
},
592+
},
593+
{
594+
select: {
595+
id: true,
596+
status: true,
597+
completedAt: true,
598+
expiredAt: true,
599+
error: true,
600+
},
601+
}
602+
);
603+
604+
expect(run.id).toBe(runId);
605+
expect(run.status).toBe("EXPIRED");
606+
expect(run.completedAt).toEqual(completedAt);
607+
expect(run.expiredAt).toEqual(expiredAt);
608+
// completedAt and expiredAt are distinct
609+
expect(run.completedAt?.getTime()).not.toBe(run.expiredAt?.getTime());
610+
611+
const snapshots = await prisma.taskRunExecutionSnapshot.findMany({
612+
where: { runId, executionStatus: "FINISHED", runStatus: "EXPIRED" },
613+
});
614+
expect(snapshots).toHaveLength(1);
615+
}
616+
);
617+
618+
postgresTest(
619+
"expireRunsBatch sets EXPIRED status with all four timestamps equal to now and error set; returns correct count",
620+
async ({ prisma }) => {
621+
const { organization, project, environment } = await seedEnvironment(prisma);
622+
623+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
624+
625+
const runId1 = "run_expire_batch_1";
626+
const runId2 = "run_expire_batch_2";
627+
628+
for (const id of [runId1, runId2]) {
629+
await prisma.taskRun.create({
630+
data: {
631+
id,
632+
engine: "V2",
633+
status: "PENDING",
634+
friendlyId: `run_expire_batch_friendly_${id}`,
635+
runtimeEnvironmentId: environment.id,
636+
environmentType: "DEVELOPMENT",
637+
organizationId: organization.id,
638+
projectId: project.id,
639+
taskIdentifier: "my-task",
640+
payload: "{}",
641+
payloadType: "application/json",
642+
traceContext: {},
643+
traceId: `trace_${id}`,
644+
spanId: `span_${id}`,
645+
queue: "task/my-task",
646+
isTest: false,
647+
taskEventStore: "taskEvent",
648+
depth: 0,
649+
},
650+
});
651+
}
652+
653+
const now = new Date("2026-06-01T12:00:00.000Z");
654+
const error = { type: "STRING_ERROR" as const, raw: "Run expired because the TTL was reached" };
655+
656+
const count = await store.expireRunsBatch([runId1, runId2], { error, now });
657+
658+
expect(count).toBe(2);
659+
660+
for (const id of [runId1, runId2]) {
661+
const row = await prisma.taskRun.findUniqueOrThrow({
662+
where: { id },
663+
select: { status: true, completedAt: true, expiredAt: true, updatedAt: true },
664+
});
665+
expect(row.status).toBe("EXPIRED");
666+
expect(row.completedAt).toEqual(now);
667+
expect(row.expiredAt).toEqual(now);
668+
expect(row.updatedAt).toEqual(now);
669+
}
670+
}
671+
);
672+
673+
postgresTest(
674+
"lockRunToWorker sets status to DEQUEUED with lock columns, includes runtimeEnvironment, and creates one PENDING_EXECUTING snapshot",
675+
async ({ prisma }) => {
676+
const { organization, project, environment } = await seedEnvironment(prisma);
677+
678+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
679+
const runId = "run_lock_1";
680+
681+
await store.createRun(
682+
buildCreateRunInput({
683+
runId,
684+
organizationId: organization.id,
685+
projectId: project.id,
686+
runtimeEnvironmentId: environment.id,
687+
})
688+
);
689+
690+
// Seed a background worker task to use as lockedById
691+
const backgroundWorker = await prisma.backgroundWorker.create({
692+
data: {
693+
friendlyId: "worker_friendly_1",
694+
version: "20260601.1",
695+
runtimeEnvironmentId: environment.id,
696+
projectId: project.id,
697+
contentHash: "abc123",
698+
sdkVersion: "3.0.0",
699+
cliVersion: "3.0.0",
700+
metadata: {},
701+
},
702+
});
703+
704+
const workerTask = await prisma.backgroundWorkerTask.create({
705+
data: {
706+
friendlyId: "task_friendly_1",
707+
slug: "my-task",
708+
filePath: "src/my-task.ts",
709+
exportName: "myTask",
710+
workerId: backgroundWorker.id,
711+
runtimeEnvironmentId: environment.id,
712+
projectId: project.id,
713+
},
714+
});
715+
716+
const queue = await prisma.taskQueue.create({
717+
data: {
718+
friendlyId: "queue_friendly_1",
719+
name: "task/my-task",
720+
runtimeEnvironmentId: environment.id,
721+
projectId: project.id,
722+
},
723+
});
724+
725+
// Seed a prior snapshot to use as previousSnapshotId
726+
const priorSnapshot = await prisma.taskRunExecutionSnapshot.create({
727+
data: {
728+
engine: "V2",
729+
executionStatus: "RUN_CREATED",
730+
description: "prior",
731+
runStatus: "PENDING",
732+
environmentId: environment.id,
733+
environmentType: "DEVELOPMENT",
734+
projectId: project.id,
735+
organizationId: organization.id,
736+
runId,
737+
},
738+
});
739+
740+
const lockedAt = new Date("2026-06-01T13:00:00.000Z");
741+
const startedAt = new Date("2026-06-01T13:00:01.000Z");
742+
const snapshotId = "snap_lock_1";
743+
744+
const locked = await store.lockRunToWorker(runId, {
745+
lockedAt,
746+
lockedById: workerTask.id,
747+
lockedToVersionId: backgroundWorker.id,
748+
lockedQueueId: queue.id,
749+
startedAt,
750+
baseCostInCents: 5,
751+
machinePreset: "small-1x",
752+
taskVersion: "20260601.1",
753+
sdkVersion: "3.0.0",
754+
cliVersion: "3.0.0",
755+
maxDurationInSeconds: null,
756+
snapshot: {
757+
id: snapshotId,
758+
previousSnapshotId: priorSnapshot.id,
759+
environmentId: environment.id,
760+
environmentType: "DEVELOPMENT",
761+
projectId: project.id,
762+
organizationId: organization.id,
763+
completedWaitpointIds: [],
764+
completedWaitpointOrder: [],
765+
},
766+
});
767+
768+
expect(locked.status).toBe("DEQUEUED");
769+
expect(locked.lockedAt).toEqual(lockedAt);
770+
expect(locked.lockedById).toBe(workerTask.id);
771+
expect(locked.lockedToVersionId).toBe(backgroundWorker.id);
772+
expect(locked.lockedQueueId).toBe(queue.id);
773+
expect(locked.runtimeEnvironment).toBeDefined();
774+
expect(locked.runtimeEnvironment.id).toBe(environment.id);
775+
776+
const snap = await prisma.taskRunExecutionSnapshot.findUnique({ where: { id: snapshotId } });
777+
expect(snap).not.toBeNull();
778+
expect(snap?.executionStatus).toBe("PENDING_EXECUTING");
779+
expect(snap?.runStatus).toBe("PENDING");
780+
}
781+
);
782+
783+
postgresTest("parkPendingVersion sets status to PENDING_VERSION and stores statusReason", async ({ prisma }) => {
784+
const { organization, project, environment } = await seedEnvironment(prisma);
785+
786+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
787+
const runId = "run_park_1";
788+
789+
await store.createRun(
790+
buildCreateRunInput({
791+
runId,
792+
organizationId: organization.id,
793+
projectId: project.id,
794+
runtimeEnvironmentId: environment.id,
795+
})
796+
);
797+
798+
const run = await store.parkPendingVersion(
799+
runId,
800+
{ statusReason: "No background worker found" },
801+
{ select: { id: true, status: true, statusReason: true } }
802+
);
803+
804+
expect(run.id).toBe(runId);
805+
expect(run.status).toBe("PENDING_VERSION");
806+
expect(run.statusReason).toBe("No background worker found");
807+
});
808+
809+
postgresTest(
810+
"promotePendingVersionRuns flips PENDING_VERSION to PENDING and returns count 1; run in another status returns count 0 and is unchanged",
811+
async ({ prisma }) => {
812+
const { organization, project, environment } = await seedEnvironment(prisma);
813+
814+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
815+
816+
// Seed a PENDING_VERSION run
817+
const pendingVersionId = "run_promote_pv_1";
818+
await prisma.taskRun.create({
819+
data: {
820+
id: pendingVersionId,
821+
engine: "V2",
822+
status: "PENDING_VERSION",
823+
friendlyId: "run_promote_pv_friendly_1",
824+
runtimeEnvironmentId: environment.id,
825+
environmentType: "DEVELOPMENT",
826+
organizationId: organization.id,
827+
projectId: project.id,
828+
taskIdentifier: "my-task",
829+
payload: "{}",
830+
payloadType: "application/json",
831+
traceContext: {},
832+
traceId: "trace_pv1",
833+
spanId: "span_pv1",
834+
queue: "task/my-task",
835+
isTest: false,
836+
taskEventStore: "taskEvent",
837+
depth: 0,
838+
},
839+
});
840+
841+
const result = await store.promotePendingVersionRuns(pendingVersionId);
842+
843+
expect(result.count).toBe(1);
844+
845+
const promoted = await prisma.taskRun.findUniqueOrThrow({ where: { id: pendingVersionId }, select: { status: true } });
846+
expect(promoted.status).toBe("PENDING");
847+
848+
// Seed a run NOT in PENDING_VERSION (e.g. EXECUTING)
849+
const executingId = "run_promote_exec_1";
850+
await prisma.taskRun.create({
851+
data: {
852+
id: executingId,
853+
engine: "V2",
854+
status: "EXECUTING",
855+
friendlyId: "run_promote_exec_friendly_1",
856+
runtimeEnvironmentId: environment.id,
857+
environmentType: "DEVELOPMENT",
858+
organizationId: organization.id,
859+
projectId: project.id,
860+
taskIdentifier: "my-task",
861+
payload: "{}",
862+
payloadType: "application/json",
863+
traceContext: {},
864+
traceId: "trace_exec1",
865+
spanId: "span_exec1",
866+
queue: "task/my-task",
867+
isTest: false,
868+
taskEventStore: "taskEvent",
869+
depth: 0,
870+
},
871+
});
872+
873+
const result2 = await store.promotePendingVersionRuns(executingId);
874+
875+
expect(result2.count).toBe(0);
876+
877+
const unchanged = await prisma.taskRun.findUniqueOrThrow({ where: { id: executingId }, select: { status: true } });
878+
expect(unchanged.status).toBe("EXECUTING");
879+
}
880+
);
881+
882+
postgresTest("suspendForCheckpoint sets status to WAITING_TO_RESUME", async ({ prisma }) => {
883+
const { organization, project, environment } = await seedEnvironment(prisma);
884+
885+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
886+
const runId = "run_suspend_1";
887+
888+
await store.createRun(
889+
buildCreateRunInput({
890+
runId,
891+
organizationId: organization.id,
892+
projectId: project.id,
893+
runtimeEnvironmentId: environment.id,
894+
})
895+
);
896+
897+
const run = await store.suspendForCheckpoint(runId, {
898+
include: { runtimeEnvironment: true },
899+
});
900+
901+
expect(run.id).toBe(runId);
902+
expect(run.status).toBe("WAITING_TO_RESUME");
903+
expect(run.runtimeEnvironment).toBeDefined();
904+
});
905+
906+
postgresTest("resumeFromCheckpoint sets status to EXECUTING", async ({ prisma }) => {
907+
const { organization, project, environment } = await seedEnvironment(prisma);
908+
909+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
910+
const runId = "run_resume_1";
911+
912+
await store.createRun(
913+
buildCreateRunInput({
914+
runId,
915+
organizationId: organization.id,
916+
projectId: project.id,
917+
runtimeEnvironmentId: environment.id,
918+
})
919+
);
920+
921+
// Suspend first so we start from a realistic state
922+
await store.suspendForCheckpoint(runId, { include: {} });
923+
924+
const run = await store.resumeFromCheckpoint(runId, {
925+
select: { id: true, status: true },
926+
});
927+
928+
expect(run.id).toBe(runId);
929+
expect(run.status).toBe("EXECUTING");
930+
});
554931
});

0 commit comments

Comments
 (0)