From 5d123540201f8ee83e5407144e556a7462d1c18a Mon Sep 17 00:00:00 2001
From: Zina Rasoamanana
Date: Fri, 6 Dec 2024 11:32:24 +0100
Subject: [PATCH 01/11] [IMP] queue_job: remove cron garbage collector and
automatically requeue jobs in timeout
[IMP] queue_job: increment 'retry' when re-queuing jobs that have been killed
---
queue_job/controllers/main.py | 2 +
queue_job/data/queue_data.xml | 9 --
queue_job/job.py | 56 +++++++
queue_job/jobrunner/runner.py | 141 +++++++++++++-----
.../migrations/17.0.1.2.0/pre-migration.py | 22 +++
queue_job/models/__init__.py | 1 +
queue_job/models/queue_job.py | 50 -------
queue_job/models/queue_job_locks.py | 24 +++
queue_job/readme/CONFIGURE.md | 3 +
queue_job/tests/__init__.py | 1 +
queue_job/tests/test_requeue_dead_job.py | 133 +++++++++++++++++
11 files changed, 342 insertions(+), 100 deletions(-)
create mode 100644 queue_job/migrations/17.0.1.2.0/pre-migration.py
create mode 100644 queue_job/models/queue_job_locks.py
create mode 100644 queue_job/tests/test_requeue_dead_job.py
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index f18401476f..ca3e02acaa 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -31,6 +31,8 @@ def _try_perform_job(self, env, job):
job.set_started()
job.store()
env.cr.commit()
+ job.lock()
+
_logger.debug("%s started", job)
job.perform()
diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml
index ca5a747746..a2680cc475 100644
--- a/queue_job/data/queue_data.xml
+++ b/queue_job/data/queue_data.xml
@@ -1,15 +1,6 @@
-
- Jobs Garbage Collector
- 5
- minutes
- -1
-
- code
- model.requeue_stuck_jobs()
-
Job failed
diff --git a/queue_job/job.py b/queue_job/job.py
index 9843c01f05..e76c28bfac 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -238,6 +238,61 @@ def load_many(cls, env, job_uuids):
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}
+ def add_lock_record(self):
+ """
+ Create row in db to be locked once the job is performed
+ """
+ self.env.cr.execute(
+ """
+ INSERT INTO
+ queue_job_locks (id)
+ SELECT
+ id
+ FROM
+ queue_job
+ WHERE
+ uuid = %s
+ ON CONFLICT(id)
+ DO NOTHING;
+ """,
+ [self.uuid],
+ )
+
+ def lock(self):
+ """
+ Lock row of job that is being performed
+
+ If a job cannot be locked,
+ it means that the job wasn't started,
+ a RetryableJobError is thrown.
+ """
+ self.env.cr.execute(
+ """
+ SELECT
+ *
+ FROM
+ queue_job_locks
+ WHERE
+ id in (
+ SELECT
+ id
+ FROM
+ queue_job
+ WHERE
+ uuid = %s
+ AND state='started'
+ )
+ FOR UPDATE;
+ """,
+ [self.uuid],
+ )
+
+ # 1 job should be locked
+ if 1 != len(self.env.cr.fetchall()):
+ raise RetryableJobError(
+ f"Trying to lock job that wasn't started, uuid: {self.uuid}"
+ )
+
@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
@@ -819,6 +874,7 @@ def set_started(self):
self.state = STARTED
self.date_started = datetime.now()
self.worker_pid = os.getpid()
+ self.add_lock_record()
def set_done(self, result=None):
self.state = DONE
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 47417caa4f..6ba2906c54 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -114,22 +114,6 @@
* After creating a new database or installing queue_job on an
existing database, Odoo must be restarted for the runner to detect it.
-* When Odoo shuts down normally, it waits for running jobs to finish.
- However, when the Odoo server crashes or is otherwise force-stopped,
- running jobs are interrupted while the runner has no chance to know
- they have been aborted. In such situations, jobs may remain in
- ``started`` or ``enqueued`` state after the Odoo server is halted.
- Since the runner has no way to know if they are actually running or
- not, and does not know for sure if it is safe to restart the jobs,
- it does not attempt to restart them automatically. Such stale jobs
- therefore fill the running queue and prevent other jobs to start.
- You must therefore requeue them manually, either from the Jobs view,
- or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
- update queue_job set state='pending' where state in ('started', 'enqueued')
-
.. rubric:: Footnotes
.. [1] From a security standpoint, it is safe to have an anonymous HTTP
@@ -155,7 +139,7 @@
from odoo.tools import config
from . import queue_job_config
-from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
+from .channels import ENQUEUED, NOT_DONE, ChannelManager
SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
@@ -207,28 +191,6 @@ def _connection_info_for(db_name):
def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
- # Method to set failed job (due to timeout, etc) as pending,
- # to avoid keeping it as enqueued.
- def set_job_pending():
- connection_info = _connection_info_for(db_name)
- conn = psycopg2.connect(**connection_info)
- conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- with closing(conn.cursor()) as cr:
- cr.execute(
- "UPDATE queue_job SET state=%s, "
- "date_enqueued=NULL, date_started=NULL "
- "WHERE uuid=%s and state=%s "
- "RETURNING uuid",
- (PENDING, job_uuid, ENQUEUED),
- )
- if cr.fetchone():
- _logger.warning(
- "state of job %s was reset from %s to %s",
- job_uuid,
- ENQUEUED,
- PENDING,
- )
-
# TODO: better way to HTTP GET asynchronously (grequest, ...)?
# if this was python3 I would be doing this with
# asyncio, aiohttp and aiopg
@@ -236,6 +198,7 @@ def urlopen():
url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format(
scheme, host, port, db_name, job_uuid
)
+ # pylint: disable=except-pass
try:
auth = None
if user:
@@ -249,10 +212,10 @@ def urlopen():
# for codes between 500 and 600
response.raise_for_status()
except requests.Timeout:
- set_job_pending()
+ # A timeout is a normal behaviour, it shouldn't be logged as an exception
+ pass
except Exception:
_logger.exception("exception in GET %s", url)
- set_job_pending()
thread = threading.Thread(target=urlopen)
thread.daemon = True
@@ -343,6 +306,96 @@ def set_job_enqueued(self, uuid):
(ENQUEUED, uuid),
)
+ def _query_requeue_dead_jobs(self):
+ return """
+ UPDATE
+ queue_job
+ SET
+ state=(
+ CASE
+ WHEN
+ max_retries IS NOT NULL AND
+ retry IS NOT NULL AND
+ retry>max_retries
+ THEN 'failed'
+ ELSE 'pending'
+ END),
+ retry=(
+ CASE
+ WHEN state='started'
+ THEN COALESCE(retry,0)+1 ELSE retry
+ END),
+ exc_name=(
+ CASE
+ WHEN
+ max_retries IS NOT NULL AND
+ retry IS NOT NULL AND
+ retry>max_retries
+ THEN 'JobFoundDead'
+ ELSE exc_name
+ END),
+ exc_info=(
+ CASE
+ WHEN
+ max_retries IS NOT NULL AND
+ retry IS NOT NULL AND
+ retry>max_retries
+ THEN 'Job found dead after too many retries'
+ ELSE exc_info
+ END)
+ WHERE
+ id in (
+ SELECT
+ id
+ FROM
+ queue_job_locks
+ WHERE
+ id in (
+ SELECT
+ id
+ FROM
+ queue_job
+ WHERE
+ state IN ('enqueued','started')
+ AND date_enqueued <
+ (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
+ )
+ FOR UPDATE SKIP LOCKED
+ )
+ RETURNING uuid
+ """
+
+ def requeue_dead_jobs(self):
+ """
+ Set started and enqueued jobs but not locked to pending
+
+ A job is locked when it's being executed
+ When a job is killed, it releases the lock
+
+ If the number of retries exceeds the number of max retries,
+ the job is set as 'failed' with the error 'JobFoundDead'.
+
+ Adding a buffer on 'date_enqueued' to check
+ that it has been enqueued for more than 10sec.
+ This prevents from requeuing jobs before they are actually started.
+
+ When Odoo shuts down normally, it waits for running jobs to finish.
+ However, when the Odoo server crashes or is otherwise force-stopped,
+ running jobs are interrupted while the runner has no chance to know
+ they have been aborted.
+ """
+
+ with closing(self.conn.cursor()) as cr:
+ query = self._query_requeue_dead_jobs()
+
+ cr.execute(query)
+
+ for (uuid,) in cr.fetchall():
+ _logger.warning(
+ "Re-queued job with uuid: %s",
+ uuid,
+ )
+
class QueueJobRunner:
def __init__(
@@ -424,6 +477,11 @@ def initialize_databases(self):
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)
+ def requeue_dead_jobs(self):
+ for db in self.db_by_name.values():
+ if db.has_queue_job:
+ db.requeue_dead_jobs()
+
def run_jobs(self):
now = _odoo_now()
for job in self.channel_manager.get_jobs_to_run(now):
@@ -516,6 +574,7 @@ def run(self):
_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
+ self.requeue_dead_jobs()
self.process_notifications()
self.run_jobs()
self.wait_notification()
diff --git a/queue_job/migrations/17.0.1.2.0/pre-migration.py b/queue_job/migrations/17.0.1.2.0/pre-migration.py
new file mode 100644
index 0000000000..8dbb6ff7f1
--- /dev/null
+++ b/queue_job/migrations/17.0.1.2.0/pre-migration.py
@@ -0,0 +1,22 @@
+# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+
+
+def migrate(cr, version):
+ # Deactivate cron garbage collector
+ cr.execute(
+ """
+ UPDATE
+ ir_cron
+ SET
+ active=False
+ WHERE id IN (
+ SELECT res_id
+ FROM
+ ir_model_data
+ WHERE
+ module='queue_job'
+ AND model='ir.cron'
+ AND name='ir_cron_queue_job_garbage_collector'
+ );
+ """
+ )
diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py
index 4744e7ab46..9048fd3959 100644
--- a/queue_job/models/__init__.py
+++ b/queue_job/models/__init__.py
@@ -3,3 +3,4 @@
from . import queue_job
from . import queue_job_channel
from . import queue_job_function
+from . import queue_job_locks
diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index 7607a2701f..33dbf2346d 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -6,7 +6,6 @@
from datetime import datetime, timedelta
from odoo import _, api, exceptions, fields, models
-from odoo.osv import expression
from odoo.tools import config, html_escape
from odoo.addons.base_sparse_field.models.fields import Serialized
@@ -414,55 +413,6 @@ def autovacuum(self):
break
return True
- def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0):
- """Fix jobs that are in a bad states
-
- :param in_queue_delta: lookup time in minutes for jobs
- that are in enqueued state
-
- :param started_delta: lookup time in minutes for jobs
- that are in enqueued state,
- 0 means that it is not checked
- """
- self._get_stuck_jobs_to_requeue(
- enqueued_delta=enqueued_delta, started_delta=started_delta
- ).requeue()
- return True
-
- def _get_stuck_jobs_domain(self, queue_dl, started_dl):
- domain = []
- now = fields.datetime.now()
- if queue_dl:
- queue_dl = now - timedelta(minutes=queue_dl)
- domain.append(
- [
- "&",
- ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)),
- ("state", "=", "enqueued"),
- ]
- )
- if started_dl:
- started_dl = now - timedelta(minutes=started_dl)
- domain.append(
- [
- "&",
- ("date_started", "<=", fields.Datetime.to_string(started_dl)),
- ("state", "=", "started"),
- ]
- )
- if not domain:
- raise exceptions.ValidationError(
- _("If both parameters are 0, ALL jobs will be requeued!")
- )
- return expression.OR(domain)
-
- def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta):
- job_model = self.env["queue.job"]
- stuck_jobs = job_model.search(
- self._get_stuck_jobs_domain(enqueued_delta, started_delta)
- )
- return stuck_jobs
-
def related_action_open_record(self):
"""Open a form view with the record(s) of the job.
diff --git a/queue_job/models/queue_job_locks.py b/queue_job/models/queue_job_locks.py
new file mode 100644
index 0000000000..d2c3d73437
--- /dev/null
+++ b/queue_job/models/queue_job_locks.py
@@ -0,0 +1,24 @@
+# Copyright 2025 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+
+from odoo import models
+
+
+class QueueJobLocks(models.AbstractModel):
+
+ _name = "queue.job.locks"
+ _description = "Queue Job Locks"
+
+ def init(self):
+ # Create job lock table
+ self.env.cr.execute(
+ """
+ CREATE TABLE IF NOT EXISTS queue_job_locks (
+ id INT PRIMARY KEY,
+ CONSTRAINT
+ queue_job_locks_queue_job_id_fkey
+ FOREIGN KEY (id)
+ REFERENCES queue_job (id) ON DELETE CASCADE
+ );
+ """
+ )
diff --git a/queue_job/readme/CONFIGURE.md b/queue_job/readme/CONFIGURE.md
index 07b7b84126..216b5358af 100644
--- a/queue_job/readme/CONFIGURE.md
+++ b/queue_job/readme/CONFIGURE.md
@@ -35,3 +35,6 @@ channels = root:2
[^1]: It works with the threaded Odoo server too, although this way of
running Odoo is obviously not for production purposes.
+
+* Jobs that remain in `enqueued` or `started` state (because, for instance,
+ their worker has been killed) will be automatically re-queued.
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index e0ff9576a5..047942bde4 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -6,3 +6,4 @@
from . import test_model_job_function
from . import test_queue_job_protected_write
from . import test_wizards
+from . import test_requeue_dead_job
diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py
new file mode 100644
index 0000000000..3d63dd8780
--- /dev/null
+++ b/queue_job/tests/test_requeue_dead_job.py
@@ -0,0 +1,133 @@
+# Copyright 2025 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+from contextlib import closing
+from datetime import datetime, timedelta
+
+from odoo.tests.common import TransactionCase
+
+from odoo.addons.queue_job.job import Job
+from odoo.addons.queue_job.jobrunner.runner import Database
+
+
+class TestRequeueDeadJob(TransactionCase):
+ def create_dummy_job(self, uuid):
+ """
+ Create dummy job for tests
+ """
+ return (
+ self.env["queue.job"]
+ .with_context(
+ _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL,
+ )
+ .create(
+ {
+ "uuid": uuid,
+ "user_id": self.env.user.id,
+ "state": "pending",
+ "model_name": "queue.job",
+ "method_name": "write",
+ }
+ )
+ )
+
+ def get_locks(self, uuid, cr=None):
+ """
+ Retrieve lock rows
+ """
+ if cr is None:
+ cr = self.env.cr
+
+ cr.execute(
+ """
+ SELECT
+ id
+ FROM
+ queue_job_locks
+ WHERE
+ id IN (
+ SELECT
+ id
+ FROM
+ queue_job
+ WHERE
+ uuid = %s
+ )
+ FOR UPDATE SKIP LOCKED
+ """,
+ [uuid],
+ )
+
+ return cr.fetchall()
+
+ def test_add_lock_record(self):
+ queue_job = self.create_dummy_job("test_add_lock")
+ job_obj = Job.load(self.env, queue_job.uuid)
+
+ job_obj.set_started()
+ self.assertEqual(job_obj.state, "started")
+
+ locks = self.get_locks(job_obj.uuid)
+
+ self.assertEqual(1, len(locks))
+
+ def test_lock(self):
+ queue_job = self.create_dummy_job("test_lock")
+ job_obj = Job.load(self.env, queue_job.uuid)
+
+ job_obj.set_started()
+ job_obj.store()
+
+ locks = self.get_locks(job_obj.uuid)
+
+ self.assertEqual(1, len(locks))
+
+ # commit to update queue_job records in DB
+ self.env.cr.commit() # pylint: disable=E8102
+
+ job_obj.lock()
+
+ with closing(self.env.registry.cursor()) as new_cr:
+ locks = self.get_locks(job_obj.uuid, new_cr)
+
+ # Row should be locked
+ self.assertEqual(0, len(locks))
+
+ # clean up
+ queue_job.unlink()
+
+ self.env.cr.commit() # pylint: disable=E8102
+
+ # because we committed the cursor, the savepoint of the test method is
+ # gone, and this would break TransactionCase cleanups
+ self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
+
+ def test_requeue_dead_jobs(self):
+ uuid = "test_requeue_dead_jobs"
+
+ queue_job = self.create_dummy_job(uuid)
+ job_obj = Job.load(self.env, queue_job.uuid)
+
+ job_obj.set_enqueued()
+ # simulate enqueuing was in the past
+ job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
+ job_obj.set_started()
+
+ job_obj.store()
+ self.env.cr.commit() # pylint: disable=E8102
+
+ # requeue dead jobs using current cursor
+ query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
+ self.env.cr.execute(query)
+
+ uuids_requeued = self.env.cr.fetchall()
+
+ self.assertEqual(len(uuids_requeued), 1)
+ self.assertEqual(uuids_requeued[0][0], uuid)
+
+ # clean up
+ queue_job.unlink()
+ self.env.cr.commit() # pylint: disable=E8102
+
+ # because we committed the cursor, the savepoint of the test method is
+ # gone, and this would break TransactionCase cleanups
+ self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
From 274c7b30543976e8405677a4b08713c5682a29a9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Sat, 1 Feb 2025 11:08:18 +0100
Subject: [PATCH 02/11] [IMP] queue_job: use queue_job_lock model
A model is better than a manually managed table as it will
protect the table from deletion by database_cleanup.
---
queue_job/job.py | 8 ++++----
queue_job/jobrunner/runner.py | 6 +++---
queue_job/models/__init__.py | 2 +-
queue_job/models/queue_job_lock.py | 16 ++++++++++++++++
queue_job/models/queue_job_locks.py | 24 ------------------------
queue_job/security/ir.model.access.csv | 1 +
queue_job/tests/test_requeue_dead_job.py | 6 +++---
7 files changed, 28 insertions(+), 35 deletions(-)
create mode 100644 queue_job/models/queue_job_lock.py
delete mode 100644 queue_job/models/queue_job_locks.py
diff --git a/queue_job/job.py b/queue_job/job.py
index e76c28bfac..d58857322b 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -245,9 +245,9 @@ def add_lock_record(self):
self.env.cr.execute(
"""
INSERT INTO
- queue_job_locks (id)
+ queue_job_lock (id, queue_job_id)
SELECT
- id
+ id, id
FROM
queue_job
WHERE
@@ -271,9 +271,9 @@ def lock(self):
SELECT
*
FROM
- queue_job_locks
+ queue_job_lock
WHERE
- id in (
+ queue_job_id in (
SELECT
id
FROM
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 6ba2906c54..df7260e5d6 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -346,11 +346,11 @@ def _query_requeue_dead_jobs(self):
WHERE
id in (
SELECT
- id
+ queue_job_id
FROM
- queue_job_locks
+ queue_job_lock
WHERE
- id in (
+ queue_job_id in (
SELECT
id
FROM
diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py
index 9048fd3959..6265dfe9cb 100644
--- a/queue_job/models/__init__.py
+++ b/queue_job/models/__init__.py
@@ -3,4 +3,4 @@
from . import queue_job
from . import queue_job_channel
from . import queue_job_function
-from . import queue_job_locks
+from . import queue_job_lock
diff --git a/queue_job/models/queue_job_lock.py b/queue_job/models/queue_job_lock.py
new file mode 100644
index 0000000000..b01c7f3a91
--- /dev/null
+++ b/queue_job/models/queue_job_lock.py
@@ -0,0 +1,16 @@
+# Copyright 2025 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+
+from odoo import fields, models
+
+
+class QueueJobLock(models.Model):
+ _name = "queue.job.lock"
+ _description = "Queue Job Lock"
+
+ queue_job_id = fields.Many2one(
+ comodel_name="queue.job",
+ required=True,
+ ondelete="cascade",
+ index=True,
+ )
diff --git a/queue_job/models/queue_job_locks.py b/queue_job/models/queue_job_locks.py
deleted file mode 100644
index d2c3d73437..0000000000
--- a/queue_job/models/queue_job_locks.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Copyright 2025 ACSONE SA/NV
-# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
-
-from odoo import models
-
-
-class QueueJobLocks(models.AbstractModel):
-
- _name = "queue.job.locks"
- _description = "Queue Job Locks"
-
- def init(self):
- # Create job lock table
- self.env.cr.execute(
- """
- CREATE TABLE IF NOT EXISTS queue_job_locks (
- id INT PRIMARY KEY,
- CONSTRAINT
- queue_job_locks_queue_job_id_fkey
- FOREIGN KEY (id)
- REFERENCES queue_job (id) ON DELETE CASCADE
- );
- """
- )
diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv
index 634daf8ede..4def7dc38a 100644
--- a/queue_job/security/ir.model.access.csv
+++ b/queue_job/security/ir.model.access.csv
@@ -1,5 +1,6 @@
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1
+access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0
access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1
access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1
access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1
diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py
index 3d63dd8780..c6c82a2f4d 100644
--- a/queue_job/tests/test_requeue_dead_job.py
+++ b/queue_job/tests/test_requeue_dead_job.py
@@ -40,11 +40,11 @@ def get_locks(self, uuid, cr=None):
cr.execute(
"""
SELECT
- id
+ queue_job_id
FROM
- queue_job_locks
+ queue_job_lock
WHERE
- id IN (
+ queue_job_id IN (
SELECT
id
FROM
From ca5fba5edcfd89a1313d631a6b17e43c65ea488c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Sat, 1 Feb 2025 11:30:18 +0100
Subject: [PATCH 03/11] [IMP] queue_job: tweak comment and warning message
---
queue_job/__manifest__.py | 2 +-
queue_job/job.py | 2 +-
queue_job/jobrunner/runner.py | 5 +----
3 files changed, 3 insertions(+), 6 deletions(-)
diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index 93ae82789c..ea5978dcf5 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -2,7 +2,7 @@
{
"name": "Job Queue",
- "version": "17.0.1.1.1",
+ "version": "17.0.1.2.0",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
diff --git a/queue_job/job.py b/queue_job/job.py
index d58857322b..e03dd2b517 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -240,7 +240,7 @@ def load_many(cls, env, job_uuids):
def add_lock_record(self):
"""
- Create row in db to be locked once the job is performed
+ Create row in db to be locked while the job is being performed.
"""
self.env.cr.execute(
"""
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index df7260e5d6..2414d1bf9b 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -391,10 +391,7 @@ def requeue_dead_jobs(self):
cr.execute(query)
for (uuid,) in cr.fetchall():
- _logger.warning(
- "Re-queued job with uuid: %s",
- uuid,
- )
+ _logger.warning("Re-queued dead job with uuid: %s", uuid)
class QueueJobRunner:
From 376c42416cae7c0b3fb1a390acfd725f49b1f462 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Tue, 2 Jul 2024 13:50:11 +0200
Subject: [PATCH 04/11] [FIX] queue_job: close connection to databases without
job queue
Without this, we leak connections to databases that don't have queue_job
installed.
---
queue_job/jobrunner/runner.py | 2 ++
1 file changed, 2 insertions(+)
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 47417caa4f..a2be314815 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -423,6 +423,8 @@ def initialize_databases(self):
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)
+ else:
+ db.close()
def run_jobs(self):
now = _odoo_now()
From e8247a11bd443db1d59a9a39c28c44da3945a04b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Tue, 2 Jul 2024 13:45:08 +0200
Subject: [PATCH 05/11] [FIX] queue_job: handle exceptions in Database
constructor
Without this we risk connection leaks in case of exceptions in the
constructor.
---
queue_job/jobrunner/runner.py | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index a2be314815..a3ffda688c 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -264,10 +264,14 @@ def __init__(self, db_name):
self.db_name = db_name
connection_info = _connection_info_for(db_name)
self.conn = psycopg2.connect(**connection_info)
- self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- self.has_queue_job = self._has_queue_job()
- if self.has_queue_job:
- self._initialize()
+ try:
+ self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+ self.has_queue_job = self._has_queue_job()
+ if self.has_queue_job:
+ self._initialize()
+ except BaseException:
+ self.close()
+ raise
def close(self):
# pylint: disable=except-pass
From f71465112eeba9cde4b7229c9dbd8867600dd39b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Tue, 2 Jul 2024 13:06:14 +0200
Subject: [PATCH 06/11] [IMP] queue_job: HA job runner using session level
advisory lock
---
queue_job/jobrunner/runner.py | 26 ++++++++++++++++++++++++--
1 file changed, 24 insertions(+), 2 deletions(-)
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index a3ffda688c..afaaa90200 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -159,12 +159,17 @@
SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
+PG_ADVISORY_LOCK_ID = 2293787760715711918
_logger = logging.getLogger(__name__)
select = selectors.DefaultSelector
+class MasterElectionLost(Exception):
+ pass
+
+
# Unfortunately, it is not possible to extend the Odoo
# server command line arguments, so we resort to environment variables
# to configure the runner (channels mostly).
@@ -268,6 +273,7 @@ def __init__(self, db_name):
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()
if self.has_queue_job:
+ self._acquire_master_lock()
self._initialize()
except BaseException:
self.close()
@@ -284,6 +290,14 @@ def close(self):
pass
self.conn = None
+ def _acquire_master_lock(self):
+ """Acquire the master runner lock or raise MasterElectionLost"""
+ with closing(self.conn.cursor()) as cr:
+ cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,))
+ if not cr.fetchone()[0]:
+ msg = f"could not acquire master runner lock on {self.db_name}"
+ raise MasterElectionLost(msg)
+
def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
@@ -406,7 +420,7 @@ def get_db_names(self):
db_names = config["db_name"].split(",")
else:
db_names = odoo.service.db.list_dbs(True)
- return db_names
+ return sorted(db_names)
def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
@@ -515,7 +529,7 @@ def run(self):
while not self._stop:
# outer loop does exception recovery
try:
- _logger.info("initializing database connections")
+ _logger.debug("initializing database connections")
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
@@ -530,6 +544,14 @@ def run(self):
except InterruptedError:
# Interrupted system call, i.e. KeyboardInterrupt during select
self.stop()
+ except MasterElectionLost as e:
+ _logger.debug(
+ "master election lost: %s, sleeping %ds and retrying",
+ e,
+ ERROR_RECOVERY_DELAY,
+ )
+ self.close_databases()
+ time.sleep(ERROR_RECOVERY_DELAY)
except Exception:
_logger.exception(
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
From d9f38cbdc5bd1098f216910c7dfffd4720e341e1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Tue, 2 Jul 2024 20:35:11 +0200
Subject: [PATCH 07/11] [IMP] queue_job: make sorting more explicit
---
queue_job/jobrunner/runner.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index afaaa90200..3ffd7d035f 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -420,7 +420,7 @@ def get_db_names(self):
db_names = config["db_name"].split(",")
else:
db_names = odoo.service.db.list_dbs(True)
- return sorted(db_names)
+ return db_names
def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
@@ -433,7 +433,8 @@ def close_databases(self, remove_jobs=True):
self.db_by_name = {}
def initialize_databases(self):
- for db_name in self.get_db_names():
+ for db_name in sorted(self.get_db_names()):
+ # sorting is important to avoid deadlocks in acquiring the master lock
db = Database(db_name)
if db.has_queue_job:
self.db_by_name[db_name] = db
From d34c3f5e65784d37fcbef43a3c83fada46a86fab Mon Sep 17 00:00:00 2001
From: oca-ci
Date: Mon, 17 Mar 2025 13:06:20 +0000
Subject: [PATCH 08/11] [UPD] Update queue_job.pot
---
queue_job/i18n/queue_job.pot | 24 ++++++++++++------------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/queue_job/i18n/queue_job.pot b/queue_job/i18n/queue_job.pot
index 6a3515a101..8aaa602147 100644
--- a/queue_job/i18n/queue_job.pot
+++ b/queue_job/i18n/queue_job.pot
@@ -165,6 +165,7 @@ msgstr ""
#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job_channel__create_uid
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__create_uid
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_cancelled__create_uid
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_done__create_uid
#: model:ir.model.fields,field_description:queue_job.field_queue_requeue_job__create_uid
@@ -173,6 +174,7 @@ msgstr ""
#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job_channel__create_date
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__create_date
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_cancelled__create_date
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_done__create_date
#: model:ir.model.fields,field_description:queue_job.field_queue_requeue_job__create_date
@@ -219,6 +221,7 @@ msgstr ""
#: model:ir.model.fields,field_description:queue_job.field_queue_job__display_name
#: model:ir.model.fields,field_description:queue_job.field_queue_job_channel__display_name
#: model:ir.model.fields,field_description:queue_job.field_queue_job_function__display_name
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__display_name
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_cancelled__display_name
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_done__display_name
#: model:ir.model.fields,field_description:queue_job.field_queue_requeue_job__display_name
@@ -349,6 +352,7 @@ msgstr ""
#: model:ir.model.fields,field_description:queue_job.field_queue_job__id
#: model:ir.model.fields,field_description:queue_job.field_queue_job_channel__id
#: model:ir.model.fields,field_description:queue_job.field_queue_job_function__id
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__id
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_cancelled__id
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_done__id
#: model:ir.model.fields,field_description:queue_job.field_queue_requeue_job__id
@@ -370,13 +374,6 @@ msgstr ""
msgid "Identity Key"
msgstr ""
-#. module: queue_job
-#. odoo-python
-#: code:addons/queue_job/models/queue_job.py:0
-#, python-format
-msgid "If both parameters are 0, ALL jobs will be requeued!"
-msgstr ""
-
#. module: queue_job
#: model:ir.model.fields,help:queue_job.field_queue_job__message_needaction
msgid "If checked, new messages require your attention."
@@ -468,11 +465,6 @@ msgstr ""
msgid "Jobs"
msgstr ""
-#. module: queue_job
-#: model:ir.actions.server,name:queue_job.ir_cron_queue_job_garbage_collector_ir_actions_server
-msgid "Jobs Garbage Collector"
-msgstr ""
-
#. module: queue_job
#. odoo-python
#: code:addons/queue_job/models/queue_job.py:0
@@ -487,6 +479,7 @@ msgstr ""
#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job_channel__write_uid
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__write_uid
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_cancelled__write_uid
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_done__write_uid
#: model:ir.model.fields,field_description:queue_job.field_queue_requeue_job__write_uid
@@ -495,6 +488,7 @@ msgstr ""
#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job_channel__write_date
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__write_date
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_cancelled__write_date
#: model:ir.model.fields,field_description:queue_job.field_queue_jobs_to_done__write_date
#: model:ir.model.fields,field_description:queue_job.field_queue_requeue_job__write_date
@@ -646,9 +640,15 @@ msgstr ""
#. module: queue_job
#: model:ir.model,name:queue_job.model_queue_job
+#: model:ir.model.fields,field_description:queue_job.field_queue_job_lock__queue_job_id
msgid "Queue Job"
msgstr ""
+#. module: queue_job
+#: model:ir.model,name:queue_job.model_queue_job_lock
+msgid "Queue Job Lock"
+msgstr ""
+
#. module: queue_job
#. odoo-python
#: code:addons/queue_job/models/queue_job.py:0
From f327ed0fd619618ea2cf754b61d49c1a1794e48c Mon Sep 17 00:00:00 2001
From: OCA-git-bot
Date: Mon, 17 Mar 2025 13:08:06 +0000
Subject: [PATCH 09/11] [BOT] post-merge updates
---
README.md | 2 +-
queue_job/README.rst | 6 +++-
queue_job/static/description/index.html | 47 +++++++++++++------------
3 files changed, 31 insertions(+), 24 deletions(-)
diff --git a/README.md b/README.md
index bcca4cd784..b525b00dc3 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ Available addons
----------------
addon | version | maintainers | summary
--- | --- | --- | ---
-[queue_job](queue_job/) | 17.0.1.1.1 | [](https://github.com/guewen) | Job Queue
+[queue_job](queue_job/) | 17.0.1.2.0 | [](https://github.com/guewen) | Job Queue
[queue_job_cron](queue_job_cron/) | 17.0.1.0.0 | | Scheduled Actions as Queue Jobs
[queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 17.0.1.0.0 | [](https://github.com/ivantodorovich) | Run jobs without a dedicated JobRunner
[queue_job_subscribe](queue_job_subscribe/) | 17.0.1.0.0 | | Control which users are subscribed to queue job notifications
diff --git a/queue_job/README.rst b/queue_job/README.rst
index 50d20da798..6f4f2bfd63 100644
--- a/queue_job/README.rst
+++ b/queue_job/README.rst
@@ -7,7 +7,7 @@ Job Queue
!! This file is generated by oca-gen-addon-readme !!
!! changes will be overwritten. !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
- !! source digest: sha256:985afa6fddcad78278ffe6f760e0483e547b2dc57dad1d829187d485ac1a22cb
+ !! source digest: sha256:bbcde88ce903b226f8c929ca470aa04a217171796635cfa8842de2070e21301e
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
.. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png
@@ -158,6 +158,10 @@ Configuration
- Tip: to enable debug logging for the queue job, use
``--log-handler=odoo.addons.queue_job:DEBUG``
+- Jobs that remain in ``enqueued`` or ``started`` state (because, for
+ instance, their worker has been killed) will be automatically
+ re-queued.
+
.. [1]
It works with the threaded Odoo server too, although this way of
running Odoo is obviously not for production purposes.
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html
index 8fb0f6b0e1..13e3c686b0 100644
--- a/queue_job/static/description/index.html
+++ b/queue_job/static/description/index.html
@@ -367,7 +367,7 @@ Job Queue
!! This file is generated by oca-gen-addon-readme !!
!! changes will be overwritten. !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-!! source digest: sha256:985afa6fddcad78278ffe6f760e0483e547b2dc57dad1d829187d485ac1a22cb
+!! source digest: sha256:bbcde88ce903b226f8c929ca470aa04a217171796635cfa8842de2070e21301e
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->

This addon adds an integrated Job Queue to Odoo.
@@ -376,19 +376,19 @@ Job Queue
transaction.
Example:
-from odoo import models, fields, api
+from odoo import models, fields, api
-class MyModel(models.Model):
+class MyModel(models.Model):
_name = 'my.model'
- def my_method(self, a, k=None):
+ def my_method(self, a, k=None):
_logger.info('executed with a: %s and k: %s', a, k)
-class MyOtherModel(models.Model):
+class MyOtherModel(models.Model):
_name = 'my.other.model'
- def button_do_stuff(self):
+ def button_do_stuff(self):
self.env['my.model'].with_delay().my_method('a', k=2)
In the snippet of code above, when we call button_do_stuff, a job
@@ -517,6 +517,9 @@
immediately and in parallel.
Tip: to enable debug logging for the queue job, use
--log-handler=odoo.addons.queue_job:DEBUG
+Jobs that remain in enqueued or started state (because, for
+instance, their worker has been killed) will be automatically
+re-queued.