Skip to content

Commit 5b74b48

Browse files
committed
refactor(webapp): route service-layer TaskRun reads through the run store
Relocate the direct TaskRun reads in webapp services, run-engine concerns, realtime, mollifier and metadata to the RunStore read methods, preserving the exact client (writer, replica, or transaction) at each site. The run hydrator now receives the store by injection. Behavior-preserving.
1 parent cfa9052 commit 5b74b48

33 files changed

Lines changed: 642 additions & 528 deletions

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { AuthenticatedEnvironment } from "@internal/run-engine";
22
import type { Prisma, PrismaClientOrTransaction, RuntimeEnvironment } from "@trigger.dev/database";
33
import { $replica, prisma } from "~/db.server";
4+
import { runStore } from "~/v3/runStore.server";
45
import { logger } from "~/services/logger.server";
56
import { getUsername } from "~/utils/username";
67
import { sanitizeBranchName } from "@trigger.dev/core/v3/utils/gitBranch";
@@ -251,14 +252,17 @@ export async function findEnvironmentFromRun(
251252
): Promise<EnvironmentFromRun | null> {
252253
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
253254
// ride along for free — no extra query for the realtime publish to send a full record.
254-
const taskRun = await (tx ?? $replica).taskRun.findFirst({
255-
where: {
255+
const taskRun = await runStore.findRun(
256+
{
256257
id: runId,
257258
},
258-
include: {
259-
runtimeEnvironment: { include: authIncludeBase },
259+
{
260+
include: {
261+
runtimeEnvironment: { include: authIncludeBase },
262+
},
260263
},
261-
});
264+
tx ?? $replica
265+
);
262266
if (!taskRun?.runtimeEnvironment) {
263267
return null;
264268
}

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,19 @@ export class IdempotencyKeyConcern {
151151
}
152152

153153
const existingRun = idempotencyKey
154-
? await this.prisma.taskRun.findFirst({
155-
where: {
154+
? await runStore.findRun(
155+
{
156156
runtimeEnvironmentId: request.environment.id,
157157
idempotencyKey,
158158
taskIdentifier: request.taskId,
159159
},
160-
include: {
161-
associatedWaitpoint: true,
160+
{
161+
include: {
162+
associatedWaitpoint: true,
163+
},
162164
},
163-
})
165+
this.prisma
166+
)
164167
: undefined;
165168

166169
// Buffer fallback per the mollifier-idempotency design. PG missed —
@@ -329,14 +332,15 @@ export class IdempotencyKeyConcern {
329332
// Another concurrent trigger committed first. Re-resolve via the
330333
// existing checks: writer-side PG findFirst first (defeats
331334
// replica lag), then buffer fallback for the buffered case.
332-
const writerRun = await this.prisma.taskRun.findFirst({
333-
where: {
335+
const writerRun = await runStore.findRun(
336+
{
334337
runtimeEnvironmentId: request.environment.id,
335338
idempotencyKey,
336339
taskIdentifier: request.taskId,
337340
},
338-
include: { associatedWaitpoint: true },
339-
});
341+
{ include: { associatedWaitpoint: true } },
342+
this.prisma
343+
);
340344
if (writerRun) {
341345
return { isCached: true, run: writerRun };
342346
}

apps/webapp/app/runEngine/services/triggerFailedTask.server.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { getEventRepository } from "~/v3/eventRepository/index.server";
99
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
1010
import { DefaultQueueManager } from "../concerns/queues.server";
1111
import type { TriggerTaskRequest } from "../types";
12+
import { runStore } from "~/v3/runStore.server";
1213

1314
export type TriggerFailedTaskRequest = {
1415
/** The task identifier (e.g. "my-task") */
@@ -82,12 +83,13 @@ export class TriggerFailedTaskService {
8283

8384
// Resolve parent run for rootTaskRunId and depth (same as triggerTask.server.ts)
8485
const parentRun = request.parentRunId
85-
? await this.prisma.taskRun.findFirst({
86-
where: {
86+
? await runStore.findRun(
87+
{
8788
id: RunId.fromFriendlyId(request.parentRunId),
8889
runtimeEnvironmentId: request.environment.id,
8990
},
90-
})
91+
this.prisma
92+
)
9193
: undefined;
9294

9395
const depth = parentRun ? parentRun.depth + 1 : 0;
@@ -275,12 +277,13 @@ export class TriggerFailedTaskService {
275277
let depth = 0;
276278

277279
if (opts.parentRunId) {
278-
const parentRun = await this.prisma.taskRun.findFirst({
279-
where: {
280+
const parentRun = await runStore.findRun(
281+
{
280282
id: RunId.fromFriendlyId(opts.parentRunId),
281283
runtimeEnvironmentId: opts.environmentId,
282284
},
283-
});
285+
this.prisma
286+
);
284287

285288
if (parentRun) {
286289
parentTaskRunId = parentRun.id;

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ import {
6767
import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server";
6868
import { type MollifierBuffer } from "@trigger.dev/redis-worker";
6969
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";
70+
import { runStore } from "~/v3/runStore.server";
7071

7172
class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
7273
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
@@ -241,12 +242,13 @@ export class RunEngineTriggerTaskService {
241242

242243
// Get parent run if specified
243244
const parentRun = body.options?.parentRunId
244-
? await this.prisma.taskRun.findFirst({
245-
where: {
245+
? await runStore.findRun(
246+
{
246247
id: RunId.fromFriendlyId(body.options.parentRunId),
247248
runtimeEnvironmentId: environment.id,
248249
},
249-
})
250+
this.prisma
251+
)
250252
: undefined;
251253

252254
// Validate parent run

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -189,18 +189,21 @@ export class UpdateMetadataService {
189189
// Fetch current run (+ the realtime membership keys, so a flush can publish)
190190
const run = yield* _(
191191
Effect.tryPromise(() =>
192-
this._prisma.taskRun.findFirst({
193-
where: { id: runId },
194-
select: {
195-
id: true,
196-
metadata: true,
197-
metadataType: true,
198-
metadataVersion: true,
199-
runtimeEnvironmentId: true,
200-
runTags: true,
201-
batchId: true,
192+
this._runStore.findRun(
193+
{ id: runId },
194+
{
195+
select: {
196+
id: true,
197+
metadata: true,
198+
metadataType: true,
199+
metadataVersion: true,
200+
runtimeEnvironmentId: true,
201+
runTags: true,
202+
batchId: true,
203+
},
202204
},
203-
})
205+
this._prisma
206+
)
204207
)
205208
);
206209

@@ -332,38 +335,41 @@ export class UpdateMetadataService {
332335
) {
333336
const runIdType = runId.startsWith("run_") ? "friendly" : "internal";
334337

335-
const taskRun = await this._prisma.taskRun.findFirst({
336-
where: environment
338+
const taskRun = await this._runStore.findRun(
339+
environment
337340
? {
338341
runtimeEnvironmentId: environment.id,
339342
...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }),
340343
}
341344
: {
342345
...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }),
343346
},
344-
select: {
345-
id: true,
346-
batchId: true,
347-
runTags: true,
348-
completedAt: true,
349-
status: true,
350-
metadata: true,
351-
metadataType: true,
352-
metadataVersion: true,
353-
parentTaskRun: {
354-
select: {
355-
id: true,
356-
status: true,
347+
{
348+
select: {
349+
id: true,
350+
batchId: true,
351+
runTags: true,
352+
completedAt: true,
353+
status: true,
354+
metadata: true,
355+
metadataType: true,
356+
metadataVersion: true,
357+
parentTaskRun: {
358+
select: {
359+
id: true,
360+
status: true,
361+
},
357362
},
358-
},
359-
rootTaskRun: {
360-
select: {
361-
id: true,
362-
status: true,
363+
rootTaskRun: {
364+
select: {
365+
id: true,
366+
status: true,
367+
},
363368
},
364369
},
365370
},
366-
});
371+
this._prisma
372+
);
367373

368374
if (!taskRun) {
369375
return;
@@ -427,10 +433,13 @@ export class UpdateMetadataService {
427433

428434
while (attempts <= MAX_RETRIES) {
429435
// Fetch the latest run data
430-
const run = await this._prisma.taskRun.findFirst({
431-
where: { id: runId },
432-
select: { metadata: true, metadataType: true, metadataVersion: true },
433-
});
436+
const run = await this._runStore.findRun(
437+
{ id: runId },
438+
{
439+
select: { metadata: true, metadataType: true, metadataVersion: true },
440+
},
441+
this._prisma
442+
);
434443

435444
if (!run) {
436445
throw new Error(`Run ${runId} not found`);

apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { getMeter } from "@internal/tracing";
22
import { $replica } from "~/db.server";
3+
import { runStore } from "~/v3/runStore.server";
34
import { env } from "~/env.server";
45
import { singleton } from "~/utils/singleton";
56
import { getCachedLimit } from "../platform.v3.server";
@@ -122,6 +123,7 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
122123
// One RunHydrator shared by the router and the client, so its single-flight + short-TTL cache covers both.
123124
const runReader = new RunHydrator({
124125
replica: $replica,
126+
runStore,
125127
cacheTtlMs: env.REALTIME_BACKEND_NATIVE_RUN_CACHE_TTL_MS,
126128
maxCacheEntries: env.REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES,
127129
});

apps/webapp/app/services/realtime/runReader.server.ts

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { type Prisma, type PrismaClient } from "@trigger.dev/database";
1+
import { type Prisma, type PrismaClient, type PrismaClientOrTransaction } from "@trigger.dev/database";
2+
import type { RunStore } from "@internal/run-store";
23
import { BoundedTtlCache } from "./boundedTtlCache";
34
import { RESERVED_COLUMNS, type RealtimeRunRow } from "./electricStreamProtocol.server";
45

@@ -79,6 +80,8 @@ export interface RunListResolver {
7980
export type RunHydratorOptions = {
8081
/** A read-replica Prisma client (`$replica`). Always Postgres. */
8182
replica: Pick<PrismaClient, "taskRun">;
83+
/** RunStore the reads are routed through; `replica` is passed as the read client. */
84+
runStore: RunStore;
8285
/** Read-through cache TTL (ms) collapsing duplicate refetches for the same run. Set 0 to disable. Defaults to 250ms. */
8386
cacheTtlMs?: number;
8487
/** Hard cap on cache entries before expired entries are swept. */
@@ -139,24 +142,28 @@ export class RunHydrator {
139142
if (ids.length === 0) {
140143
return [];
141144
}
142-
const rows = await this.options.replica.taskRun.findMany({
143-
where: {
144-
runtimeEnvironmentId: environmentId,
145-
id: { in: ids },
145+
const rows = await this.options.runStore.findRuns(
146+
{
147+
where: {
148+
runtimeEnvironmentId: environmentId,
149+
id: { in: ids },
150+
},
151+
select: buildHydratorSelect(skipColumns),
146152
},
147-
select: buildHydratorSelect(skipColumns),
148-
});
153+
this.options.replica as PrismaClientOrTransaction
154+
);
149155
return rows as unknown as RealtimeRunRow[];
150156
}
151157

152158
async #fetch(environmentId: string, runId: string): Promise<RealtimeRunRow | null> {
153-
const run = await this.options.replica.taskRun.findFirst({
154-
where: {
159+
const run = await this.options.runStore.findRun(
160+
{
155161
id: runId,
156162
runtimeEnvironmentId: environmentId,
157163
},
158-
select: RUN_HYDRATOR_SELECT,
159-
});
164+
{ select: RUN_HYDRATOR_SELECT },
165+
this.options.replica as PrismaClientOrTransaction
166+
);
160167

161168
return (run ?? null) as RealtimeRunRow | null;
162169
}

0 commit comments

Comments
 (0)