Commit fc71e7d

fix: handle fast-completion race in batch streaming seal check (#3427)
## Problem

When `batchTrigger()` is called with large payloads, each item's payload is uploaded to R2 server-side during the streaming loop before being enqueued. This makes the loop slow, around 3 seconds per item. Workers pick up and execute each item as it's enqueued, running concurrently with the ongoing stream.

For the last item in the batch, a race exists between the streaming loop finishing and the batch completion cleanup:

1. The loop enqueues the last item and returns from `enqueueBatchItem()`
2. A waiting worker picks up the item almost instantly and executes it
3. `recordSuccess()` fires, `processedCount` hits the expected total, `finalizeBatch()` runs
4. `cleanup()` deletes all Redis keys for the batch, including `enqueuedItemsKey`
5. The streaming loop exits and calls `getBatchEnqueuedCount()`, which reads the now-deleted key and returns 0

The count check finds `enqueuedCount (0) !== batch.runCount` and falls through to a Postgres fallback, but the fallback only checked `sealed`. The BatchQueue completion path sets `status = COMPLETED` in Postgres without setting `sealed = true` (that's the streaming endpoint's job), so the fallback misses it too.

This causes the endpoint to return `sealed: false`. The SDK treats this as retryable and retries up to 5 times with exponential backoff. Each retry calls `enqueueBatchItem()`, which reads the batch meta key from Redis (also deleted by `cleanup()`) and throws "Batch not found or not initialized" (500). The final retry gets a 422 because the batch is already COMPLETED, which the SDK does not retry, causing an `ApiError` to be thrown from `await batchTrigger()` in the parent run, even though all child runs completed successfully.

## Fix

In the Postgres fallback inside `StreamBatchItemsService`, also check `status === "COMPLETED"` alongside `sealed`. This covers the fast-completion path where the BatchQueue finishes all runs before the streaming endpoint gets to seal the batch normally.

Also switches `findUnique` to `findFirst` per webapp convention.

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 8eb596f commit fc71e7d
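
The race described in the commit message can be reproduced in miniature. The sketch below is an illustration only: `FakeBatchStore`, `fallbackBefore`, `fallbackAfter`, and `simulateRace` are hypothetical names, and a plain `Map` stands in for Redis while a plain object stands in for the Postgres row. Only the fields `sealed`, `status`, and `runCount` are taken from the commit itself.

```typescript
// Hypothetical in-memory model of the race; not the real service code.
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED";

interface BatchRow {
  sealed: boolean;
  status: BatchStatus;
  runCount: number;
}

class FakeBatchStore {
  // Stand-in for the Redis keys that cleanup() deletes.
  private redis = new Map<string, number>();
  // Stand-in for the Postgres batchTaskRun row.
  readonly row: BatchRow;

  constructor(runCount: number) {
    this.row = { sealed: false, status: "PENDING", runCount };
  }

  enqueueItem(): void {
    const current = this.redis.get("enqueuedItems") ?? 0;
    this.redis.set("enqueuedItems", current + 1);
  }

  // Models the completion path: status becomes COMPLETED, sealed is left
  // untouched, and the Redis key is removed.
  completeAndCleanup(): void {
    this.row.status = "COMPLETED";
    this.redis.delete("enqueuedItems");
  }

  // A deleted key reads as 0, which is what trips the count check.
  getEnqueuedCount(): number {
    return this.redis.get("enqueuedItems") ?? 0;
  }
}

// Fallback predicate before the fix: only looks at sealed.
function fallbackBefore(row: BatchRow): boolean {
  return row.sealed;
}

// Fallback predicate after the fix: also accepts a COMPLETED batch.
function fallbackAfter(row: BatchRow): boolean {
  return row.sealed || row.status === "COMPLETED";
}

function simulateRace(): { enqueuedCount: number; before: boolean; after: boolean } {
  const store = new FakeBatchStore(2);
  store.enqueueItem();
  store.enqueueItem();
  // Workers finish and clean up before the streaming loop reads the count.
  store.completeAndCleanup();
  const enqueuedCount = store.getEnqueuedCount();
  return {
    enqueuedCount,
    before: fallbackBefore(store.row),
    after: fallbackAfter(store.row),
  };
}
```

In this model the count reads as 0 although two items were enqueued, the old predicate reports the batch as not done, and the new predicate accepts it.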

2 files changed

Lines changed: 151 additions & 10 deletions

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 27 additions & 10 deletions
@@ -212,15 +212,18 @@ export class StreamBatchItemsService extends WithRunEngine {
         // Validate we received the expected number of items
         if (enqueuedCount !== batch.runCount) {
           // The batch queue consumers may have already processed all items and
-          // cleaned up the Redis keys before we got here (especially likely when
-          // items include pre-failed runs that complete instantly). Check if the
-          // batch was already sealed/completed in Postgres.
-          const currentBatch = await this._prisma.batchTaskRun.findUnique({
+          // cleaned up the Redis keys before we got here. This happens when all
+          // runs complete fast enough that cleanup() deletes the enqueuedItemsKey
+          // before we read it — typically when the last item executes in the
+          // milliseconds between the loop ending and getBatchEnqueuedCount() being called.
+          // Check both sealed (sealed by this endpoint on a concurrent request) and
+          // COMPLETED (sealed by the BatchQueue completion path before we got here).
+          const currentBatch = await this._prisma.batchTaskRun.findFirst({
             where: { id: batchId },
             select: { sealed: true, status: true },
           });

-          if (currentBatch?.sealed) {
+          if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
             logger.info("Batch already sealed before count check (fast completion)", {
               batchId: batchFriendlyId,
               itemsAccepted,
@@ -279,8 +282,18 @@ export class StreamBatchItemsService extends WithRunEngine {

         // Check if we won the race to seal the batch
         if (sealResult.count === 0) {
-          // Another request sealed the batch first - re-query to check current state
-          const currentBatch = await this._prisma.batchTaskRun.findUnique({
+          // The conditional update failed because the batch was no longer in
+          // PENDING status. Re-query to determine which path got there first:
+          // - A concurrent streaming request already sealed and moved it to
+          //   PROCESSING.
+          // - The BatchQueue completion path finished all runs and set it to
+          //   COMPLETED (without setting sealed=true — that's this endpoint's
+          //   job). This window exists between completionCallback (which calls
+          //   tryCompleteBatch) and cleanup() in BatchQueue — see
+          //   batch-queue/index.ts.
+          // Either way the goal — a durable batch that the SDK stops retrying —
+          // has been achieved, so we return sealed: true.
+          const currentBatch = await this._prisma.batchTaskRun.findFirst({
             where: { id: batchId },
             select: {
               id: true,
@@ -290,13 +303,17 @@ export class StreamBatchItemsService extends WithRunEngine {
             },
           });

-          if (currentBatch?.sealed && currentBatch.status === "PROCESSING") {
-            // The batch was sealed by another request - this is fine, the goal was achieved
-            logger.info("Batch already sealed by concurrent request", {
+          if (
+            (currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
+            currentBatch?.status === "COMPLETED"
+          ) {
+            logger.info("Batch already sealed/completed by concurrent path", {
               batchId: batchFriendlyId,
               itemsAccepted,
               itemsDeduplicated,
               envId: environment.id,
+              batchStatus: currentBatch.status,
+              batchSealed: currentBatch.sealed,
             });

             span.setAttribute("itemsAccepted", itemsAccepted);
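
The decision made after a failed conditional seal can be read as a small pure function. The following is a sketch under assumptions, not code from the service: `resolveSealOutcome`, `BatchSnapshot`, and `SealOutcome` are hypothetical names, `updateCount` stands for `sealResult.count`, and `current` stands for the row returned by the follow-up `findFirst`.

```typescript
// Hypothetical helper that mirrors the branch logic in the diff above.
type SnapshotStatus = "PENDING" | "PROCESSING" | "COMPLETED";

interface BatchSnapshot {
  sealed: boolean;
  status: SnapshotStatus;
}

type SealOutcome = "sealed-by-us" | "already-done" | "unexpected";

function resolveSealOutcome(
  updateCount: number,
  current: BatchSnapshot | null
): SealOutcome {
  // The conditional update matched a PENDING row, so this request sealed it.
  if (updateCount > 0) {
    return "sealed-by-us";
  }

  // The row vanished or could not be read: nothing safe to conclude.
  if (current === null) {
    return "unexpected";
  }

  // A concurrent streaming request sealed the batch and moved it on.
  const sealedByPeer = current.sealed && current.status === "PROCESSING";
  // The completion path finished every run; sealed may still be false.
  const completedByQueue = current.status === "COMPLETED";

  return sealedByPeer || completedByQueue ? "already-done" : "unexpected";
}
```

Under this reading, both `"sealed-by-us"` and `"already-done"` would map to a `sealed: true` response, and only `"unexpected"` would surface as an error.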

apps/webapp/test/engine/streamBatchItems.test.ts

Lines changed: 124 additions & 0 deletions
@@ -384,6 +384,130 @@ describe("StreamBatchItemsService", () => {
     }
   );

+  containerTest(
+    "should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      // Create a batch in PENDING state
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 2,
+        status: "PENDING",
+        sealed: false,
+      });
+
+      // Initialize the batch in Redis
+      await engine.initializeBatch({
+        batchId: batch.id,
+        friendlyId: batch.friendlyId,
+        environmentId: authenticatedEnvironment.id,
+        environmentType: authenticatedEnvironment.type,
+        organizationId: authenticatedEnvironment.organizationId,
+        projectId: authenticatedEnvironment.projectId,
+        runCount: 2,
+        processingConcurrency: 10,
+      });
+
+      // Enqueue items - the enqueued count check passes but the seal updateMany
+      // will race with tryCompleteBatch moving status to COMPLETED.
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item1" }),
+        payloadType: "application/json",
+      });
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item2" }),
+        payloadType: "application/json",
+      });
+
+      // Simulate the race where BatchQueue's completionCallback runs
+      // tryCompleteBatch between getEnqueuedCount and the seal updateMany.
+      // tryCompleteBatch sets status=COMPLETED but NOT sealed=true.
+      const racingPrisma = {
+        ...prisma,
+        batchTaskRun: {
+          ...prisma.batchTaskRun,
+          findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun),
+          updateMany: async () => {
+            await prisma.batchTaskRun.update({
+              where: { id: batch.id },
+              data: {
+                status: "COMPLETED",
+              },
+            });
+            // The conditional updateMany(where: status="PENDING") would now fail
+            return { count: 0 };
+          },
+          findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun),
+        },
+      } as unknown as PrismaClient;
+
+      const service = new StreamBatchItemsService({
+        prisma: racingPrisma,
+        engine,
+      });
+
+      const result = await service.call(
+        authenticatedEnvironment,
+        batch.friendlyId,
+        itemsToAsyncIterable([]),
+        {
+          maxItemBytes: 1024 * 1024,
+        }
+      );
+
+      // The endpoint should accept the COMPLETED state as a success case so the
+      // SDK does not retry a batch whose child runs have already finished.
+      expect(result.sealed).toBe(true);
+      expect(result.id).toBe(batch.friendlyId);
+
+      const updatedBatch = await prisma.batchTaskRun.findUnique({
+        where: { id: batch.id },
+      });
+
+      expect(updatedBatch?.status).toBe("COMPLETED");
+      // sealed stays false because the BatchQueue completion path does not set
+      // it - that's fine, the batch is terminal.
+      expect(updatedBatch?.sealed).toBe(false);
+
+      await engine.quit();
+    }
+  );
+
   containerTest(
     "should throw error when race condition leaves batch in unexpected state",
     async ({ prisma, redisOptions }) => {
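
The test above forces the race by replacing `updateMany` with a stub that performs the competing write and then returns `{ count: 0 }` directly. One possible variation, sketched here against a hypothetical in-memory repository rather than Prisma, is to run the competing write and then delegate to the real conditional update so that the zero count comes from the `where` clause itself. `Repo`, `createInMemoryRepo`, `withRaceBeforeUpdate`, and `demoRace` are assumptions made for illustration and do not exist in the codebase.

```typescript
// Hypothetical repository shape, loosely modelled on the two calls the
// service makes; it is not the Prisma client API.
interface Row {
  status: string;
  sealed: boolean;
}

interface Repo {
  updateMany(args: { where: { status: string }; data: Partial<Row> }): Promise<{ count: number }>;
  findFirst(): Promise<Row | null>;
}

// Single-row in-memory store with a conditional update.
function createInMemoryRepo(initial: Row): Repo {
  const row: Row = { ...initial };
  return {
    async updateMany({ where, data }) {
      // Only update when the row still has the expected status.
      if (row.status !== where.status) {
        return { count: 0 };
      }
      Object.assign(row, data);
      return { count: 1 };
    },
    async findFirst() {
      return { ...row };
    },
  };
}

// Wraps a repo so that a competing write lands just before every
// updateMany, then lets the real conditional update decide the count.
function withRaceBeforeUpdate(repo: Repo, competingWrite: () => Promise<void>): Repo {
  return {
    ...repo,
    async updateMany(args) {
      await competingWrite();
      return repo.updateMany(args);
    },
  };
}

async function demoRace(): Promise<{ count: number; row: Row | null }> {
  const base = createInMemoryRepo({ status: "PENDING", sealed: false });
  const racing = withRaceBeforeUpdate(base, async () => {
    // Models the completion path: COMPLETED without sealed=true.
    await base.updateMany({ where: { status: "PENDING" }, data: { status: "COMPLETED" } });
  });

  // Models the seal attempt, which now finds no PENDING row.
  const { count } = await racing.updateMany({
    where: { status: "PENDING" },
    data: { sealed: true, status: "PROCESSING" },
  });
  return { count, row: await racing.findFirst() };
}
```

The trade-off is that delegation exercises the real condition at the cost of a slightly more involved wrapper; the stub used in the commit's test is simpler and makes the forced outcome explicit.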
