Skip to content

Commit 1ece983

Browse files
d-csclaude
andcommitted
revert(webapp): drop buffered scan from bulk-action service
The bulk-action confirmation count is sourced from ClickHouse, so PG rows not yet replicated to ClickHouse are silently excluded from both the count and the processing pass. Phase 4's first-batch mollifier- buffer scan broke that symmetry — buffered runs were processed without being counted, so a customer confirming "Replay ~0 runs" could see N buffered runs replayed without seeing them anywhere in the UI. Restore the eventually-consistent contract: bulk actions only target runs visible to ClickHouse. Buffered runs are picked up by subsequent bulk actions once they drain into PG → ClickHouse, mirroring how PG-not-yet-CH runs already work today. Removes `bulkActionBuffer.server.ts` (helper) and its container-backed test. Will reimplement once the buffered-runs UX (global status indicator) gives the customer a way to see and confirm against the buffered set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4408743 commit 1ece983

4 files changed

Lines changed: 6 additions & 515 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Drop the first-batch mollifier-buffer scan from `BulkActionV2`. The action's confirmation count comes from ClickHouse (eventually consistent for PG-but-not-yet-replicated runs) and never included buffered runs, so processing buffered entries created a safety gap: a customer confirming "Replay ~0 runs" could see N buffered runs replayed they didn't know about. Bulk actions are now uniformly bound by what ClickHouse can see; buffered runs are picked up by subsequent bulk actions once they drain into PG → ClickHouse — matching the existing eventually-consistent contract for PG-not-yet-CH runs. Removes `bulkActionBuffer.server.ts` and its container-backed tests; the buffered-runs UX will be reimplemented when the global status indicator lands.

apps/webapp/app/v3/mollifier/bulkActionBuffer.server.ts

Lines changed: 0 additions & 247 deletions
This file was deleted.

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ import { logger } from "@trigger.dev/sdk";
2020
import { CancelTaskRunService } from "../cancelTaskRun.server";
2121
import { tryCatch } from "@trigger.dev/core";
2222
import { ReplayTaskRunService } from "../replayTaskRun.server";
23-
import {
24-
processBufferedCancelBulkAction,
25-
processBufferedReplayBulkAction,
26-
} from "~/v3/mollifier/bulkActionBuffer.server";
2723
import { timeFilters } from "~/components/runs/v3/SharedFilters";
2824
import parseDuration from "parse-duration";
2925
import { v3BulkActionPath } from "~/utils/pathBuilder";
@@ -177,45 +173,6 @@ export class BulkActionService extends BaseService {
177173
// Slice because we fetch an extra for the cursor
178174
const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE);
179175

180-
// First-batch only: also process runs that are currently sitting in
181-
// the mollifier buffer. They aren't in ClickHouse (no OTEL events
182-
// yet) so the listRunIds query never returned them. Gated on the
183-
// cursor being null so worker retries don't reprocess the same set.
184-
const isFirstBatch = !group.cursor;
185-
if (isFirstBatch && group.environmentId) {
186-
const bufferedFilters = {
187-
tasks: filters.tasks,
188-
tags: filters.tags,
189-
statuses: filters.statuses,
190-
period: filters.period,
191-
from: filters.from,
192-
to: filters.to,
193-
isTest: filters.isTest,
194-
runId: filters.runId,
195-
};
196-
const bufferedCtx = {
197-
envId: group.environmentId,
198-
organizationId: group.project.organizationId,
199-
filters: bufferedFilters,
200-
};
201-
if (group.type === BulkActionType.CANCEL) {
202-
const r = await processBufferedCancelBulkAction({
203-
...bufferedCtx,
204-
cancelReason: `Bulk action ${group.friendlyId} cancelled run`,
205-
});
206-
successCount += r.successCount;
207-
failureCount += r.failureCount;
208-
} else if (group.type === BulkActionType.REPLAY) {
209-
const r = await processBufferedReplayBulkAction({
210-
...bufferedCtx,
211-
bulkActionId,
212-
prismaClient: this._prisma,
213-
});
214-
successCount += r.successCount;
215-
failureCount += r.failureCount;
216-
}
217-
}
218-
219176
switch (group.type) {
220177
case BulkActionType.CANCEL: {
221178
const cancelService = new CancelTaskRunService(this._prisma);

0 commit comments

Comments
 (0)