Skip to content

Commit a1fe5f7

Browse files
d-csclaude
andcommitted
feat(webapp): merge mollifier-buffered runs into the dashboard runs list
Buffered runs are prepended to the runs table on the runs list page so customers see freshly-triggered work even while the gate is diverting. The merge uses a compound base64 cursor that wraps the PG presenter's own cursor — page 1 can be entirely buffered (top of the list), page 2 takes the buffered overflow and transitions into the PG content, and later pages drop the buffer scan entirely once it's been exhausted. Filter predicates (tasks, statuses, tags, period, from/to, isTest, runId) are evaluated against the buffer snapshot so the list reflects the same filter scope as the PG-side query. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2052d3e commit a1fe5f7

2 files changed

Lines changed: 318 additions & 6 deletions

File tree

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ import { findProjectBySlug } from "~/models/project.server";
4545
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
4646
import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server";
4747
import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server";
48+
import {
49+
dashboardListCursor,
50+
mergeBufferedIntoDashboardList,
51+
} from "~/v3/mollifier/dashboardListingMerge.server";
4852
import { clickhouseClient } from "~/services/clickhouseInstance.server";
4953
import {
5054
setRootOnlyFilterPreference,
@@ -89,18 +93,43 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8993

9094
const filters = await getRunFiltersFromRequest(request);
9195

96+
// Buffered-run pagination uses a compound cursor that wraps the PG
97+
// presenter's own cursor. Decode here so the inner PG cursor is
98+
// forwarded to the presenter; the merge helper reconstructs the
99+
// outgoing cursor based on what fits on this page.
100+
const decodedCursor = dashboardListCursor.decode(filters.cursor);
101+
const pgCursor = decodedCursor ? decodedCursor.inner : filters.cursor;
102+
const dashboardPageSize = 25;
103+
92104
const presenter = new NextRunListPresenter($replica, clickhouseClient);
93-
const list = presenter.call(project.organizationId, environment.id, {
105+
const baseList = presenter.call(project.organizationId, environment.id, {
94106
userId,
95107
projectId: project.id,
96108
...filters,
109+
cursor: pgCursor,
97110
});
98111

99-
// Phase E: buffered runs are merged into the main runs list via
100-
// `callRunListWithBufferMerge` for the API routes; the dashboard's
101-
// runs table consumes the same listing path indirectly. No separate
102-
// "Recently queued" banner needed — buffered runs appear as normal
103-
// QUEUED rows.
112+
// Prepend mollifier-buffered runs so customers see freshly-triggered
113+
// runs while the gate is diverting traffic. The merge happens inside
114+
// the deferred promise so the page still streams.
115+
const list = baseList.then((result) =>
116+
mergeBufferedIntoDashboardList({
117+
baseList: result,
118+
envId: environment.id,
119+
pageSize: dashboardPageSize,
120+
cursor: filters.cursor,
121+
filters: {
122+
tasks: filters.tasks,
123+
statuses: filters.statuses,
124+
tags: filters.tags,
125+
period: filters.period,
126+
from: filters.from,
127+
to: filters.to,
128+
isTest: filters.isTest,
129+
runId: filters.runId,
130+
},
131+
})
132+
);
104133

105134
// Only persist rootOnly when no tasks are filtered. While a task filter is active,
106135
// the toggle's URL value can be a temporary auto-flip (or a user override scoped to
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
import type { TaskRunStatus } from "@trigger.dev/database";
2+
import parseDuration from "parse-duration";
3+
import { deserialiseSnapshot, type MollifierBuffer } from "@trigger.dev/redis-worker";
4+
import type { NextRunList, NextRunListItem } from "~/presenters/v3/NextRunListPresenter.server";
5+
import { logger } from "~/services/logger.server";
6+
import { getMollifierBuffer } from "./mollifierBuffer.server";
7+
8+
// Subset of the dashboard's runs-list filters that we can evaluate
9+
// against a buffer snapshot. Filters that depend on PG-only fields
10+
// (versions, batchId, bulkId, scheduleId, etc.) are silently ignored —
11+
// a buffered run can't match those anyway.
12+
export type DashboardBufferedFilters = {
13+
tasks?: string[];
14+
tags?: string[];
15+
statuses?: TaskRunStatus[];
16+
period?: string;
17+
from?: number;
18+
to?: number;
19+
isTest?: boolean;
20+
runId?: string[];
21+
};
22+
23+
type BufferEntryLike = { runId: string; createdAt: Date };
24+
25+
function matchesFilter(
26+
snapshot: Record<string, unknown>,
27+
entry: BufferEntryLike,
28+
filters: DashboardBufferedFilters,
29+
): boolean {
30+
if (filters.tasks?.length) {
31+
const taskId = snapshot.taskIdentifier;
32+
if (typeof taskId !== "string" || !filters.tasks.includes(taskId)) return false;
33+
}
34+
35+
// A buffered run is functionally QUEUED / PENDING — when the filter
36+
// restricts statuses we only match if those are wanted.
37+
if (filters.statuses?.length) {
38+
const bufferedStatuses: TaskRunStatus[] = ["PENDING", "QUEUED" as TaskRunStatus];
39+
if (!filters.statuses.some((s) => bufferedStatuses.includes(s))) return false;
40+
}
41+
42+
if (filters.tags?.length) {
43+
const snapshotTags = Array.isArray(snapshot.tags) ? snapshot.tags : [];
44+
const overlap = filters.tags.some((t) => snapshotTags.includes(t));
45+
if (!overlap) return false;
46+
}
47+
48+
if (filters.period) {
49+
const ms = parseDuration(filters.period);
50+
if (typeof ms === "number" && ms > 0) {
51+
const earliest = Date.now() - ms;
52+
if (entry.createdAt.getTime() < earliest) return false;
53+
}
54+
} else if (typeof filters.from === "number" || typeof filters.to === "number") {
55+
const t = entry.createdAt.getTime();
56+
if (typeof filters.from === "number" && t < filters.from) return false;
57+
if (typeof filters.to === "number" && t > filters.to) return false;
58+
}
59+
60+
if (typeof filters.isTest === "boolean") {
61+
if (snapshot.isTest !== filters.isTest) return false;
62+
}
63+
64+
if (filters.runId?.length) {
65+
if (!filters.runId.includes(entry.runId)) return false;
66+
}
67+
68+
return true;
69+
}
70+
71+
function snapshotToNextRunListItem(
72+
entry: BufferEntryLike,
73+
snapshot: Record<string, unknown>,
74+
environment: NextRunListItem["environment"],
75+
): NextRunListItem {
76+
const cancelledAtRaw = typeof snapshot.cancelledAt === "string" ? snapshot.cancelledAt : undefined;
77+
const cancelled = !!cancelledAtRaw;
78+
const queueRaw = typeof snapshot.queue === "string" ? snapshot.queue : "task/";
79+
const tags = Array.isArray(snapshot.tags)
80+
? (snapshot.tags as unknown[]).filter((t): t is string => typeof t === "string").sort((a, b) => a.localeCompare(b))
81+
: [];
82+
return {
83+
id: entry.runId,
84+
number: 1,
85+
friendlyId: entry.runId,
86+
createdAt: entry.createdAt.toISOString(),
87+
updatedAt: cancelledAtRaw ?? entry.createdAt.toISOString(),
88+
startedAt: undefined,
89+
delayUntil: undefined,
90+
hasFinished: cancelled,
91+
finishedAt: cancelledAtRaw,
92+
isTest: snapshot.isTest === true,
93+
status: cancelled ? ("CANCELED" as TaskRunStatus) : ("PENDING" as TaskRunStatus),
94+
version: undefined,
95+
taskIdentifier: typeof snapshot.taskIdentifier === "string" ? snapshot.taskIdentifier : "",
96+
spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : "",
97+
isReplayable: true,
98+
isCancellable: !cancelled,
99+
isPending: !cancelled,
100+
environment,
101+
idempotencyKey: typeof snapshot.idempotencyKey === "string" ? snapshot.idempotencyKey : undefined,
102+
ttl: typeof snapshot.ttl === "string" ? snapshot.ttl : undefined,
103+
expiredAt: undefined,
104+
costInCents: 0,
105+
baseCostInCents: 0,
106+
usageDurationMs: 0,
107+
tags,
108+
depth: typeof snapshot.depth === "number" ? snapshot.depth : 0,
109+
rootTaskRunId: null,
110+
metadata: typeof snapshot.metadata === "string" ? snapshot.metadata : null,
111+
metadataType: typeof snapshot.metadataType === "string" ? snapshot.metadataType : null,
112+
machinePreset: typeof snapshot.machine === "string" ? snapshot.machine : undefined,
113+
queue: {
114+
name: queueRaw.replace("task/", ""),
115+
type: queueRaw.startsWith("task/") ? "task" : "custom",
116+
},
117+
region: typeof snapshot.workerQueue === "string" ? snapshot.workerQueue : undefined,
118+
taskKind: "STANDARD",
119+
};
120+
}
121+
122+
export type MergeBufferedIntoDashboardListInput = {
123+
baseList: NextRunList;
124+
envId: string;
125+
filters: DashboardBufferedFilters;
126+
pageSize: number;
127+
// Opaque incoming cursor from the URL. Decoded as the compound shape
128+
// below when present; otherwise treated as a legacy PG-only cursor.
129+
cursor?: string;
130+
maxBufferedRuns?: number;
131+
};
132+
133+
export type MergeBufferedIntoDashboardListDeps = {
134+
getBuffer?: () => MollifierBuffer | null;
135+
};
136+
137+
const DEFAULT_MAX_BUFFERED_RUNS = 500;
138+
139+
// Compound cursor written into the runs list URL. `bufferOffset` is the
140+
// number of buffered entries already consumed by previous pages;
141+
// `bufferExhausted` short-circuits the buffer scan on subsequent pages
142+
// once we've handed out everything in the buffer. `inner` is the PG
143+
// presenter's own cursor (opaque to this layer).
144+
type DashboardListCursor = {
145+
inner?: string;
146+
bufferOffset: number;
147+
bufferExhausted: boolean;
148+
};
149+
150+
function encodeCursor(c: DashboardListCursor): string {
151+
return Buffer.from(JSON.stringify(c), "utf8").toString("base64url");
152+
}
153+
154+
function decodeCursor(raw: string | undefined): DashboardListCursor | undefined {
155+
if (!raw) return undefined;
156+
try {
157+
const json = Buffer.from(raw, "base64url").toString("utf8");
158+
const parsed = JSON.parse(json);
159+
if (
160+
typeof parsed === "object" &&
161+
parsed !== null &&
162+
typeof parsed.bufferOffset === "number" &&
163+
typeof parsed.bufferExhausted === "boolean" &&
164+
(parsed.inner === undefined || typeof parsed.inner === "string")
165+
) {
166+
return parsed as DashboardListCursor;
167+
}
168+
} catch {
169+
// Falls through to "legacy" — the caller should treat the raw value
170+
// as a PG-only cursor.
171+
}
172+
return undefined;
173+
}
174+
175+
// Surface the encode/decode helpers so the loader can carry the
176+
// compound cursor through to the presenter's `cursor` parameter.
177+
export const dashboardListCursor = {
178+
encode: encodeCursor,
179+
decode: decodeCursor,
180+
};
181+
182+
// Prepend buffered runs to the dashboard's runs list so customers see
183+
// their freshly-triggered runs immediately, even while the gate is
184+
// diverting traffic. Entries are scanned for env, filtered, shaped into
185+
// NextRunListItem, and merged with the PG presenter result. The merged
186+
// list is truncated to `pageSize` and a compound cursor is written for
187+
// the next page so buffered entries that overflow page N show up on
188+
// page N+1, transitioning into mixed PG content once the buffer is
189+
// exhausted.
190+
export async function mergeBufferedIntoDashboardList(
191+
input: MergeBufferedIntoDashboardListInput,
192+
deps: MergeBufferedIntoDashboardListDeps = {},
193+
): Promise<NextRunList> {
194+
const buffer = (deps.getBuffer ?? getMollifierBuffer)();
195+
if (!buffer) return input.baseList;
196+
197+
const cursor = decodeCursor(input.cursor);
198+
const bufferOffset = cursor?.bufferOffset ?? 0;
199+
const bufferExhausted = cursor?.bufferExhausted ?? false;
200+
201+
if (bufferExhausted) {
202+
return input.baseList;
203+
}
204+
205+
const maxBuffered = input.maxBufferedRuns ?? DEFAULT_MAX_BUFFERED_RUNS;
206+
let entries;
207+
try {
208+
entries = await buffer.listEntriesForEnv(input.envId, maxBuffered);
209+
} catch (err) {
210+
logger.warn("dashboard buffered list merge failed", {
211+
envId: input.envId,
212+
err: err instanceof Error ? err.message : String(err),
213+
});
214+
return input.baseList;
215+
}
216+
if (entries.length === 0) return input.baseList;
217+
218+
const environment: NextRunListItem["environment"] = input.baseList.runs[0]?.environment ?? {
219+
id: input.envId,
220+
type: "DEVELOPMENT",
221+
slug: "dev",
222+
userId: undefined,
223+
userName: undefined,
224+
} as NextRunListItem["environment"];
225+
226+
const matchedBuffered: NextRunListItem[] = [];
227+
for (const entry of entries) {
228+
let snapshot: Record<string, unknown>;
229+
try {
230+
snapshot = deserialiseSnapshot(entry.payload) as Record<string, unknown>;
231+
} catch {
232+
continue;
233+
}
234+
if (!matchesFilter(snapshot, entry, input.filters)) continue;
235+
matchedBuffered.push(snapshotToNextRunListItem(entry, snapshot, environment));
236+
}
237+
238+
// Sort buffered newest-first so they appear above PG rows in the merged page.
239+
matchedBuffered.sort((a, b) => b.createdAt.localeCompare(a.createdAt));
240+
241+
// Slice off entries already consumed by previous pages.
242+
const pageBuffered = matchedBuffered.slice(bufferOffset, bufferOffset + input.pageSize);
243+
const newBufferOffset = bufferOffset + pageBuffered.length;
244+
const newBufferExhausted = newBufferOffset >= matchedBuffered.length;
245+
246+
// Determine how many PG rows to show on this page. The presenter was
247+
// already invoked with the inner cursor; we take its first
248+
// (pageSize - pageBuffered.length) rows.
249+
const remainingSlots = Math.max(0, input.pageSize - pageBuffered.length);
250+
const pgRows = input.baseList.runs.slice(0, remainingSlots);
251+
const pgPartiallyConsumed = pgRows.length < input.baseList.runs.length;
252+
253+
// Cursor for the next page: if we've shown all PG rows the presenter
254+
// returned, propagate the presenter's next cursor; otherwise reuse
255+
// the *current* inner cursor so the presenter re-fetches from the
256+
// same anchor and the unread PG rows show up next page.
257+
const nextInner = pgPartiallyConsumed
258+
? cursor?.inner
259+
: input.baseList.pagination.next;
260+
261+
const merged = [...pageBuffered, ...pgRows];
262+
const hasMoreBuffered = !newBufferExhausted;
263+
const hasMorePg = !!nextInner;
264+
265+
const next =
266+
hasMoreBuffered || hasMorePg
267+
? encodeCursor({
268+
inner: nextInner,
269+
bufferOffset: newBufferOffset,
270+
bufferExhausted: newBufferExhausted,
271+
})
272+
: undefined;
273+
274+
return {
275+
...input.baseList,
276+
runs: merged,
277+
hasAnyRuns: input.baseList.hasAnyRuns || merged.length > 0,
278+
pagination: {
279+
next,
280+
previous: input.baseList.pagination.previous,
281+
},
282+
};
283+
}

0 commit comments

Comments
 (0)