Make worker handlers idempotent under lease reclaim #37
Open
galuis116 wants to merge 1 commit into
Guard per-result state transitions in extraction_results so a reclaim during the Anthropic Batch result stream cannot rewrite an already-completed row, double-count cost, or erase a successful first attempt. Run-level aggregates (success_count / fail_count / cost_actual_usd) are now derived from a single SQL query over extraction_results instead of in-memory accumulators that reset every attempt. Also: extendLease every 10 results in the batch poller; route writeExtractionJson through atomicWriteJson so a crash mid-write no longer truncates the canonical file; synthesise orphan custom_id entries as failed rows so success + fail == request_count always balances; guard the parse-run failed UPDATE in worker/handlers/reducto.ts with the same WHERE-status clause. Migration 0003 adds extraction_results.cost_usd and a (run_id, status) composite index for the aggregate.
The Wave 6 durable job queue (PR #5) re-claims any job whose lease_expires_at <= now, but the Anthropic-batch and Reducto-poll handlers were written as if they execute exactly once. A reclaim during the batch result-stream (long batches routinely outlive the 10-minute default lease) re-runs the entire streaming loop and rewrites already-completed rows, double-counts cost, and silently drops results whose custom_id has no extraction_results row.
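To make the failure window concrete, here is a minimal sketch of the reclaim rule described above. Only `lease_expires_at`, the 10-minute default, and the name `extendLease` come from this PR; the `Job` shape, the function signatures, and the timings are hypothetical and are not the queue's actual implementation.

```typescript
// Hypothetical job shape: only lease_expires_at is taken from the PR text.
interface Job {
  id: string;
  lease_expires_at: number; // epoch milliseconds
  attempts: number;
}

const LEASE_MS = 10 * 60 * 1000; // the 10-minute default lease

// A job may be (re)claimed once its lease has expired: lease_expires_at <= now.
function isReclaimable(job: Job, now: number): boolean {
  return job.lease_expires_at <= now;
}

// Claiming starts a new attempt with a fresh lease.
function claim(job: Job, now: number): Job {
  return { ...job, lease_expires_at: now + LEASE_MS, attempts: job.attempts + 1 };
}

// Extending pushes the expiry forward without starting a new attempt.
// (Assumed signature; the real extendLease may differ.)
function extendLease(job: Job, now: number): Job {
  return { ...job, lease_expires_at: now + LEASE_MS };
}

const t0 = 0;
const first = claim({ id: "job-1", lease_expires_at: t0, attempts: 0 }, t0);

// Eleven minutes into a long result stream with no extension,
// another worker is allowed to reclaim the job and replay the handler.
const elevenMinutes = 11 * 60 * 1000;
const reclaimableWithoutExtend = isReclaimable(first, t0 + elevenMinutes);

// With one extension at minute five, the same moment is still inside the lease.
const extended = extendLease(first, t0 + 5 * 60 * 1000);
const reclaimableWithExtend = isReclaimable(extended, t0 + elevenMinutes);

console.log({ reclaimableWithoutExtend, reclaimableWithExtend });
```

Extending the lease narrows the window but cannot close it (a worker can still stall or crash), which is why the handlers also need to tolerate a full replay.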
This change closes the gap:
New idempotent transition helpers in lib/pipeline/runs.ts: markResultRunningIdempotent / markResultCompletedIdempotent / markResultFailedIdempotent. Every UPDATE is guarded by a status IN (…) WHERE clause so a row already in a terminal state is never rewritten by a later attempt.
Run-level aggregates are derived from a single SQL query over extraction_results (finalizeRunFromResults / aggregateRunTotalsFromResults). The in-memory successCount/failCount/totalCostUsd accumulators are gone: they would reset on every reclaim and overwrite the cumulative truth.
worker/handlers/anthropic-batch.ts: per-result idempotency skip, extendLease every 10 results, orphan-custom_id rows synthesised as failed so success + fail == request_count always balances.
worker/handlers/reducto.ts: failed-state UPDATEs guarded so a transient second-attempt error cannot erase a first-attempt completed row.
lib/pipeline/extract.ts: writeExtractionJson routes through atomicWriteJson (tmp + rename), matching the spot-fix path (lib/pipeline/spotfix.ts) and every other canonical-file writer in the repo. A crash mid-write no longer truncates the prior good extraction.json.
Migration 0003 adds extraction_results.cost_usd (the per-result cost field the SQL aggregate reads) and a (run_id, status) composite index.
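The guarded-transition and derived-aggregate ideas above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the code in lib/pipeline/runs.ts: the array stands in for extraction_results, `guardedUpdate`, `markCompleted`, `markFailed`, and `aggregateTotals` are hypothetical names, and the set of non-terminal statuses (`pending`, `running`) is assumed because the PR text elides the actual `status IN (…)` list.

```typescript
// Illustrative in-memory model; the real helpers issue SQL against extraction_results.
type Status = "pending" | "running" | "completed" | "failed";

interface ResultRow {
  custom_id: string;
  status: Status;
  cost_usd: number;
}

// Stand-in for the extraction_results rows of one run.
const table: ResultRow[] = [
  { custom_id: "a", status: "pending", cost_usd: 0 },
  { custom_id: "b", status: "pending", cost_usd: 0 },
];

// Assumed non-terminal statuses; the PR does not spell out the real list.
const NON_TERMINAL: Status[] = ["pending", "running"];

// Mirrors `UPDATE ... WHERE custom_id = ? AND status IN (...)`.
// Returns the number of rows changed, like a driver's affected-row count.
function guardedUpdate(customId: string, allowedFrom: Status[], next: Status, costUsd: number): number {
  let changed = 0;
  for (const row of table) {
    if (row.custom_id === customId && allowedFrom.indexOf(row.status) !== -1) {
      row.status = next;
      row.cost_usd = costUsd;
      changed++;
    }
  }
  return changed;
}

function markCompleted(id: string, costUsd: number): number {
  return guardedUpdate(id, NON_TERMINAL, "completed", costUsd);
}

function markFailed(id: string): number {
  return guardedUpdate(id, NON_TERMINAL, "failed", 0);
}

// Stand-in for the single aggregate query: totals are always read from the rows,
// never from counters held in the worker's memory.
function aggregateTotals(): { success: number; fail: number; cost: number } {
  let success = 0;
  let fail = 0;
  let cost = 0;
  for (const row of table) {
    if (row.status === "completed") {
      success++;
      cost += row.cost_usd;
    } else if (row.status === "failed") {
      fail++;
    }
  }
  return { success, fail, cost };
}

// Attempt 1: "a" completes, then the lease is lost before "b" is processed.
markCompleted("a", 0.25);

// Attempt 2 (after reclaim) replays the whole stream.
const replayFailedA = markFailed("a"); // transient error on replay: no row matches the guard
const replayCompletedA = markCompleted("a", 0.25); // duplicate result: no row matches the guard
markCompleted("b", 0.5);

const totals = aggregateTotals();
console.log(replayFailedA, replayCompletedA, totals);
```

Because the totals are recomputed from the rows, the replay neither double-counts the cost of "a" nor loses it, and a late failure cannot downgrade a row that already completed.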
Verified: tsc --noEmit clean; npm run migrate applies 0003; migrate:status reports no drift; npm run test passes all 35 existing vitest cases; npm run build succeeds.
Closes #36