Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/route-taskrun-reads-through-run-store.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Route Postgres task run reads through the run store so they can be retargeted to a different backing store without changing call sites.
14 changes: 9 additions & 5 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AuthenticatedEnvironment } from "@internal/run-engine";
import type { Prisma, PrismaClientOrTransaction, RuntimeEnvironment } from "@trigger.dev/database";
import { $replica, prisma } from "~/db.server";
import { runStore } from "~/v3/runStore.server";
import { logger } from "~/services/logger.server";
import { getUsername } from "~/utils/username";
import { sanitizeBranchName } from "@trigger.dev/core/v3/utils/gitBranch";
Expand Down Expand Up @@ -251,14 +252,17 @@ export async function findEnvironmentFromRun(
): Promise<EnvironmentFromRun | null> {
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
// ride along for free — no extra query for the realtime publish to send a full record.
const taskRun = await (tx ?? $replica).taskRun.findFirst({
where: {
const taskRun = await runStore.findRun(
{
id: runId,
},
include: {
runtimeEnvironment: { include: authIncludeBase },
{
include: {
runtimeEnvironment: { include: authIncludeBase },
},
},
});
tx ?? $replica
);
if (!taskRun?.runtimeEnvironment) {
return null;
}
Expand Down
55 changes: 43 additions & 12 deletions apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BatchTaskRunExecutionResult } from "@trigger.dev/core/v3";
import { executionResultForTaskRun } from "~/models/taskRun.server";
import { executionResultForTaskRun, TaskRunWithAttempts } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { runStore } from "~/v3/runStore.server";
import { BasePresenter } from "./basePresenter.server";

export class ApiBatchResultsPresenter extends BasePresenter {
Expand All @@ -16,16 +17,8 @@ export class ApiBatchResultsPresenter extends BasePresenter {
},
include: {
items: {
include: {
taskRun: {
include: {
attempts: {
orderBy: {
createdAt: "desc",
},
},
},
},
select: {
taskRunId: true,
},
},
},
Expand All @@ -35,10 +28,48 @@ export class ApiBatchResultsPresenter extends BasePresenter {
return undefined;
}

const taskRunIds = batchRun.items.map((item) => item.taskRunId);

if (taskRunIds.length === 0) {
return {
id: batchRun.friendlyId,
items: [],
};
}

const taskRuns = await runStore.findRuns(
{
where: { id: { in: taskRunIds } },
select: {
id: true,
friendlyId: true,
status: true,
taskIdentifier: true,
attempts: {
select: {
status: true,
output: true,
outputType: true,
error: true,
},
orderBy: {
createdAt: "desc",
},
},
},
},
this._prisma
);

const runMap = new Map(taskRuns.map((run) => [run.id, run]));

return {
id: batchRun.friendlyId,
items: batchRun.items
.map((item) => executionResultForTaskRun(item.taskRun))
.map((item) => {
const run = runMap.get(item.taskRunId);
return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined;
})
.filter(Boolean),
};
});
Expand Down
56 changes: 30 additions & 26 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { runStore } from "~/v3/runStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";

Expand Down Expand Up @@ -110,38 +111,41 @@ export class ApiRetrieveRunPresenter {
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
const pgRow = await runStore.findRun(
{
friendlyId,
runtimeEnvironmentId: env.id,
},
select: {
...commonRunSelect,
traceId: true,
payload: true,
payloadType: true,
output: true,
outputType: true,
error: true,
attempts: {
select: {
id: true,
{
select: {
...commonRunSelect,
traceId: true,
payload: true,
payloadType: true,
output: true,
outputType: true,
error: true,
attempts: {
select: {
id: true,
},
},
attemptNumber: true,
engine: true,
taskEventStore: true,
parentTaskRun: {
select: commonRunSelect,
},
rootTaskRun: {
select: commonRunSelect,
},
childRuns: {
select: commonRunSelect,
},
},
attemptNumber: true,
engine: true,
taskEventStore: true,
parentTaskRun: {
select: commonRunSelect,
},
rootTaskRun: {
select: commonRunSelect,
},
childRuns: {
select: commonRunSelect,
},
},
});
$replica
);

if (pgRow) return { ...pgRow, isBuffered: false };

Expand Down
18 changes: 11 additions & 7 deletions apps/webapp/app/presenters/v3/ApiRunResultPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TaskRunExecutionResult } from "@trigger.dev/core/v3";
import { executionResultForTaskRun } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { runStore } from "~/v3/runStore.server";
import { BasePresenter } from "./basePresenter.server";

export class ApiRunResultPresenter extends BasePresenter {
Expand All @@ -9,19 +10,22 @@ export class ApiRunResultPresenter extends BasePresenter {
env: AuthenticatedEnvironment
): Promise<TaskRunExecutionResult | undefined> {
return this.traceWithEnv("call", env, async (span) => {
const taskRun = await this._prisma.taskRun.findFirst({
where: {
const taskRun = await runStore.findRun(
{
friendlyId,
runtimeEnvironmentId: env.id,
},
include: {
attempts: {
orderBy: {
createdAt: "desc",
{
include: {
attempts: {
orderBy: {
createdAt: "desc",
},
},
},
},
});
this._prisma
);

if (!taskRun) {
return undefined;
Expand Down
8 changes: 5 additions & 3 deletions apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { getTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { runStore } from "~/v3/runStore.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";

Expand Down Expand Up @@ -206,11 +207,12 @@ export class NextRunListPresenter {
let hasAnyRuns = runs.length > 0;

if (!hasAnyRuns) {
const firstRun = await this.replica.taskRun.findFirst({
where: {
const firstRun = await runStore.findRun(
{
runtimeEnvironmentId: environmentId,
},
});
this.replica
);

if (firstRun) {
hasAnyRuns = true;
Expand Down
108 changes: 56 additions & 52 deletions apps/webapp/app/presenters/v3/RunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { env } from "~/env.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { runStore } from "~/v3/runStore.server";

type Result = Awaited<ReturnType<RunPresenter["call"]>>;
export type Run = Result["run"];
Expand Down Expand Up @@ -62,57 +63,8 @@ export class RunPresenter {
// buffer view. `findFirstOrThrow` would log a `PrismaClient error`
// every tick of the page poll, masking real DB issues with synthetic
// not-found noise.
const run = await this.#prismaClient.taskRun.findFirst({
select: {
id: true,
createdAt: true,
taskEventStore: true,
taskIdentifier: true,
number: true,
traceId: true,
spanId: true,
parentSpanId: true,
friendlyId: true,
status: true,
startedAt: true,
completedAt: true,
logsDeletedAt: true,
annotations: true,
rootTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
parentTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
runtimeEnvironment: {
select: {
id: true,
type: true,
slug: true,
organizationId: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
},
},
where: {
const run = await runStore.findRun(
{
friendlyId: runFriendlyId,
project: {
slug: projectSlug,
Expand All @@ -125,7 +77,59 @@ export class RunPresenter {
},
},
},
});
{
select: {
id: true,
createdAt: true,
taskEventStore: true,
taskIdentifier: true,
number: true,
traceId: true,
spanId: true,
parentSpanId: true,
friendlyId: true,
status: true,
startedAt: true,
completedAt: true,
logsDeletedAt: true,
annotations: true,
rootTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
parentTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
runtimeEnvironment: {
select: {
id: true,
type: true,
slug: true,
organizationId: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
},
},
},
this.#prismaClient
);

if (!run) {
throw new RunNotInPgError(runFriendlyId);
Expand Down
Loading