Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions backend/src/__tests__/upload.worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ describe('upload worker — retryable errors escape the processor without a DB u
// it calls updateUploadStatus('failed') so the row no longer stays permanently
// stuck at 'in_progress'.

describe('upload worker — failed handler skips DB on intermediate BullMQ attempt', () => {
describe('upload worker — failed handler resets status to queued on intermediate BullMQ attempt', () => {
it.each([
['HashLockError', new HashLockError('lock already held')],
['SourceCdnError', new SourceCdnError('cdn network error (ECONNRESET)')],
])('%s: does not touch the DB when BullMQ will retry', async (_name, error) => {
])('%s: resets status to queued when BullMQ will retry', async (_name, error) => {
const failedHandler = capturedHandlers.get('failed')
expect(failedHandler).toBeDefined()

Expand All @@ -119,9 +119,20 @@ describe('upload worker — failed handler skips DB on intermediate BullMQ attem

await failedHandler!(job, error)

expect(mockUpdateStatus).not.toHaveBeenCalled()
expect(mockUpdateStatus).toHaveBeenCalledWith(1, 'queued')
expect(mockClearToken).not.toHaveBeenCalled()
})

it('swallows DB errors when resetting status to queued so the handler never rejects', async () => {
mockUpdateStatus.mockRejectedValueOnce(new Error('DB connection lost'))

const failedHandler = capturedHandlers.get('failed')
expect(failedHandler).toBeDefined()

const job = { ...makeJob(1), attemptsMade: 1, opts: { attempts: 3 } }

await expect(failedHandler!(job, new SourceCdnError('cdn error'))).resolves.toBeUndefined()
})
})

describe('upload worker — permanent BullMQ failure marks upload as failed in DB', () => {
Expand Down
10 changes: 9 additions & 1 deletion backend/src/workers/upload.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,20 @@ export function createUploadWorker(redis: Redis, deps?: WorkerDeps): Worker<Uplo
if (!job) return
// BullMQ fires 'failed' on every attempt, not just the final one.
// For errors that rely on BullMQ's built-in retry (not StorageError's custom requeue),
// skip DB updates on intermediate attempts — the access token must stay intact for retries.
// reset status to queued on intermediate attempts — the access token must stay intact for retries.
if (!(err instanceof StorageError) && job.attemptsMade < (job.opts.attempts ?? 1)) {
logger.warn(
{ jobId: job.id, err },
`[worker] [${uploadId}/${batchId}] job attempt failed, will retry`,
)
try {
await uploads.updateUploadStatus(job.data.uploadId, 'queued')
} catch (dbErr) {
logger.error(
{ jobId: job.id, err: dbErr },
`[worker] [${uploadId}/${batchId}] failed to update db status after job failure`,
)
}
return
}
logger.error({ jobId: job.id, err }, `[worker] [${uploadId}/${batchId}] job permanently failed`)
Expand Down