Skip to content

Commit 213185b

Browse files
d-csclaude
andcommitted
feat(webapp): dashboard parity for mollifier-buffered runs
Synthesise the SpanRun shape from buffer snapshots so the run-detail page's inspector panel renders identically to a PG-resident run. SSE log stream, realtime stream resources, logs-download and debug resource fall back to the buffer instead of 404-ing. Short-URL redirects resolve buffered runs to the canonical dashboard URL. Bulk-cancel scans the buffer alongside the ClickHouse selection so runs queued mid-burst are included. Trigger response now carries the snapshot's spanId so the dashboard's Run Test redirect opens the details panel without an extra click. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d7bdfb5 commit 213185b

25 files changed

Lines changed: 1503 additions & 106 deletions
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Dashboard parity for runs that live in the mollifier buffer. Synthesises
7+
the SpanRun shape from the buffer snapshot so the run-detail page's
8+
inspector panel renders identically to a PG-resident run. SSE log
9+
stream, realtime stream resources, logs-download and debug resources
10+
fall back to the buffer instead of 404-ing. Short-URL redirects
11+
(`/runs/{id}`, `/@/runs/{id}`, `/projects/v3/{ref}/runs/{id}`) resolve
12+
buffered runs to the canonical dashboard URL. Bulk-cancel scans the
13+
buffer alongside the ClickHouse selection so runs queued mid-burst are
14+
included in the action. Trigger response now carries the snapshot's
15+
spanId so the dashboard's Run Test redirect opens the details panel
16+
without an extra click.

apps/webapp/app/components/runs/MollifierBanner.tsx

Lines changed: 0 additions & 86 deletions
This file was deleted.

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,7 @@ const EnvironmentSchema = z
10901090
.transform((v) => v ?? process.env.REDIS_PASSWORD),
10911091
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
10921092
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1093-
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
1093+
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().nonnegative().default(100),
10941094
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
10951095
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
10961096
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server";
33
import { singleton } from "~/utils/singleton";
44
import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
55
import { throttle } from "~/utils/throttle";
6+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
7+
import { deserialiseSnapshot } from "@trigger.dev/redis-worker";
68
import { tracePubSub } from "~/v3/services/tracePubSub.server";
79

810
const PING_INTERVAL = 5_000;
@@ -37,17 +39,45 @@ export class RunStreamPresenter {
3739
},
3840
});
3941

40-
if (!run) {
42+
// Fall back to the mollifier buffer when the run isn't in PG yet.
43+
// The buffered run has no execution events to stream, but we still
44+
// attach a trace-pubsub subscription using the snapshot's traceId
45+
// so that the moment the drainer materialises the row and execution
46+
// begins, those events flow to this open SSE connection. Closing
47+
// with 404 would force the dashboard to keep retrying.
48+
let traceId: string | null = run?.traceId ?? null;
49+
if (!traceId) {
50+
const buffer = getMollifierBuffer();
51+
if (buffer) {
52+
try {
53+
const entry = await buffer.getEntry(runFriendlyId);
54+
if (entry) {
55+
const snapshot = deserialiseSnapshot<{ traceId?: string }>(entry.payload);
56+
if (typeof snapshot.traceId === "string") {
57+
traceId = snapshot.traceId;
58+
}
59+
}
60+
} catch (err) {
61+
logger.warn("RunStreamPresenter buffer fallback failed", {
62+
runFriendlyId,
63+
err: err instanceof Error ? err.message : String(err),
64+
});
65+
}
66+
}
67+
}
68+
69+
if (!traceId) {
4170
throw new Response("Not found", { status: 404 });
4271
}
72+
const resolvedRun = { traceId };
4373

4474
logger.info("RunStreamPresenter.start", {
4575
runFriendlyId,
46-
traceId: run.traceId,
76+
traceId: resolvedRun.traceId,
4777
});
4878

4979
// Subscribe to trace updates
50-
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
80+
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId);
5181

5282
// Only send max every 1 second
5383
const throttledSend = throttle(
@@ -105,7 +135,7 @@ export class RunStreamPresenter {
105135
cleanup: () => {
106136
logger.info("RunStreamPresenter.cleanup", {
107137
runFriendlyId,
108-
traceId: run.traceId,
138+
traceId: resolvedRun.traceId,
109139
});
110140

111141
// Remove message listener
@@ -119,13 +149,13 @@ export class RunStreamPresenter {
119149
.then(() => {
120150
logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", {
121151
runFriendlyId,
122-
traceId: run.traceId,
152+
traceId: resolvedRun.traceId,
123153
});
124154
})
125155
.catch((error) => {
126156
logger.error("RunStreamPresenter.cleanup.unsubscribe failed", {
127157
runFriendlyId,
128-
traceId: run.traceId,
158+
traceId: resolvedRun.traceId,
129159
error: {
130160
name: error.name,
131161
message: error.message,

apps/webapp/app/routes/@.runs.$runParam.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { prisma } from "~/db.server";
44
import { redirectWithErrorMessage } from "~/models/message.server";
55
import { requireUser } from "~/services/session.server";
66
import { impersonate, rootPath, v3RunPath } from "~/utils/pathBuilder";
7+
import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server";
78

89
const ParamsSchema = z.object({
910
runParam: z.string(),
@@ -51,6 +52,26 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
5152
});
5253

5354
if (!run) {
55+
// Admin impersonation route — bypass org membership so admins can
56+
// open any buffered run by friendlyId, mirroring the existing PG
57+
// behaviour above (no membership filter on the find).
58+
const buffered = await findBufferedRunRedirectInfo({
59+
runFriendlyId: runParam,
60+
userId: user.id,
61+
skipOrgMembershipCheck: true,
62+
});
63+
if (buffered) {
64+
return redirect(
65+
impersonate(
66+
v3RunPath(
67+
{ slug: buffered.organizationSlug },
68+
{ slug: buffered.projectSlug },
69+
{ slug: buffered.environmentSlug },
70+
{ friendlyId: runParam }
71+
)
72+
)
73+
);
74+
}
5475
return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", {
5576
ephemeral: false,
5677
});

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ import {
6767
useTree,
6868
} from "~/components/primitives/TreeView/TreeView";
6969
import { type NodesState } from "~/components/primitives/TreeView/reducer";
70-
import { MollifierBanner } from "~/components/runs/MollifierBanner";
7170
import { CancelRunDialog } from "~/components/runs/v3/CancelRunDialog";
7271
import { ReplayRunDialog } from "~/components/runs/v3/ReplayRunDialog";
7372
import { getRunFiltersFromSearchParams } from "~/components/runs/v3/RunFilters";
@@ -95,6 +94,7 @@ import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
9594
import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server";
9695
import { RunEnvironmentMismatchError, RunPresenter } from "~/presenters/v3/RunPresenter.server";
9796
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
97+
import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server";
9898
import { clickhouseClient } from "~/services/clickhouseInstance.server";
9999
import { getImpersonationId } from "~/services/impersonation.server";
100100
import { logger } from "~/services/logger.server";
@@ -297,11 +297,10 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
297297

298298
return json({
299299
run: buffered.run,
300-
trace: undefined,
300+
trace: buffered.trace,
301301
maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS,
302302
resizable: { parent, tree },
303303
runsList: null,
304-
isMollified: true,
305304
});
306305
}
307306

@@ -330,7 +329,6 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
330329
tree,
331330
},
332331
runsList,
333-
isMollified: false,
334332
});
335333
};
336334

@@ -376,13 +374,14 @@ async function tryMollifiedRunFallback(args: {
376374
userName: undefined,
377375
},
378376
},
377+
trace: buildSyntheticTraceForBufferedRun(buffered),
379378
};
380379
}
381380

382381
type LoaderData = SerializeFrom<typeof loader>;
383382

384383
export default function Page() {
385-
const { run, trace, maximumLiveReloadingSetting, runsList, resizable, isMollified } =
384+
const { run, trace, maximumLiveReloadingSetting, runsList, resizable } =
386385
useLoaderData<typeof loader>();
387386
const organization = useOrganization();
388387
const project = useProject();
@@ -502,7 +501,6 @@ export default function Page() {
502501
</PageAccessories>
503502
</NavBar>
504503
<PageBody scrollable={false}>
505-
{isMollified ? <MollifierBanner className="mx-3 mt-3" /> : null}
506504
{trace ? (
507505
<TraceView
508506
run={run}
@@ -691,9 +689,13 @@ function NoLogsView({ run, resizable }: Pick<LoaderData, "run" | "resizable">) {
691689
>
692690
<div className="grid h-full place-items-center">
693691
{daysSinceCompleted === undefined ? (
694-
<InfoPanel variant="info" icon={InformationCircleIcon} title="We delete old logs">
692+
<InfoPanel
693+
variant="info"
694+
icon={InformationCircleIcon}
695+
title="Waiting to start"
696+
>
695697
<Paragraph variant="small">
696-
We tidy up older logs to keep things running smoothly.
698+
This run is queued. Logs will appear here once it begins executing.
697699
</Paragraph>
698700
</InfoPanel>
699701
) : isWithinLogRetention ? (

apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { type LoaderFunctionArgs, redirect } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { prisma } from "~/db.server";
44
import { requireUserId } from "~/services/session.server";
5-
import { v3RunSpanPath } from "~/utils/pathBuilder";
5+
import { v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder";
6+
import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server";
67

78
const ParamsSchema = z.object({
89
projectRef: z.string(),
@@ -44,6 +45,28 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
4445
});
4546

4647
if (!run) {
48+
// Fall back to the mollifier buffer so a /projects/v3/{ref}/runs/{id}
49+
// share link works during the buffered window.
50+
const buffered = await findBufferedRunRedirectInfo({
51+
runFriendlyId: validatedParams.runParam,
52+
userId,
53+
});
54+
if (buffered) {
55+
const url = new URL(request.url);
56+
const searchParams = url.searchParams;
57+
if (!searchParams.has("span") && buffered.spanId) {
58+
searchParams.set("span", buffered.spanId);
59+
}
60+
return redirect(
61+
v3RunPath(
62+
{ slug: buffered.organizationSlug },
63+
{ slug: buffered.projectSlug },
64+
{ slug: buffered.environmentSlug },
65+
{ friendlyId: validatedParams.runParam },
66+
searchParams
67+
)
68+
);
69+
}
4770
throw new Response("Not found", { status: 404 });
4871
}
4972

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1313
import { requireUserId } from "~/services/session.server";
1414
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
15+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1516

1617
const ParamsSchema = z.object({
1718
runParam: z.string(),
@@ -59,6 +60,20 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
5960
});
6061

6162
if (!run) {
63+
// Buffered run has no Session linkage yet. Return 204 so the SDK's
64+
// SSE client treats this as "channel not yet active" and retries
65+
// naturally once the drainer materialises the row.
66+
const buffered = await findRunByIdWithMollifierFallback({
67+
runId: runParam,
68+
environmentId: environment.id,
69+
organizationId: project.organizationId,
70+
});
71+
if (buffered) {
72+
return new Response(null, {
73+
status: 204,
74+
headers: { "content-type": "text/event-stream; charset=utf-8" },
75+
});
76+
}
6277
return new Response("Run not found", { status: 404 });
6378
}
6479

0 commit comments

Comments
 (0)