Skip to content

Commit 126b05f

Browse files
committed
refactor(webapp): route API and loader TaskRun reads through the run store
Relocate the route and loader TaskRun reads to the RunStore read methods, preserving the exact client per site, including the replica-resolve then writer-recheck realtime paths. Behavior-preserving.
1 parent 5683952 commit 126b05f

38 files changed

Lines changed: 621 additions & 477 deletions

File tree

apps/webapp/app/routes/@.runs.$runParam.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { redirect, type LoaderFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { prisma } from "~/db.server";
4+
import { runStore } from "~/v3/runStore.server";
45
import { redirectWithErrorMessage } from "~/models/message.server";
56
import { requireUser } from "~/services/session.server";
67
import { impersonate, rootPath, v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder";
@@ -28,29 +29,32 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
2829
);
2930
}
3031

31-
const run = await prisma.taskRun.findFirst({
32-
where: {
32+
const run = await runStore.findRun(
33+
{
3334
friendlyId: runParam,
3435
},
35-
select: {
36-
spanId: true,
37-
runtimeEnvironment: {
38-
select: {
39-
slug: true,
36+
{
37+
select: {
38+
spanId: true,
39+
runtimeEnvironment: {
40+
select: {
41+
slug: true,
42+
},
4043
},
41-
},
42-
project: {
43-
select: {
44-
slug: true,
45-
organization: {
46-
select: {
47-
slug: true,
44+
project: {
45+
select: {
46+
slug: true,
47+
organization: {
48+
select: {
49+
slug: true,
50+
},
4851
},
4952
},
5053
},
5154
},
5255
},
53-
});
56+
prisma
57+
);
5458

5559
if (!run) {
5660
// Admin impersonation route — bypass org membership so admins can

apps/webapp/app/routes/admin.api.v1.runs-replication.backfill.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { type TaskRun } from "@trigger.dev/database";
33
import { z } from "zod";
44
import { prisma } from "~/db.server";
5+
import { runStore } from "~/v3/runStore.server";
56
import { logger } from "~/services/logger.server";
67
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
78
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
@@ -25,14 +26,17 @@ export async function action({ request }: ActionFunctionArgs) {
2526
const runs: TaskRun[] = [];
2627
for (let i = 0; i < runIds.length; i += MAX_BATCH_SIZE) {
2728
const batch = runIds.slice(i, i + MAX_BATCH_SIZE);
28-
const batchRuns = await prisma.taskRun.findMany({
29-
where: {
30-
id: { in: batch },
31-
status: {
32-
in: FINAL_RUN_STATUSES,
29+
const batchRuns = await runStore.findRuns(
30+
{
31+
where: {
32+
id: { in: batch },
33+
status: {
34+
in: FINAL_RUN_STATUSES,
35+
},
3336
},
3437
},
35-
});
38+
prisma
39+
);
3640
runs.push(...batchRuns);
3741
}
3842

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "@trigger.dev/core/v3";
77
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
88
import { $replica } from "~/db.server";
9+
import { runStore } from "~/v3/runStore.server";
910
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
1011
import {
1112
deleteInputStreamWaitpoint,
@@ -32,18 +33,21 @@ const { action, loader } = createActionApiRoute(
3233
},
3334
async ({ authentication, body, params }) => {
3435
try {
35-
const run = await $replica.taskRun.findFirst({
36-
where: {
36+
const run = await runStore.findRun(
37+
{
3738
friendlyId: params.runFriendlyId,
3839
runtimeEnvironmentId: authentication.environment.id,
3940
},
40-
select: {
41-
id: true,
42-
friendlyId: true,
43-
realtimeStreamsVersion: true,
44-
streamBasinName: true,
41+
{
42+
select: {
43+
id: true,
44+
friendlyId: true,
45+
realtimeStreamsVersion: true,
46+
streamBasinName: true,
47+
},
4548
},
46-
});
49+
$replica
50+
);
4751

4852
if (!run) {
4953
return json({ error: "Run not found" }, { status: 404 });

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
77
import { z } from "zod";
88
import { $replica } from "~/db.server";
9+
import { runStore } from "~/v3/runStore.server";
910
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
1011
import {
1112
canonicalSessionAddressingKey,
@@ -38,17 +39,20 @@ const { action, loader } = createActionApiRoute(
3839
},
3940
async ({ authentication, body, params }) => {
4041
try {
41-
const run = await $replica.taskRun.findFirst({
42-
where: {
42+
const run = await runStore.findRun(
43+
{
4344
friendlyId: params.runFriendlyId,
4445
runtimeEnvironmentId: authentication.environment.id,
4546
},
46-
select: {
47-
id: true,
48-
friendlyId: true,
49-
realtimeStreamsVersion: true,
47+
{
48+
select: {
49+
id: true,
50+
friendlyId: true,
51+
realtimeStreamsVersion: true,
52+
},
5053
},
51-
});
54+
$replica
55+
);
5256

5357
if (!run) {
5458
return json({ error: "Run not found" }, { status: 404 });

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server
1717
import { ServiceValidationError } from "~/v3/services/common.server";
1818
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
1919
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
20+
import { runStore } from "~/v3/runStore.server";
2021

2122
const ParamsSchema = z.object({
2223
runId: z.string(),
@@ -39,10 +40,11 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
3940

4041
const env = authenticationResult.environment;
4142

42-
const pgRun = await $replica.taskRun.findFirst({
43-
where: { friendlyId: parsed.data.runId, runtimeEnvironmentId: env.id },
44-
select: { metadata: true, metadataType: true },
45-
});
43+
const pgRun = await runStore.findRun(
44+
{ friendlyId: parsed.data.runId, runtimeEnvironmentId: env.id },
45+
{ select: { metadata: true, metadataType: true } },
46+
$replica
47+
);
4648
if (pgRun) {
4749
return json({ metadata: pgRun.metadata, metadataType: pgRun.metadataType }, { status: 200 });
4850
}

apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1111
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1212
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1313
import { buildSyntheticSpanDetailBody } from "~/v3/mollifier/syntheticApiResponses.server";
14+
import { runStore } from "~/v3/runStore.server";
1415

1516
const ParamsSchema = z.object({
1617
runId: z.string(),
@@ -28,9 +29,10 @@ type ResolvedRun =
2829
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };
2930

3031
async function findPgRun(runId: string, environmentId: string) {
31-
return $replica.taskRun.findFirst({
32-
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
33-
});
32+
return runStore.findRun(
33+
{ friendlyId: runId, runtimeEnvironmentId: environmentId },
34+
$replica
35+
);
3436
}
3537

3638
export const loader = createLoaderApiRoute(
@@ -121,19 +123,22 @@ export const loader = createLoaderApiRoute(
121123
? extractAISpanData(span.properties as Record<string, unknown>, durationMs)
122124
: undefined;
123125

124-
const triggeredRuns = await $replica.taskRun.findMany({
125-
take: 50,
126-
select: {
127-
friendlyId: true,
128-
taskIdentifier: true,
129-
status: true,
130-
createdAt: true,
131-
},
132-
where: {
133-
runtimeEnvironmentId: authentication.environment.id,
134-
parentSpanId: params.spanId,
126+
const triggeredRuns = await runStore.findRuns(
127+
{
128+
take: 50,
129+
select: {
130+
friendlyId: true,
131+
taskIdentifier: true,
132+
status: true,
133+
createdAt: true,
134+
},
135+
where: {
136+
runtimeEnvironmentId: authentication.environment.id,
137+
parentSpanId: params.spanId,
138+
},
135139
},
136-
});
140+
$replica
141+
);
137142

138143
const properties =
139144
span.properties &&

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1010
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1111
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1212
import { buildSyntheticTraceBody } from "~/v3/mollifier/syntheticApiResponses.server";
13+
import { runStore } from "~/v3/runStore.server";
1314

1415
const ParamsSchema = z.object({
1516
runId: z.string(), // This is the run friendly ID
@@ -26,9 +27,10 @@ type ResolvedRun =
2627
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };
2728

2829
async function findPgRun(runId: string, environmentId: string) {
29-
return $replica.taskRun.findFirst({
30-
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
31-
});
30+
return runStore.findRun(
31+
{ friendlyId: runId, runtimeEnvironmentId: environmentId },
32+
$replica
33+
);
3234
}
3335

3436
export const loader = createLoaderApiRoute(

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { json } from "@remix-run/server-runtime";
33
import type { TaskRun } from "@trigger.dev/database";
44
import { z } from "zod";
55
import { prisma } from "~/db.server";
6+
import { runStore } from "~/v3/runStore.server";
67
import { authenticateApiRequest } from "~/services/apiAuth.server";
78
import { logger } from "~/services/logger.server";
89
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
@@ -73,12 +74,13 @@ export async function action({ request, params }: ActionFunctionArgs) {
7374
// filter beyond friendlyId is the existing semantic; findFirst with
7475
// env scoping tightens it minimally without changing behaviour for
7576
// a correctly-authed caller.
76-
let taskRun: TaskRun | null = await prisma.taskRun.findFirst({
77-
where: {
77+
let taskRun: TaskRun | null = await runStore.findRun(
78+
{
7879
friendlyId: runParam,
7980
runtimeEnvironmentId: env.id,
8081
},
81-
});
82+
prisma
83+
);
8284

8385
if (!taskRun) {
8486
// Buffered fallback. SyntheticRun carries every field

apps/webapp/app/routes/api.v1.sessions.$session.end-and-continue.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
anyResource,
1313
createActionApiRoute,
1414
} from "~/services/routeBuilders/apiBuilder.server";
15+
import { runStore } from "~/v3/runStore.server";
1516

1617
const ParamsSchema = z.object({
1718
session: z.string(),
@@ -83,13 +84,14 @@ const { action, loader } = createActionApiRoute(
8384
// SDK exposes via `ctx.run.id`). Internally `Session.currentRunId`
8485
// stores the TaskRun.id cuid, so resolve before handing to the
8586
// optimistic-claim service.
86-
const callingRun = await $replica.taskRun.findFirst({
87-
where: {
87+
const callingRun = await runStore.findRun(
88+
{
8889
friendlyId: body.callingRunId,
8990
runtimeEnvironmentId: authentication.environment.id,
9091
},
91-
select: { id: true },
92-
});
92+
{ select: { id: true } },
93+
$replica
94+
);
9395
if (!callingRun) {
9496
return json({ error: "callingRunId not found in this environment" }, { status: 404 });
9597
}
@@ -118,10 +120,11 @@ const { action, loader } = createActionApiRoute(
118120
// `$replica`. A replica miss here would silently fall back to
119121
// returning the internal cuid, which the public API contract
120122
// says is a friendlyId.
121-
const run = await prisma.taskRun.findFirst({
122-
where: { id: result.runId },
123-
select: { friendlyId: true },
124-
});
123+
const run = await runStore.findRun(
124+
{ id: result.runId },
125+
{ select: { friendlyId: true } },
126+
prisma
127+
);
125128

126129
const responseBody: EndAndContinueSessionResponseBody = {
127130
runId: run?.friendlyId ?? result.runId,

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
createLoaderApiRoute,
3030
} from "~/services/routeBuilders/apiBuilder.server";
3131
import { ServiceValidationError } from "~/v3/services/common.server";
32+
import { runStore } from "~/v3/runStore.server";
3233

3334
function asArray<T>(value: T | T[] | undefined): T[] | undefined {
3435
if (value === undefined) return undefined;
@@ -264,10 +265,11 @@ const { action } = createActionApiRoute(
264265
// Read-after-write: the run was just triggered in this request,
265266
// so go to the writer rather than $replica. Replica lag here
266267
// would null this out and turn a successful create into a 500.
267-
const run = await prisma.taskRun.findFirst({
268-
where: { id: ensureResult.runId },
269-
select: { friendlyId: true },
270-
});
268+
const run = await runStore.findRun(
269+
{ id: ensureResult.runId },
270+
{ select: { friendlyId: true } },
271+
prisma
272+
);
271273
if (!run) {
272274
throw new Error(`Triggered run ${ensureResult.runId} not found`);
273275
}

0 commit comments

Comments
 (0)