Skip to content

Commit c5226a2

Browse files
committed
feat(run-store): add TaskRun read methods to the run store
Add findRun, findRunOrThrow and findRuns to RunStore, mirroring the existing write methods. They pass where/select/include through the same Prisma generics and default to the read replica, while letting the caller pass the writer or a transaction client when needed. This lets Postgres reads of TaskRun be routed through the store the same way writes already are. Additive only; no call sites change yet.
1 parent 76f3494 commit c5226a2

4 files changed

Lines changed: 290 additions & 0 deletions

File tree

internal-packages/run-store/src/NoopRunStore.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@ export class NoopRunStore implements RunStore {
2929
clearIdempotencyKey(): never { return this.fail("clearIdempotencyKey"); }
3030
pushTags(): never { return this.fail("pushTags"); }
3131
pushRealtimeStream(): never { return this.fail("pushRealtimeStream"); }
32+
findRun(): never { return this.fail("findRun"); }
33+
findRunOrThrow(): never { return this.fail("findRunOrThrow"); }
34+
findRuns(): never { return this.fail("findRuns"); }
3235
}

internal-packages/run-store/src/PostgresRunStore.test.ts

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,3 +1528,159 @@ describe("PostgresRunStore — delayed / debounce / metadata / idempotency / arr
15281528
}
15291529
);
15301530
});
1531+
1532+
describe("PostgresRunStore — read", () => {
1533+
postgresTest("findRun by id with select returns the projected row", async ({ prisma }) => {
1534+
const { organization, project, environment } = await seedEnvironment(prisma);
1535+
1536+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1537+
const runId = "run_find_select_id_1";
1538+
1539+
await store.createRun(
1540+
buildCreateRunInput({
1541+
runId,
1542+
organizationId: organization.id,
1543+
projectId: project.id,
1544+
runtimeEnvironmentId: environment.id,
1545+
})
1546+
);
1547+
1548+
const run = await store.findRun({ id: runId }, { select: { friendlyId: true } });
1549+
1550+
expect(run).toEqual({ friendlyId: "run_friendly_1" });
1551+
});
1552+
1553+
postgresTest("findRun by friendlyId with select returns the matching row", async ({ prisma }) => {
1554+
const { organization, project, environment } = await seedEnvironment(prisma);
1555+
1556+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1557+
const runId = "run_find_select_friendly_1";
1558+
1559+
await store.createRun(
1560+
buildCreateRunInput({
1561+
runId,
1562+
organizationId: organization.id,
1563+
projectId: project.id,
1564+
runtimeEnvironmentId: environment.id,
1565+
})
1566+
);
1567+
1568+
const run = await store.findRun({ friendlyId: "run_friendly_1" }, { select: { id: true } });
1569+
1570+
expect(run?.id).toBe(runId);
1571+
});
1572+
1573+
postgresTest("findRun returns null when no row matches", async ({ prisma }) => {
1574+
await seedEnvironment(prisma);
1575+
1576+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1577+
1578+
const run = await store.findRun({ id: "missing" }, { select: { id: true } });
1579+
1580+
expect(run).toBeNull();
1581+
});
1582+
1583+
postgresTest("findRunOrThrow throws when no row matches", async ({ prisma }) => {
1584+
await seedEnvironment(prisma);
1585+
1586+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1587+
1588+
await expect(store.findRunOrThrow({ id: "missing" }, { select: { id: true } })).rejects.toThrow();
1589+
});
1590+
1591+
postgresTest("findRun with include hydrates the relation", async ({ prisma }) => {
1592+
const { organization, project, environment } = await seedEnvironment(prisma);
1593+
1594+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1595+
const runId = "run_find_include_1";
1596+
1597+
await store.createRun(
1598+
buildCreateRunInput({
1599+
runId,
1600+
organizationId: organization.id,
1601+
projectId: project.id,
1602+
runtimeEnvironmentId: environment.id,
1603+
})
1604+
);
1605+
1606+
const run = await store.findRun({ id: runId }, { include: { runtimeEnvironment: true } });
1607+
1608+
expect(run?.id).toBe(runId);
1609+
expect(run?.runtimeEnvironment).toBeDefined();
1610+
expect(run?.runtimeEnvironment.id).toBe(environment.id);
1611+
});
1612+
1613+
postgresTest("findRuns applies where/orderBy/take and returns ordered, limited rows", async ({ prisma }) => {
1614+
const { organization, project, environment } = await seedEnvironment(prisma);
1615+
1616+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1617+
1618+
const earliest = new Date("2026-06-01T00:00:00.000Z");
1619+
const middle = new Date("2026-06-02T00:00:00.000Z");
1620+
const latest = new Date("2026-06-03T00:00:00.000Z");
1621+
1622+
const rows: Array<{ id: string; createdAt: Date }> = [
1623+
{ id: "run_find_many_earliest", createdAt: earliest },
1624+
{ id: "run_find_many_middle", createdAt: middle },
1625+
{ id: "run_find_many_latest", createdAt: latest },
1626+
];
1627+
1628+
for (const row of rows) {
1629+
await prisma.taskRun.create({
1630+
data: {
1631+
id: row.id,
1632+
engine: "V2",
1633+
status: "PENDING",
1634+
friendlyId: `${row.id}_friendly`,
1635+
runtimeEnvironmentId: environment.id,
1636+
environmentType: "DEVELOPMENT",
1637+
organizationId: organization.id,
1638+
projectId: project.id,
1639+
taskIdentifier: "my-task",
1640+
payload: "{}",
1641+
payloadType: "application/json",
1642+
traceContext: {},
1643+
traceId: `trace_${row.id}`,
1644+
spanId: `span_${row.id}`,
1645+
queue: "task/my-task",
1646+
isTest: false,
1647+
taskEventStore: "taskEvent",
1648+
depth: 0,
1649+
createdAt: row.createdAt,
1650+
},
1651+
});
1652+
}
1653+
1654+
const found = await store.findRuns({
1655+
where: { projectId: project.id },
1656+
select: { id: true },
1657+
orderBy: { createdAt: "desc" },
1658+
take: 2,
1659+
});
1660+
1661+
expect(found).toEqual([{ id: "run_find_many_latest" }, { id: "run_find_many_middle" }]);
1662+
});
1663+
1664+
postgresTest("findRun reads a just-written row when passed the writer client", async ({ prisma }) => {
1665+
const { organization, project, environment } = await seedEnvironment(prisma);
1666+
1667+
// Use a NoopRunStore-style read replica that must NOT be hit: pass the writer
1668+
// (prisma) explicitly so reads go through it for read-after-write consistency.
1669+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
1670+
const runId = "run_find_read_after_write_1";
1671+
1672+
await store.createRun(
1673+
buildCreateRunInput({
1674+
runId,
1675+
organizationId: organization.id,
1676+
projectId: project.id,
1677+
runtimeEnvironmentId: environment.id,
1678+
})
1679+
);
1680+
1681+
const run = await store.findRun({ id: runId }, { select: { id: true, status: true } }, prisma);
1682+
1683+
expect(run?.id).toBe(runId);
1684+
expect(run?.status).toBe("PENDING");
1685+
});
1686+
});

internal-packages/run-store/src/PostgresRunStore.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,4 +617,89 @@ export class PostgresRunStore implements RunStore {
617617
data: { realtimeStreams: { push: streamId } },
618618
});
619619
}
620+
621+
findRun<S extends Prisma.TaskRunSelect>(
622+
where: Prisma.TaskRunWhereInput,
623+
args: { select: S },
624+
client?: PrismaClientOrTransaction
625+
): Promise<Prisma.TaskRunGetPayload<{ select: S }> | null>;
626+
findRun<I extends Prisma.TaskRunInclude>(
627+
where: Prisma.TaskRunWhereInput,
628+
args: { include: I },
629+
client?: PrismaClientOrTransaction
630+
): Promise<Prisma.TaskRunGetPayload<{ include: I }> | null>;
631+
async findRun(
632+
where: Prisma.TaskRunWhereInput,
633+
args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude },
634+
client?: PrismaClientOrTransaction
635+
): Promise<unknown> {
636+
const prisma = client ?? this.readOnlyPrisma;
637+
638+
return prisma.taskRun.findFirst({
639+
where,
640+
...args,
641+
});
642+
}
643+
644+
findRunOrThrow<S extends Prisma.TaskRunSelect>(
645+
where: Prisma.TaskRunWhereInput,
646+
args: { select: S },
647+
client?: PrismaClientOrTransaction
648+
): Promise<Prisma.TaskRunGetPayload<{ select: S }>>;
649+
findRunOrThrow<I extends Prisma.TaskRunInclude>(
650+
where: Prisma.TaskRunWhereInput,
651+
args: { include: I },
652+
client?: PrismaClientOrTransaction
653+
): Promise<Prisma.TaskRunGetPayload<{ include: I }>>;
654+
async findRunOrThrow(
655+
where: Prisma.TaskRunWhereInput,
656+
args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude },
657+
client?: PrismaClientOrTransaction
658+
): Promise<unknown> {
659+
const prisma = client ?? this.readOnlyPrisma;
660+
661+
return prisma.taskRun.findFirstOrThrow({
662+
where,
663+
...args,
664+
});
665+
}
666+
667+
findRuns<S extends Prisma.TaskRunSelect>(
668+
args: {
669+
where: Prisma.TaskRunWhereInput;
670+
select: S;
671+
orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[];
672+
take?: number;
673+
skip?: number;
674+
cursor?: Prisma.TaskRunWhereUniqueInput;
675+
},
676+
client?: PrismaClientOrTransaction
677+
): Promise<Prisma.TaskRunGetPayload<{ select: S }>[]>;
678+
findRuns<I extends Prisma.TaskRunInclude>(
679+
args: {
680+
where: Prisma.TaskRunWhereInput;
681+
include: I;
682+
orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[];
683+
take?: number;
684+
skip?: number;
685+
cursor?: Prisma.TaskRunWhereUniqueInput;
686+
},
687+
client?: PrismaClientOrTransaction
688+
): Promise<Prisma.TaskRunGetPayload<{ include: I }>[]>;
689+
async findRuns(
690+
args: {
691+
where: Prisma.TaskRunWhereInput;
692+
select?: Prisma.TaskRunSelect;
693+
include?: Prisma.TaskRunInclude;
694+
orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[];
695+
take?: number;
696+
skip?: number;
697+
cursor?: Prisma.TaskRunWhereUniqueInput;
698+
},
699+
client?: PrismaClientOrTransaction
700+
): Promise<unknown> {
701+
const prisma = client ?? this.readOnlyPrisma;
702+
703+
return prisma.taskRun.findMany(args);
704+
}
620705
}

internal-packages/run-store/src/types.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,4 +319,50 @@ export interface RunStore {
319319
clearIdempotencyKey(params: ClearIdempotencyKeyInput, tx?: PrismaClientOrTransaction): Promise<{ count: number }>;
320320
pushTags(runId: string, tags: string[], where: { runtimeEnvironmentId: string }, tx?: PrismaClientOrTransaction): Promise<{ updatedAt: Date }>;
321321
pushRealtimeStream(runId: string, streamId: string, tx?: PrismaClientOrTransaction): Promise<void>;
322+
323+
// Read
324+
findRun<S extends Prisma.TaskRunSelect>(
325+
where: Prisma.TaskRunWhereInput,
326+
args: { select: S },
327+
client?: PrismaClientOrTransaction
328+
): Promise<Prisma.TaskRunGetPayload<{ select: S }> | null>;
329+
findRun<I extends Prisma.TaskRunInclude>(
330+
where: Prisma.TaskRunWhereInput,
331+
args: { include: I },
332+
client?: PrismaClientOrTransaction
333+
): Promise<Prisma.TaskRunGetPayload<{ include: I }> | null>;
334+
335+
findRunOrThrow<S extends Prisma.TaskRunSelect>(
336+
where: Prisma.TaskRunWhereInput,
337+
args: { select: S },
338+
client?: PrismaClientOrTransaction
339+
): Promise<Prisma.TaskRunGetPayload<{ select: S }>>;
340+
findRunOrThrow<I extends Prisma.TaskRunInclude>(
341+
where: Prisma.TaskRunWhereInput,
342+
args: { include: I },
343+
client?: PrismaClientOrTransaction
344+
): Promise<Prisma.TaskRunGetPayload<{ include: I }>>;
345+
346+
findRuns<S extends Prisma.TaskRunSelect>(
347+
args: {
348+
where: Prisma.TaskRunWhereInput;
349+
select: S;
350+
orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[];
351+
take?: number;
352+
skip?: number;
353+
cursor?: Prisma.TaskRunWhereUniqueInput;
354+
},
355+
client?: PrismaClientOrTransaction
356+
): Promise<Prisma.TaskRunGetPayload<{ select: S }>[]>;
357+
findRuns<I extends Prisma.TaskRunInclude>(
358+
args: {
359+
where: Prisma.TaskRunWhereInput;
360+
include: I;
361+
orderBy?: Prisma.TaskRunOrderByWithRelationInput | Prisma.TaskRunOrderByWithRelationInput[];
362+
take?: number;
363+
skip?: number;
364+
cursor?: Prisma.TaskRunWhereUniqueInput;
365+
},
366+
client?: PrismaClientOrTransaction
367+
): Promise<Prisma.TaskRunGetPayload<{ include: I }>[]>;
322368
}

0 commit comments

Comments
 (0)