Skip to content

Job Deduplication and Worker Heartbeats

Latest

Choose a tag to compare

@github-actions github-actions released this 28 Jun 18:52

What's Changed

New Features

  • Added job deduplication through the new .dedup() dispatcher method.

    • .dedup({ id }) skips duplicate jobs while an existing job with the same key is still present.
    • .dedup({ id, ttl }) skips duplicates within a time window.
    • .dedup({ id, ttl, extend: true }) refreshes the deduplication window when a duplicate is dispatched.
    • .dedup({ id, ttl, replace: true }) replaces the payload of an existing pending or delayed job.
    • .dedup({ id, ttl, extend: true, replace: true }) enables debounce-style dispatching.
    • Dispatch results now expose a deduped outcome: added, skipped, replaced, or extended.
  • Added worker heartbeats for long-running in-flight jobs.

    • Workers periodically renew the acquired timestamp of active jobs they own.
    • Healthy long-running jobs are no longer re-delivered just because their handler runs longer than stalledThreshold.
    • stalledThreshold now behaves as a crash-detection window again, instead of an implicit maximum job runtime.

Bug Fixes

  • Fixed Redis job payload preservation for empty arrays.

    • Redis lifecycle operations now keep stored job JSON opaque to Lua, preventing payload values like [] from being serialized back as {}.
    • This applies across acquisition, inspection, retry, stalled recovery, and dedup replacement flows.
  • Fixed worker heartbeat ownership checks.

    • A worker can only renew leases for jobs it still owns.
    • Late heartbeats from a slow or stale worker cannot extend jobs that were already recovered and re-acquired by another worker.

Internal Improvements

  • Moved Redis Lua scripts out of the Redis adapter into dedicated script modules.

Upgrade Notes

  • Knex users with an existing queue_jobs table must add the new deduplication columns before using .dedup():

    import { QueueSchemaService } from '@boringnode/queue'
    
    const schema = new QueueSchemaService(knex)
    await schema.addDedupColumns()

    If you use a custom jobs table name, pass it explicitly:

    await schema.addDedupColumns('my_queue_jobs')

    New tables created with createJobsTable() already include these columns.

  • Deduplication is supported by the Redis and Knex adapters. The Sync adapter still runs every dispatch inline and does not apply deduplication.

  • Deduplication only applies to single-job dispatch. Batch dispatch and scheduled jobs do not support deduplication.

  • The user-provided deduplication ID must be 400 characters or fewer, and the final <jobName>::<id> key must be 510 characters or fewer.

  • Custom adapter implementations must implement the new renewJobs(queue, jobIds) adapter method.

  • Existing Redis jobs remain compatible after upgrading. Jobs that do not yet have the new Redis metadata are read from their existing stored JSON.

  • Redis jobs whose payloads were already corrupted before upgrading cannot be automatically repaired, because an already-stored {} cannot be distinguished from an original [].

Commits Since v0.5.2

  • chore: update dependencies (a400cf5)
  • chore: update yarn (48787f7)
  • feat: renew in-flight job timestamps via a worker heartbeat (#17) (177ebd5)
  • refactor(redis): move lua scripts out of adapter (4a576a3)
  • docs: add changelog for job deduplication (500935a)
  • fix(redis): preserve opaque job payloads (b14e488)
  • chore: remove old changelog (cb40c90)
  • feat: add .dedup() method for job deduplication (#12) (52c83be)