Skip to content

Make worker handlers idempotent under lease reclaim#37

Open
galuis116 wants to merge 1 commit into
aglover1221:mainfrom
galuis116:fix/idempotent-worker-handlers
Open

Make worker handlers idempotent under lease reclaim#37
galuis116 wants to merge 1 commit into
aglover1221:mainfrom
galuis116:fix/idempotent-worker-handlers

Conversation

@galuis116
Copy link
Copy Markdown

The Wave 6 durable job queue (PR #5) re-claims any job whose lease_expires_at <= now, but the Anthropic-batch and Reducto-poll handlers were written as if they execute exactly once. A reclaim during the batch result-stream (long batches routinely outlive the 10-minute default lease) re-runs the entire streaming loop and rewrites already-completed rows, double-counts cost, and silently drops results whose custom_id has no extraction_results row.

This change closes the gap:

  • New idempotent transition helpers in lib/pipeline/runs.ts — markResultRunningIdempotent / markResultCompletedIdempotent / markResultFailedIdempotent. Every UPDATE is guarded by a status IN (…) WHERE clause so a row already in a terminal state is never rewritten by a later attempt.

  • Run-level aggregates are derived from a single SQL query over extraction_results (finalizeRunFromResults / aggregateRunTotalsFromResults). In-memory successCount/failCount/totalCostUsd accumulators are gone — they would reset on every reclaim and overwrite the cumulative truth.

  • worker/handlers/anthropic-batch.ts: per-result idempotency skip, extendLease every 10 results, orphan-custom_id rows synthesised as failed so success + fail == request_count always balances.

  • worker/handlers/reducto.ts: failed-state UPDATEs guarded so a transient second-attempt error cannot erase a first-attempt completed row.

  • lib/pipeline/extract.ts: writeExtractionJson routes through atomicWriteJson (tmp + rename), matching the spot-fix path (lib/pipeline/spotfix.ts) and every other canonical-file writer in the repo. Crash-mid-write no longer truncates the prior good extraction.json.

  • Migration 0003 adds extraction_results.cost_usd (the per-result cost field the SQL aggregate reads) and an (run_id, status) composite index.

Verified: tsc --noEmit clean; npm run migrate applies 0003; migrate:status reports no drift; npm run test passes all 35 existing vitest cases; npm run build succeeds.

Closes #36

Guard per-result state transitions in extraction_results so a reclaim
during the Anthropic Batch result stream cannot rewrite an already-
completed row, double-count cost, or erase a successful first attempt.
Run-level aggregates (success_count / fail_count / cost_actual_usd)
are now derived from a single SQL query over extraction_results
instead of in-memory accumulators that reset every attempt.

Also: extendLease every 10 results in the batch poller; route
writeExtractionJson through atomicWriteJson so a crash mid-write no
longer truncates the canonical file; synthesise orphan custom_id
entries as failed rows so success + fail == request_count always
balances; guard the parse-run failed UPDATE in worker/handlers/reducto.ts
with the same WHERE-status clause.

Migration 0003 adds extraction_results.cost_usd and an (run_id, status)
composite index for the aggregate.
@galuis116 galuis116 force-pushed the fix/idempotent-worker-handlers branch from 8862d3e to fd7271a Compare May 26, 2026 20:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[bug] Worker handlers are not idempotent under lease reclaim — duplicate writes, lost cost accounting, and inflated/erased counters on retry

1 participant