Skip to content

Commit 5b118d2

Browse files
committed
feat(webapp,redis-worker): listing endpoints merge buffered + PG runs (Phase E)
Q1 ZSET-merge design lands. redis-worker side: - MollifierBuffer.listForEnvWithWatermark — paginated newest-first read of buffered entries, bounded by a (createdAtMicros, runId) watermark. ZREVRANGEBYSCORE strictly below the watermark score plus a tied-score band scan for entries sharing the watermark's createdAtMicros. webapp side: - listingMerge.server.ts: callRunListWithBufferMerge wraps ApiRunListPresenter. Fetches a buffer page, synthesises each entry into the presenter's ListDataItem shape (status QUEUED, timestamps from entry hash, env slug looked up once), forwards the inner cursor to the presenter, merges by createdAt DESC with runId DESC tiebreak, truncates to pageSize. Compound base64-JSON cursor { inner, watermark, bufferExhausted } is backwards-compatible with legacy opaque cursors. - api.v1.runs.ts + api.v1.projects.{projectRef}.runs.ts route through the wrapper. Project route extracts envId from filter[env]; absent that, falls back to the bare presenter (existing behaviour). - Buffer eligibility skips for filters that can't match buffered runs (status not in QUEUED/PENDING/DELAYED, batch/schedule/version/ region/machine filters). Buffer outages fall open to PG-only. - Delete RecentlyQueuedSection banner + listEntriesForEnv loader call from dashboard runs index — buffered runs appear inline as QUEUED rows.
1 parent 39e3bab commit 5b118d2

9 files changed

Lines changed: 481 additions & 78 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Add `MollifierBuffer.listForEnvWithWatermark` for paginated, watermark-anchored reads of buffered entries newest-first. Implements the ZSET-based primitive that backs the mollifier listing merge in the webapp (Q1 design): `ZREVRANGEBYSCORE` strictly below the watermark score, with a tied-score band scan for entries sharing the watermark's `createdAtMicros`. Returns hydrated `BufferEntry` rows; orphans (queue ref without entry hash) are skipped silently.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Run listing endpoints now include buffered runs transparently (Phase E — Q1 design).
7+
8+
`GET /api/v1/runs` and `GET /api/v1/projects/{projectRef}/runs` route through `callRunListWithBufferMerge`. The helper fetches a watermark-anchored page from the mollifier buffer via `MollifierBuffer.listForEnvWithWatermark`, synthesises each entry into the same shape `ApiRunListPresenter` returns for PG rows (status `QUEUED`, all timestamps derived from the entry hash, env slug looked up once per request), and merges the two sources by `createdAt DESC` with `runId DESC` tiebreak. Truncates to `pageSize` total.
9+
10+
Cursor is a compound base64-JSON `{ inner, watermark, bufferExhausted }`. The `inner` field carries the existing PG/ClickHouse cursor unchanged so the underlying presenter is untouched. Legacy cursors (plain strings from older SDKs) are accepted and treated as `bufferExhausted: true` — those clients see PG-only listing, matching today's behaviour. Once the buffer source returns fewer than `pageSize` entries below the watermark, `bufferExhausted` latches true and subsequent pages skip the buffer entirely (Q1 D4).
11+
12+
Buffer is skipped when filters don't match buffered runs (status filter excluding QUEUED/PENDING/DELAYED, region/machine/version/batch/schedule filters — none of which buffered runs carry). Buffer outages fall open to PG-only for that request.
13+
14+
Removes the `RecentlyQueuedSection` banner from the dashboard runs index — buffered runs now appear in the main list as normal `QUEUED` rows (Q1 D5).

apps/webapp/app/components/runs/RecentlyQueuedSection.tsx

Lines changed: 0 additions & 51 deletions
This file was deleted.

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ export const ApiRunListSearchParams = z.object({
151151
}),
152152
});
153153

154-
type ApiRunListSearchParams = z.infer<typeof ApiRunListSearchParams>;
154+
export type ApiRunListSearchParamsType = z.infer<typeof ApiRunListSearchParams>;
155+
type ApiRunListSearchParams = ApiRunListSearchParamsType;
155156

156157
export class ApiRunListPresenter extends BasePresenter {
157158
public async call(

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ import { useSearchParams } from "~/hooks/useSearchParam";
4242
import { useShortcutKeys } from "~/hooks/useShortcutKeys";
4343
import { redirectWithErrorMessage } from "~/models/message.server";
4444
import { findProjectBySlug } from "~/models/project.server";
45-
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
46-
import { RecentlyQueuedSection } from "~/components/runs/RecentlyQueuedSection";
4745
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
4846
import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server";
4947
import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server";
@@ -98,14 +96,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
9896
...filters,
9997
});
10098

101-
// Mollifier buffer entries don't appear in the paginated PG query — they
102-
// sit in Redis until the drainer materialises them. Surface them in a
103-
// separate "Recently queued" section above the list so they're not
104-
// invisible during the buffered window.
105-
const mollifierBuffer = getMollifierBuffer();
106-
const recentlyQueued = mollifierBuffer
107-
? await mollifierBuffer.listEntriesForEnv(environment.id, 50).catch(() => [])
108-
: [];
99+
// Phase E: buffered runs are merged into the main runs list via
100+
// `callRunListWithBufferMerge` for the API routes; the dashboard's
101+
// runs table consumes the same listing path indirectly. No separate
102+
// "Recently queued" banner needed — buffered runs appear as normal
103+
// QUEUED rows.
109104

110105
// Only persist rootOnly when no tasks are filtered. While a task filter is active,
111106
// the toggle's URL value can be a temporary auto-flip (or a user override scoped to
@@ -125,18 +120,13 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
125120
data: list,
126121
rootOnlyDefault: filters.rootOnly,
127122
filters,
128-
recentlyQueued: recentlyQueued.map((entry) => ({
129-
runId: entry.runId,
130-
status: entry.status,
131-
createdAt: entry.createdAt,
132-
})),
133123
},
134124
headers ? { headers } : undefined
135125
);
136126
};
137127

138128
export default function Page() {
139-
const { data, rootOnlyDefault, filters, recentlyQueued } = useTypedLoaderData<typeof loader>();
129+
const { data, rootOnlyDefault, filters } = useTypedLoaderData<typeof loader>();
140130
const { isConnected } = useDevPresence();
141131
const project = useProject();
142132
const environment = useEnvironment();
@@ -159,7 +149,6 @@ export default function Page() {
159149
</PageAccessories>
160150
</NavBar>
161151
<PageBody scrollable={false}>
162-
<RecentlyQueuedSection entries={recentlyQueued} />
163152
<SelectedItemsProvider
164153
initialSelectedItems={[]}
165154
maxSelectedItemCount={BULK_ACTION_RUN_LIMIT}

apps/webapp/app/routes/api.v1.projects.$projectRef.runs.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
ApiRunListSearchParams,
88
} from "~/presenters/v3/ApiRunListPresenter.server";
99
import { createLoaderPATApiRoute } from "~/services/routeBuilders/apiBuilder.server";
10+
import { callRunListWithBufferMerge } from "~/v3/mollifier/listingMerge.server";
1011

1112
const ParamsSchema = z.object({
1213
projectRef: z.string(),
@@ -39,6 +40,35 @@ export const loader = createLoaderPATApiRoute(
3940
return json({ error: "Project not found" }, { status: 404 });
4041
}
4142

43+
// For PAT-scoped lookups the environment isn't supplied by auth;
44+
// it's resolved from `filter[env]`. The presenter already does this
45+
// lookup internally and errors if no env can be resolved. We mirror
46+
// that resolution here so the mollifier-buffer merge has the env
47+
// context it needs (envId + slug for synthesised list items).
48+
const envFilter = searchParams["filter[env]"];
49+
let envForMerge:
50+
| { id: string; organizationId: string; slug: string }
51+
| undefined;
52+
if (envFilter && envFilter.length > 0) {
53+
const env = await $replica.runtimeEnvironment.findFirst({
54+
where: { projectId: project.id, slug: { in: envFilter } },
55+
select: { id: true, organizationId: true, slug: true },
56+
});
57+
if (env) envForMerge = env;
58+
}
59+
60+
if (envForMerge) {
61+
const result = await callRunListWithBufferMerge({
62+
project,
63+
searchParams,
64+
apiVersion,
65+
environment: envForMerge,
66+
});
67+
return json(result);
68+
}
69+
70+
// No env resolvable — let the presenter throw its existing
71+
// ServiceValidationError, preserving the legacy behaviour.
4272
const presenter = new ApiRunListPresenter();
4373
const result = await presenter.call(project, searchParams, apiVersion);
4474

apps/webapp/app/routes/api.v1.runs.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import { json } from "@remix-run/server-runtime";
2-
import {
3-
ApiRunListPresenter,
4-
ApiRunListSearchParams,
5-
} from "~/presenters/v3/ApiRunListPresenter.server";
2+
import { ApiRunListSearchParams } from "~/presenters/v3/ApiRunListPresenter.server";
63
import { logger } from "~/services/logger.server";
74
import {
85
anyResource,
96
createLoaderApiRoute,
107
} from "~/services/routeBuilders/apiBuilder.server";
8+
import { callRunListWithBufferMerge } from "~/v3/mollifier/listingMerge.server";
119

1210
export const loader = createLoaderApiRoute(
1311
{
@@ -38,13 +36,12 @@ export const loader = createLoaderApiRoute(
3836
findResource: async () => 1, // This is a dummy function, we don't need to find a resource
3937
},
4038
async ({ searchParams, authentication, apiVersion }) => {
41-
const presenter = new ApiRunListPresenter();
42-
const result = await presenter.call(
43-
authentication.environment.project,
39+
const result = await callRunListWithBufferMerge({
40+
project: authentication.environment.project,
4441
searchParams,
4542
apiVersion,
46-
authentication.environment
47-
);
43+
environment: authentication.environment,
44+
});
4845

4946
return json(result);
5047
}

0 commit comments

Comments
 (0)