feat: implement race condition handling for task dequeue and status updates #237

Merged
ronenkapelian merged 14 commits into master from fix/dequeue/bug
Mar 24, 2026

Conversation

@ronenkapelian

Collaborator
Question Answer
Bug fix
New feature
Breaking change
Deprecations
Documentation
Tests added
Chore

Related issues:

Further information:
Enhance the system to handle race conditions during task dequeue and status updates. Introduce appropriate error handling and response codes to manage conflicts when multiple workers attempt to modify the same task or stage simultaneously. This includes adding timeouts for transactions and updating the API documentation to reflect these changes.

@ronenkapelian ronenkapelian self-assigned this Feb 19, 2026
@github-actions

github-actions Bot commented Feb 19, 2026

Coverage Report

| Status | Category | Percentage | Covered / Total |
| --- | --- | --- | --- |
| 🔵 | Lines | 100% (🎯 80%) | 765 / 765 |
| 🔵 | Statements | 100% (🎯 80%) | 782 / 782 |
| 🔵 | Functions | 100% (🎯 80%) | 111 / 111 |
| 🔵 | Branches | 100% (🎯 80%) | 217 / 217 |

File Coverage

| File | Stmts | Branches | Functions | Lines | Uncovered Lines |
| --- | --- | --- | --- | --- | --- |
| **Changed Files** | | | | | |
| src/api/v1/tasks/controller.ts | 100% | 100% | 100% | 100% | |
| src/stages/models/manager.ts | 100% | 100% | 100% | 100% | |
| src/tasks/DAL/taskRepository.ts | 100% | 100% | 100% | 100% | |
| src/tasks/models/helper.ts | 100% | 100% | 100% | 100% | |
| src/tasks/models/manager.ts | 100% | 100% | 100% | 100% | |

Generated in workflow #723 for commit 80455ca by the Vitest Coverage Report Action

Comment thread src/tasks/DAL/taskRepository.ts Outdated
Comment on lines +46 to +51
// Note: $queryRaw returns raw database values, not Prisma-mapped values
// We need to re-fetch the task using Prisma to get properly mapped enum values
const rawTask = tasks[0]!;
const task = await tx.task.findUnique({
where: { id: rawTask.id },
});

Contributor

It's bad practice to query again, since it increases the load on the database. Prisma recommends using TypedSQL in their docs:
https://www.prisma.io/docs/orm/prisma-client/using-raw-sql/typedsql

Collaborator Author

solved

Comment thread src/jobs/models/manager.ts Outdated
async (newTx) => {
await this.executeUpdateStatus(jobId, status, newTx);
},
{ timeout: TX_TIMEOUT_MS }

Contributor

Change to a global timeout for transactions (pretty sure it's a thing).

Collaborator Author

Changed for most of them. It does not work in some places: I checked, and there is a transaction where the timeout must be specified explicitly.

Contributor

Is it a bug? I don't see why a pg option won't work.

Comment thread src/stages/models/manager.ts Outdated
// This prevents errors during race conditions where multiple workers
// try to set the same status (e.g., multiple tasks setting stage to IN_PROGRESS)
/* v8 ignore next 4 -- @preserve */
if (stage.status === status) {

Contributor

I know it already exists, but maybe a name like newStatus/wantedStatus would be better?

Collaborator Author

done

Comment thread src/tasks/DAL/taskRepository.ts Outdated
Comment on lines +17 to +19
* Uses SELECT FOR UPDATE SKIP LOCKED for pessimistic locking:
* - FOR UPDATE: Locks the row so other transactions wait
* - SKIP LOCKED: Skip rows that are already locked (instead of waiting)

Contributor

This goes into too much detail.

Collaborator Author

done
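
For readers unfamiliar with the locking clause discussed in this thread, here is a minimal in-memory sketch of what `SELECT ... FOR UPDATE SKIP LOCKED` does during a dequeue. It is a simulation, not the repository's query: `TaskRow`, `lockedBy`, `selectForUpdateSkipLocked`, and `commitDequeue` are hypothetical names, and the real locking happens inside PostgreSQL.

```typescript
type RowStatus = 'PENDING' | 'IN_PROGRESS';

interface TaskRow {
  id: string;
  status: RowStatus;
  // Id of the transaction holding the row lock, or null when unlocked (simulation only).
  lockedBy: string | null;
}

// Simulates `SELECT ... WHERE status = 'PENDING' LIMIT 1 FOR UPDATE SKIP LOCKED`:
// rows locked by another transaction are skipped instead of waited on.
function selectForUpdateSkipLocked(rows: TaskRow[], txId: string): TaskRow | null {
  for (const row of rows) {
    if (row.status === 'PENDING' && row.lockedBy === null) {
      row.lockedBy = txId; // FOR UPDATE: take the row lock
      return row;
    }
  }
  return null; // every pending row is locked or taken, so this worker gets nothing
}

// Simulates the end of the dequeue transaction: update the row, then release the lock.
function commitDequeue(row: TaskRow): void {
  row.status = 'IN_PROGRESS';
  row.lockedBy = null;
}
```

With two pending rows, three concurrent transactions would receive the first row, the second row, and `null` respectively, so no two workers ever hold the same task and none of them blocks.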

Comment thread src/tasks/models/manager.ts Outdated
Comment on lines +512 to +537
* Always includes the current status to implement optimistic locking.
* This ensures updates only succeed if the task is still in the expected state.
*
* **Why this is necessary:**
* In high-concurrency scenarios with multiple workers, race conditions can occur:
*
* Scenario 1: Concurrent dequeue operations
* - Worker A and B both read Task1 as PENDING
* - Worker A updates: WHERE id=X AND status=PENDING → IN_PROGRESS (succeeds)
* - Worker B updates: WHERE id=X AND status=PENDING → IN_PROGRESS (fails - optimistic lock)
*
* Scenario 2: Dequeue during update
* - Task1 is PENDING
* - Worker A calls updateStatus(Task1, COMPLETED) - reads task as PENDING
* - Worker B calls dequeue() - reads Task1 as PENDING
* - Worker B commits: WHERE id=X AND status=PENDING → IN_PROGRESS (succeeds)
* - Worker A commits: WHERE id=X AND status=PENDING → COMPLETED (fails - status is now IN_PROGRESS)
*
* Scenario 3: Double completion
* - Task1 is IN_PROGRESS
* - Worker A and B both try to update to COMPLETED
* - Worker A updates: WHERE id=X AND status=IN_PROGRESS → COMPLETED (succeeds)
* - Worker B updates: WHERE id=X AND status=IN_PROGRESS → COMPLETED (fails - status is now COMPLETED)
*
* Without status check, these scenarios would succeed silently, causing data inconsistency.
* With status check (optimistic locking), the second update fails with TASK_STATUS_UPDATE_FAILED.

Contributor

Too much detail here as well.

Collaborator Author

done
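
The optimistic-locking scenarios in the doc comment above can be sketched with an in-memory store. This is an illustration under stated assumptions, not the manager's code: `InMemoryTaskStore`, `updateStatusIfMatches`, and `updateStatus` are hypothetical names, and only the `TASK_STATUS_UPDATE_FAILED` code and the three statuses come from the quoted comment.

```typescript
type TaskStatus = 'PENDING' | 'IN_PROGRESS' | 'COMPLETED';

interface Task {
  id: string;
  status: TaskStatus;
}

// Hypothetical in-memory stand-in for the tasks table.
class InMemoryTaskStore {
  private readonly tasks = new Map<string, Task>();

  public insert(task: Task): void {
    this.tasks.set(task.id, { ...task });
  }

  public getStatus(id: string): TaskStatus | undefined {
    return this.tasks.get(id)?.status;
  }

  // Mirrors `UPDATE ... WHERE id = X AND status = expected` and returns the affected row count.
  public updateStatusIfMatches(id: string, expected: TaskStatus, next: TaskStatus): number {
    const task = this.tasks.get(id);
    if (task === undefined || task.status !== expected) {
      return 0; // someone else changed the row first
    }
    task.status = next;
    return 1;
  }
}

// Throws an error carrying TASK_STATUS_UPDATE_FAILED when the expected status no longer matches.
function updateStatus(store: InMemoryTaskStore, id: string, expected: TaskStatus, next: TaskStatus): void {
  const affected = store.updateStatusIfMatches(id, expected, next);
  if (affected === 0) {
    const error = new Error(`Task ${id} is no longer ${expected}`) as Error & { code: string };
    error.code = 'TASK_STATUS_UPDATE_FAILED';
    throw error;
  }
}
```

In Scenario 1, both workers read the task as `PENDING`; the first `updateStatus(store, id, 'PENDING', 'IN_PROGRESS')` succeeds, and the second throws because the stored status is already `IN_PROGRESS`.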

Comment thread openapi3.yaml Outdated
Comment thread openapi3.yaml Outdated
Comment thread src/tasks/models/helper.ts Outdated
...raw,
stageId: raw.stage_id, // Handle camelCase conversion
status: raw.status.toUpperCase() as TaskPrismaObject['status'],
data: (raw.data ?? {}) as Record<string, unknown>,

Contributor

Isn't data non-null? So we always get a value.

Collaborator Author

changed.
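
A minimal sketch of the kind of raw-row mapping discussed in this thread, assuming `data` is non-null as the review suggests. The `RawTaskRow` and `MappedTask` shapes, the `mapRawTask` name, and the three-value status list are assumptions for illustration; the real `TaskPrismaObject` type likely has more fields and statuses.

```typescript
type TaskStatus = 'PENDING' | 'IN_PROGRESS' | 'COMPLETED';

// Assumed shape of a row returned by a raw SQL query (snake_case columns).
interface RawTaskRow {
  id: string;
  stage_id: string;
  status: string;
  data: Record<string, unknown>; // non-null column, so no fallback is needed
}

// Assumed shape of the mapped object (camelCase, upper-case enum values).
interface MappedTask {
  id: string;
  stageId: string;
  status: TaskStatus;
  data: Record<string, unknown>;
}

const TASK_STATUSES: readonly string[] = ['PENDING', 'IN_PROGRESS', 'COMPLETED'];

function isTaskStatus(value: string): value is TaskStatus {
  return TASK_STATUSES.indexOf(value) !== -1;
}

// Converts a raw row to the mapped shape without issuing a second query.
function mapRawTask(raw: RawTaskRow): MappedTask {
  const status = raw.status.toUpperCase();
  if (!isTaskStatus(status)) {
    // Validate instead of casting blindly, so an unexpected value fails loudly.
    throw new Error(`Unexpected task status: ${raw.status}`);
  }
  return { id: raw.id, stageId: raw.stage_id, status, data: raw.data };
}
```

Validating the status with a type guard is a design choice of this sketch; the excerpt under review uses a type assertion instead.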

@ronenkapelian ronenkapelian merged commit e05f283 into master Mar 24, 2026
10 of 11 checks passed
@ronenkapelian ronenkapelian deleted the fix/dequeue/bug branch March 24, 2026 14:13
2 participants