feat: implement race condition handling for task dequeue and status updates #237
// Note: $queryRaw returns raw database values, not Prisma-mapped values
// We need to re-fetch the task using Prisma to get properly mapped enum values
const rawTask = tasks[0]!;
const task = await tx.task.findUnique({
  where: { id: rawTask.id },
});
It's bad practice to query again, as it increases the load on the database. Prisma recommends using TypedSQL in their docs.
https://www.prisma.io/docs/orm/prisma-client/using-raw-sql/typedsql
async (newTx) => {
  await this.executeUpdateStatus(jobId, status, newTx);
},
{ timeout: TX_TIMEOUT_MS }
Change to a global timeout for transactions (pretty sure it's a thing).
Changed on most. It doesn't work in some places; I checked, and there is a transaction where the timeout must be specified explicitly.
Is it a bug? I don't see why a pg option won't work.
// This prevents errors during race conditions where multiple workers
// try to set the same status (e.g., multiple tasks setting stage to IN_PROGRESS)
/* v8 ignore next 4 -- @preserve */
if (stage.status === status) {
I know it already exists, but maybe a name like newStatus/wantedStatus would be better?
 * Uses SELECT FOR UPDATE SKIP LOCKED for pessimistic locking:
 * - FOR UPDATE: Locks the row so other transactions wait
 * - SKIP LOCKED: Skip rows that are already locked (instead of waiting)
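To illustrate the locking behaviour described in that comment, here is a minimal in-memory sketch. It is not the PR's implementation and does not touch a database: `InMemoryTaskTable`, `selectPendingForUpdateSkipLocked`, and `commit` are hypothetical names that only mimic what `SELECT ... FOR UPDATE SKIP LOCKED` does, namely that each caller claims the first pending row nobody else has locked, instead of waiting on a locked one.

```typescript
type RowStatus = 'PENDING' | 'IN_PROGRESS';

interface TaskRow {
  id: string;
  status: RowStatus;
}

// Hypothetical stand-in for a table with row-level locks.
class InMemoryTaskTable {
  private rows: TaskRow[];
  private lockedIds: string[] = [];

  constructor(rows: TaskRow[]) {
    this.rows = rows;
  }

  // Mimics: SELECT ... WHERE status = 'PENDING' LIMIT 1 FOR UPDATE SKIP LOCKED.
  // Locked rows are skipped rather than waited on.
  selectPendingForUpdateSkipLocked(): TaskRow | undefined {
    for (const row of this.rows) {
      const isLocked = this.lockedIds.indexOf(row.id) !== -1;
      if (row.status === 'PENDING' && !isLocked) {
        this.lockedIds.push(row.id); // FOR UPDATE: the caller now holds the row lock
        return row;
      }
    }
    return undefined; // every pending row is locked by someone else
  }

  // Mimics COMMIT: apply the new status and release the row lock.
  commit(id: string, status: RowStatus): void {
    for (const row of this.rows) {
      if (row.id === id) {
        row.status = status;
      }
    }
    this.lockedIds = this.lockedIds.filter((lockedId) => lockedId !== id);
  }
}

const table = new InMemoryTaskTable([
  { id: 'task1', status: 'PENDING' },
  { id: 'task2', status: 'PENDING' },
]);

// Three workers dequeue before any of them commits.
const claimedByA = table.selectPendingForUpdateSkipLocked();
const claimedByB = table.selectPendingForUpdateSkipLocked();
const claimedByC = table.selectPendingForUpdateSkipLocked();
```

In this sketch worker A claims `task1`, worker B skips it and claims `task2`, and worker C finds nothing to claim. No two workers end up holding the same task.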
 * Always includes the current status to implement optimistic locking.
 * This ensures updates only succeed if the task is still in the expected state.
 *
 * **Why this is necessary:**
 * In high-concurrency scenarios with multiple workers, race conditions can occur:
 *
 * Scenario 1: Concurrent dequeue operations
 * - Worker A and B both read Task1 as PENDING
 * - Worker A updates: WHERE id=X AND status=PENDING → IN_PROGRESS (succeeds)
 * - Worker B updates: WHERE id=X AND status=PENDING → IN_PROGRESS (fails - optimistic lock)
 *
 * Scenario 2: Dequeue during update
 * - Task1 is PENDING
 * - Worker A calls updateStatus(Task1, COMPLETED) - reads task as PENDING
 * - Worker B calls dequeue() - reads Task1 as PENDING
 * - Worker B commits: WHERE id=X AND status=PENDING → IN_PROGRESS (succeeds)
 * - Worker A commits: WHERE id=X AND status=PENDING → COMPLETED (fails - status is now IN_PROGRESS)
 *
 * Scenario 3: Double completion
 * - Task1 is IN_PROGRESS
 * - Worker A and B both try to update to COMPLETED
 * - Worker A updates: WHERE id=X AND status=IN_PROGRESS → COMPLETED (succeeds)
 * - Worker B updates: WHERE id=X AND status=IN_PROGRESS → COMPLETED (fails - status is now COMPLETED)
 *
 * Without status check, these scenarios would succeed silently, causing data inconsistency.
 * With status check (optimistic locking), the second update fails with TASK_STATUS_UPDATE_FAILED.
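Scenario 1 from the comment above can be reproduced with a small in-memory sketch of the conditional update. This is an illustration under assumptions, not the code in this PR: `statusById`, `updateStatusIfUnchanged`, and the error class are hypothetical, and the `if` check stands in for the `WHERE id=X AND status=<expected>` clause matching zero rows.

```typescript
type JobTaskStatus = 'PENDING' | 'IN_PROGRESS' | 'COMPLETED';

// Hypothetical error type; only the TASK_STATUS_UPDATE_FAILED code comes from the comment above.
class TaskStatusUpdateFailedError extends Error {
  code = 'TASK_STATUS_UPDATE_FAILED';
}

// Hypothetical stand-in for the task table: task id -> current status.
const statusById: Record<string, JobTaskStatus> = { task1: 'PENDING' };

// Mimics: UPDATE task SET status = next WHERE id = $id AND status = $expected.
// If the status changed since it was read, zero rows match and the update fails.
function updateStatusIfUnchanged(
  id: string,
  expected: JobTaskStatus,
  next: JobTaskStatus
): void {
  if (statusById[id] !== expected) {
    throw new TaskStatusUpdateFailedError(`task ${id} is no longer ${expected}`);
  }
  statusById[id] = next;
}

// Workers A and B both read task1 as PENDING, then try to move it to IN_PROGRESS.
const outcomes: string[] = [];
for (const worker of ['A', 'B']) {
  try {
    updateStatusIfUnchanged('task1', 'PENDING', 'IN_PROGRESS');
    outcomes.push(`${worker}: ok`);
  } catch (err) {
    outcomes.push(`${worker}: ${(err as { code?: string }).code}`);
  }
}
```

Worker A's update goes through; worker B's expected status no longer matches, so it gets the failure code instead of silently overwriting the row.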
Co-authored-by: Ofer <12687466+CptSchnitz@users.noreply.github.com>
…y for race condition handling
…tus method for clarity
...raw,
stageId: raw.stage_id, // Handle camelCase conversion
status: raw.status.toUpperCase() as TaskPrismaObject['status'],
data: (raw.data ?? {}) as Record<string, unknown>,
Isn't data non-nullable? Then we always get a value.
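For reference, the mapping under discussion could be sketched as a standalone function. The types here are assumptions made for illustration: `RawTaskRow`, `MappedTask`, `mapRawTask`, and the list of statuses are hypothetical, and `data` is typed as nullable only so the `??` fallback has something to do. If the column is in fact NOT NULL, the fallback is dead code and can be dropped.

```typescript
type MappedTaskStatus = 'PENDING' | 'IN_PROGRESS' | 'COMPLETED';

// Assumed set of statuses; the real enum may contain more values.
const KNOWN_STATUSES: MappedTaskStatus[] = ['PENDING', 'IN_PROGRESS', 'COMPLETED'];

// Hypothetical shape of a row returned by a raw query (snake_case, lowercase status).
interface RawTaskRow {
  id: string;
  stage_id: string;
  status: string;
  data: Record<string, unknown> | null;
}

// Hypothetical shape the rest of the code expects (camelCase, uppercase status).
interface MappedTask {
  id: string;
  stageId: string;
  status: MappedTaskStatus;
  data: Record<string, unknown>;
}

function mapRawTask(raw: RawTaskRow): MappedTask {
  const upper = raw.status.toUpperCase();
  // Validate instead of casting blindly, so an unexpected value fails loudly.
  if (KNOWN_STATUSES.indexOf(upper as MappedTaskStatus) === -1) {
    throw new Error(`Unexpected task status: ${raw.status}`);
  }
  return {
    id: raw.id,
    stageId: raw.stage_id,
    status: upper as MappedTaskStatus,
    data: raw.data ?? {}, // only needed if the column can be NULL
  };
}

const mapped = mapRawTask({ id: 't1', stage_id: 's1', status: 'pending', data: null });
```

Mapping the raw row this way avoids the second query raised earlier in the review, at the cost of keeping the mapper in sync with the schema by hand.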
…OperationStatus.PENDING
Related issues:
Further information:
Enhance the system to handle race conditions during task dequeue and status updates. Introduce appropriate error handling and response codes to manage conflicts when multiple workers attempt to modify the same task or stage simultaneously. This includes adding timeouts for transactions and updating the API documentation to reflect these changes.