`_ project on GitHub.
diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index f32b20e2e2..33e2e68ed8 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -2,7 +2,7 @@
{
"name": "Job Queue",
- "version": "16.0.2.12.0",
+ "version": "16.0.2.13.0",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
@@ -29,7 +29,7 @@
},
"installable": True,
"development_status": "Mature",
- "maintainers": ["guewen"],
+ "maintainers": ["guewen", "sbidoul"],
"post_init_hook": "post_init_hook",
"post_load": "post_load",
}
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index 4addf1be23..adc450d52d 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -26,15 +26,48 @@
class RunJobController(http.Controller):
- def _try_perform_job(self, env, job):
- """Try to perform the job."""
+ @classmethod
+ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
+ """Acquire a job for execution.
+
+ - make sure it is in ENQUEUED state
+ - mark it as STARTED and commit the state change
+ - acquire the job lock
+
+ If successful, return the Job instance, otherwise return None. This
+ function may fail if the job is not in the expected state or is
+ already locked by another worker.
+ """
+ env.cr.execute(
+ "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
+ "FOR NO KEY UPDATE SKIP LOCKED",
+ (job_uuid, ENQUEUED),
+ )
+ if not env.cr.fetchone():
+ _logger.warning(
+ "was requested to run job %s, but it does not exist, "
+ "or is not in state %s, or is being handled by another worker",
+ job_uuid,
+ ENQUEUED,
+ )
+ return None
+ job = Job.load(env, job_uuid)
+ assert job and job.state == ENQUEUED
job.set_started()
job.store()
env.cr.commit()
- job.lock()
+ if not job.lock():
+ _logger.warning(
+ "was requested to run job %s, but it could not be locked",
+ job_uuid,
+ )
+ return None
+ return job
+ @classmethod
+ def _try_perform_job(cls, env, job):
+ """Try to perform the job, mark it done and commit if successful."""
_logger.debug("%s started", job)
-
job.perform()
# Triggers any stored computed fields before calling 'set_done'
# so that will be part of the 'exec_time'
@@ -45,18 +78,20 @@ def _try_perform_job(self, env, job):
env.cr.commit()
_logger.debug("%s done", job)
- def _enqueue_dependent_jobs(self, env, job):
+ @classmethod
+ def _enqueue_dependent_jobs(cls, env, job):
tries = 0
while True:
try:
- job.enqueue_waiting()
+ with job.env.cr.savepoint():
+ job.enqueue_waiting()
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE:
- _logger.info(
+ _logger.error(
"%s, maximum number of tries reached to update dependencies",
errorcodes.lookup(err.pgcode),
)
@@ -74,17 +109,8 @@ def _enqueue_dependent_jobs(self, env, job):
else:
break
- @http.route(
- "/queue_job/runjob",
- type="http",
- auth="none",
- save_session=False,
- readonly=False,
- )
- def runjob(self, db, job_uuid, **kw):
- http.request.session.db = db
- env = http.request.env(user=SUPERUSER_ID)
-
+ @classmethod
+ def _runjob(cls, env: api.Environment, job: Job) -> None:
def retry_postpone(job, message, seconds=None):
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
@@ -93,26 +119,9 @@ def retry_postpone(job, message, seconds=None):
job.set_pending(reset_retry=False)
job.store()
- # ensure the job to run is in the correct state and lock the record
- env.cr.execute(
- "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE",
- (job_uuid, ENQUEUED),
- )
- if not env.cr.fetchone():
- _logger.warning(
- "was requested to run job %s, but it does not exist, "
- "or is not in state %s",
- job_uuid,
- ENQUEUED,
- )
- return ""
-
- job = Job.load(env, job_uuid)
- assert job and job.state == ENQUEUED
-
try:
try:
- self._try_perform_job(env, job)
+ cls._try_perform_job(env, job)
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
@@ -141,7 +150,6 @@ def retry_postpone(job, message, seconds=None):
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()
- return ""
except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
@@ -151,19 +159,18 @@ def retry_postpone(job, message, seconds=None):
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
- vals = self._get_failure_values(job, traceback_txt, orig_exception)
+ vals = cls._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
buff.close()
raise
_logger.debug("%s enqueue depends started", job)
- self._enqueue_dependent_jobs(env, job)
+ cls._enqueue_dependent_jobs(env, job)
_logger.debug("%s enqueue depends done", job)
- return ""
-
- def _get_failure_values(self, job, traceback_txt, orig_exception):
+ @classmethod
+ def _get_failure_values(cls, job, traceback_txt, orig_exception):
"""Collect relevant data from exception."""
exception_name = orig_exception.__class__.__name__
if hasattr(orig_exception, "__module__"):
@@ -177,6 +184,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
"exc_message": exc_message,
}
+ @http.route(
+ "/queue_job/runjob",
+ type="http",
+ auth="none",
+ save_session=False,
+ readonly=False,
+ )
+ def runjob(self, db, job_uuid, **kw):
+ http.request.session.db = db
+ env = http.request.env(user=SUPERUSER_ID)
+ job = self._acquire_job(env, job_uuid)
+ if not job:
+ return ""
+ self._runjob(env, job)
+ return ""
+
# flake8: noqa: C901
@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
@@ -187,6 +210,7 @@ def create_test_job(
description="Test job",
size=1,
failure_rate=0,
+ job_duration=0,
):
"""Create test jobs
@@ -207,6 +231,12 @@ def create_test_job(
except (ValueError, TypeError):
failure_rate = 0
+ if job_duration is not None:
+ try:
+ job_duration = float(job_duration)
+ except (ValueError, TypeError):
+ job_duration = 0
+
if not (0 <= failure_rate <= 1):
raise BadRequest("failure_rate must be between 0 and 1")
@@ -235,6 +265,7 @@ def create_test_job(
channel=channel,
description=description,
failure_rate=failure_rate,
+ job_duration=job_duration,
)
if size > 1:
@@ -245,6 +276,7 @@ def create_test_job(
channel=channel,
description=description,
failure_rate=failure_rate,
+ job_duration=job_duration,
)
return ""
@@ -256,6 +288,7 @@ def _create_single_test_job(
description="Test job",
size=1,
failure_rate=0,
+ job_duration=0,
):
delayed = (
http.request.env["queue.job"]
@@ -265,7 +298,7 @@ def _create_single_test_job(
channel=channel,
description=description,
)
- ._test_job(failure_rate=failure_rate)
+ ._test_job(failure_rate=failure_rate, job_duration=job_duration)
)
return "job uuid: %s" % (delayed.db_record().uuid,)
@@ -279,6 +312,7 @@ def _create_graph_test_jobs(
channel=None,
description="Test job",
failure_rate=0,
+ job_duration=0,
):
model = http.request.env["queue.job"]
current_count = 0
@@ -301,7 +335,7 @@ def _create_graph_test_jobs(
max_retries=max_retries,
channel=channel,
description="%s #%d" % (description, current_count),
- )._test_job(failure_rate=failure_rate)
+ )._test_job(failure_rate=failure_rate, job_duration=job_duration)
)
grouping = random.choice(possible_grouping_methods)
diff --git a/queue_job/job.py b/queue_job/job.py
index 790e07d90e..86407be3bb 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -236,7 +236,7 @@ def load_many(cls, env, job_uuids):
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}
- def add_lock_record(self):
+ def add_lock_record(self) -> None:
"""
Create row in db to be locked while the job is being performed.
"""
@@ -256,13 +256,11 @@ def add_lock_record(self):
[self.uuid],
)
- def lock(self):
- """
- Lock row of job that is being performed
+ def lock(self) -> bool:
+ """Lock row of job that is being performed.
- If a job cannot be locked,
- it means that the job wasn't started,
- a RetryableJobError is thrown.
+ Return False if a job cannot be locked: it means that the job is not in
+ STARTED state or is already locked by another worker.
"""
self.env.cr.execute(
"""
@@ -278,18 +276,15 @@ def lock(self):
queue_job
WHERE
uuid = %s
- AND state='started'
+ AND state = %s
)
- FOR UPDATE;
+ FOR NO KEY UPDATE SKIP LOCKED;
""",
- [self.uuid],
+ [self.uuid, STARTED],
)
# 1 job should be locked
- if 1 != len(self.env.cr.fetchall()):
- raise RetryableJobError(
- f"Trying to lock job that wasn't started, uuid: {self.uuid}"
- )
+ return bool(self.env.cr.fetchall())
@classmethod
def _load_from_db_record(cls, job_db_record):
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 846682a666..586c251128 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -361,23 +361,26 @@ def _query_requeue_dead_jobs(self):
ELSE exc_info
END)
WHERE
- id in (
- SELECT
- queue_job_id
- FROM
- queue_job_lock
- WHERE
- queue_job_id in (
- SELECT
- id
- FROM
- queue_job
- WHERE
- state IN ('enqueued','started')
- AND date_enqueued <
- (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
- )
- FOR UPDATE SKIP LOCKED
+ state IN ('enqueued','started')
+ AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
+ AND (
+ id in (
+ SELECT
+ queue_job_id
+ FROM
+ queue_job_lock
+ WHERE
+ queue_job_lock.queue_job_id = queue_job.id
+ FOR NO KEY UPDATE SKIP LOCKED
+ )
+ OR NOT EXISTS (
+ SELECT
+ 1
+ FROM
+ queue_job_lock
+ WHERE
+ queue_job_lock.queue_job_id = queue_job.id
+ )
)
RETURNING uuid
"""
@@ -400,6 +403,12 @@ def requeue_dead_jobs(self):
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted.
+
+ This also handles orphaned jobs (enqueued but never started, no lock).
+ This edge case occurs when the runner marks a job as 'enqueued'
+ but the HTTP request to start the job never reaches the Odoo server
+ (e.g., due to server shutdown/crash between setting enqueued and
+ the controller receiving the request).
"""
with closing(self.conn.cursor()) as cr:
diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index df33e2c7c5..d538a2a75c 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -3,6 +3,7 @@
import logging
import random
+import time
from datetime import datetime, timedelta
from odoo import _, api, exceptions, fields, models
@@ -104,6 +105,7 @@ class QueueJob(models.Model):
exec_time = fields.Float(
string="Execution Time (avg)",
group_operator="avg",
+ readonly=True,
help="Time required to execute this job in seconds. Average when grouped.",
)
date_cancelled = fields.Datetime(readonly=True)
@@ -457,7 +459,9 @@ def related_action_open_record(self):
)
return action
- def _test_job(self, failure_rate=0):
+ def _test_job(self, failure_rate=0, job_duration=0):
_logger.info("Running test job.")
if random.random() <= failure_rate:
raise JobError("Job failed")
+ if job_duration:
+ time.sleep(job_duration)
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html
index 82bed11d0f..1f90212907 100644
--- a/queue_job/static/description/index.html
+++ b/queue_job/static/description/index.html
@@ -372,7 +372,7 @@ Job Queue
!! This file is generated by oca-gen-addon-readme !!
!! changes will be overwritten. !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-!! source digest: sha256:b92d06dbbf161572f2bf02e0c6a59282cea11cc5e903378094bead986f0125de
+!! source digest: sha256:6846860017b2a564dba3eb31b31d701c7c08c9f2bda739e3e0c3a03e07ef22f9
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->

This addon adds an integrated Job Queue to Odoo.
@@ -986,8 +986,8 @@
OCA, or the Odoo Community Association, is a nonprofit organization whose
mission is to support the collaborative development of Odoo features and
promote its widespread use.
-Current maintainer:
-
+Current maintainers:
+

This module is part of the OCA/queue project on GitHub.
You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index 1062acdc25..16bcdff96b 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -8,4 +8,3 @@
from . import test_model_job_function
from . import test_queue_job_protected_write
from . import test_wizards
-from . import test_requeue_dead_job
diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py
index c3a29bf0c5..42c5851dbe 100644
--- a/test_queue_job/__manifest__.py
+++ b/test_queue_job/__manifest__.py
@@ -3,7 +3,7 @@
{
"name": "Queue Job Tests",
- "version": "16.0.2.4.0",
+ "version": "16.0.2.5.0",
"author": "Camptocamp,Odoo Community Association (OCA)",
"license": "LGPL-3",
"category": "Generic Modules",
@@ -13,6 +13,8 @@
"data/queue_job_channel_data.xml",
"data/queue_job_function_data.xml",
"security/ir.model.access.csv",
+ "data/queue_job_test_job.xml",
],
+ "maintainers": ["sbidoul"],
"installable": True,
}
diff --git a/test_queue_job/data/queue_job_test_job.xml b/test_queue_job/data/queue_job_test_job.xml
new file mode 100644
index 0000000000..8a28ab70a0
--- /dev/null
+++ b/test_queue_job/data/queue_job_test_job.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+
diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py
index 03fa792137..585e2fd593 100644
--- a/test_queue_job/models/test_models.py
+++ b/test_queue_job/models/test_models.py
@@ -1,6 +1,8 @@
# Copyright 2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+from datetime import datetime, timedelta
+
from odoo import api, fields, models
from odoo.addons.queue_job.delay import chain
@@ -29,6 +31,35 @@ def testing_related__url(self, **kwargs):
"url": kwargs["url"].format(subject=subject),
}
+ @api.model
+ def _create_test_started_job(self, uuid):
+ """Create started jobs to be used within tests"""
+ self.env["queue.job"].with_context(
+ _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL,
+ ).create(
+ {
+ "uuid": uuid,
+ "state": "started",
+ "model_name": "queue.job",
+ "method_name": "write",
+ }
+ )
+
+ @api.model
+ def _create_test_enqueued_job(self, uuid):
+ """Create enqueued jobs to be used within tests"""
+ self.env["queue.job"].with_context(
+ _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL,
+ ).create(
+ {
+ "uuid": uuid,
+ "state": "enqueued",
+ "model_name": "queue.job",
+ "method_name": "write",
+ "date_enqueued": datetime.now() - timedelta(minutes=1),
+ }
+ )
+
class ModelTestQueueJob(models.Model):
@@ -110,6 +141,13 @@ def _register_hook(self):
)
return super()._register_hook()
+ def _unregister_hook(self):
+ """Remove the patches installed by _register_hook()"""
+ self._revert_method("delay_me")
+ self._revert_method("delay_me_options")
+ self._revert_method("delay_me_context_key")
+ return super()._unregister_hook()
+
def _job_store_values(self, job):
value = "JUST_TESTING"
if job.state == "failed":
diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py
index 0405022ce0..0cfacebdf3 100644
--- a/test_queue_job/tests/__init__.py
+++ b/test_queue_job/tests/__init__.py
@@ -1,3 +1,4 @@
+from . import test_acquire_job
from . import test_autovacuum
from . import test_delayable
from . import test_dependencies
@@ -7,3 +8,4 @@
from . import test_job_function
from . import test_related_actions
from . import test_delay_mocks
+from . import test_requeue_dead_job
diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py
index a32fcc380a..c1f7d88ca0 100644
--- a/test_queue_job/tests/common.py
+++ b/test_queue_job/tests/common.py
@@ -20,3 +20,13 @@ def _create_job(self):
stored = Job.db_record_from_uuid(self.env, test_job.uuid)
self.assertEqual(len(stored), 1)
return stored
+
+ def _get_demo_job(self, uuid):
+ # job created during load of demo data
+ job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1)
+ self.assertTrue(
+ job,
+ f"Demo data queue job {uuid!r} should be loaded in order "
+ "to make this test work",
+ )
+ return job
diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py
new file mode 100644
index 0000000000..3f0c92a2be
--- /dev/null
+++ b/test_queue_job/tests/test_acquire_job.py
@@ -0,0 +1,51 @@
+# Copyright 2026 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+import logging
+from unittest import mock
+
+from odoo.tests import tagged
+
+from odoo.addons.queue_job.controllers.main import RunJobController
+
+from .common import JobCommonCase
+
+
+@tagged("post_install", "-at_install")
+class TestRequeueDeadJob(JobCommonCase):
+ def test_acquire_enqueued_job(self):
+ job_record = self._get_demo_job(uuid="test_enqueued_job")
+ self.assertFalse(
+ self.env["queue.job.lock"].search(
+ [("queue_job_id", "=", job_record.id)],
+ ),
+ "A job lock record should not exist at this point",
+ )
+ with mock.patch.object(
+ self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
+ ) as mock_commit:
+ job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job")
+ mock_commit.assert_called_once()
+ self.assertIsNotNone(job)
+ self.assertEqual(job.uuid, "test_enqueued_job")
+ self.assertEqual(job.state, "started")
+ self.assertTrue(
+ self.env["queue.job.lock"].search(
+ [("queue_job_id", "=", job_record.id)]
+ ),
+ "A job lock record should exist at this point",
+ )
+
+ def test_acquire_started_job(self):
+ with (
+ mock.patch.object(
+ self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all)
+ ) as mock_commit,
+ self.assertLogs(level=logging.WARNING) as logs,
+ ):
+ job = RunJobController._acquire_job(self.env, "test_started_job")
+ mock_commit.assert_not_called()
+ self.assertIsNone(job)
+ self.assertIn(
+ "was requested to run job test_started_job, but it does not exist",
+ logs.output[0],
+ )
diff --git a/test_queue_job/tests/test_autovacuum.py b/test_queue_job/tests/test_autovacuum.py
index 09730a4fea..97aebcba1e 100644
--- a/test_queue_job/tests/test_autovacuum.py
+++ b/test_queue_job/tests/test_autovacuum.py
@@ -28,12 +28,16 @@ def test_autovacuum(self):
date_done = datetime.now() - timedelta(days=29)
stored.write({"date_done": date_done})
self.env["queue.job"].autovacuum()
- self.assertEqual(len(self.env["queue.job"].search([])), 1)
+ self.assertEqual(
+ len(self.env["queue.job"].search([("channel", "!=", False)])), 1
+ )
date_done = datetime.now() - timedelta(days=31)
stored.write({"date_done": date_done})
self.env["queue.job"].autovacuum()
- self.assertEqual(len(self.env["queue.job"].search([])), 0)
+ self.assertEqual(
+ len(self.env["queue.job"].search([("channel", "!=", False)])), 0
+ )
def test_autovacuum_multi_channel(self):
root_channel = self.env.ref("queue_job.channel_root")
@@ -48,11 +52,17 @@ def test_autovacuum_multi_channel(self):
{"channel": channel_60days.complete_name, "date_done": date_done}
)
- self.assertEqual(len(self.env["queue.job"].search([])), 2)
+ self.assertEqual(
+ len(self.env["queue.job"].search([("channel", "!=", False)])), 2
+ )
self.env["queue.job"].autovacuum()
- self.assertEqual(len(self.env["queue.job"].search([])), 1)
+ self.assertEqual(
+ len(self.env["queue.job"].search([("channel", "!=", False)])), 1
+ )
date_done = datetime.now() - timedelta(days=61)
job_60days.write({"date_done": date_done})
self.env["queue.job"].autovacuum()
- self.assertEqual(len(self.env["queue.job"].search([])), 0)
+ self.assertEqual(
+ len(self.env["queue.job"].search([("channel", "!=", False)])), 0
+ )
diff --git a/queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py
similarity index 50%
rename from queue_job/tests/test_requeue_dead_job.py
rename to test_queue_job/tests/test_requeue_dead_job.py
index c6c82a2f4d..a267c43c87 100644
--- a/queue_job/tests/test_requeue_dead_job.py
+++ b/test_queue_job/tests/test_requeue_dead_job.py
@@ -3,33 +3,16 @@
from contextlib import closing
from datetime import datetime, timedelta
-from odoo.tests.common import TransactionCase
+from odoo.tests import tagged
from odoo.addons.queue_job.job import Job
from odoo.addons.queue_job.jobrunner.runner import Database
+from .common import JobCommonCase
-class TestRequeueDeadJob(TransactionCase):
- def create_dummy_job(self, uuid):
- """
- Create dummy job for tests
- """
- return (
- self.env["queue.job"]
- .with_context(
- _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL,
- )
- .create(
- {
- "uuid": uuid,
- "user_id": self.env.user.id,
- "state": "pending",
- "model_name": "queue.job",
- "method_name": "write",
- }
- )
- )
+@tagged("post_install", "-at_install")
+class TestRequeueDeadJob(JobCommonCase):
def get_locks(self, uuid, cr=None):
"""
Retrieve lock rows
@@ -52,7 +35,7 @@ def get_locks(self, uuid, cr=None):
WHERE
uuid = %s
)
- FOR UPDATE SKIP LOCKED
+ FOR NO KEY UPDATE SKIP LOCKED
""",
[uuid],
)
@@ -60,7 +43,8 @@ def get_locks(self, uuid, cr=None):
return cr.fetchall()
def test_add_lock_record(self):
- queue_job = self.create_dummy_job("test_add_lock")
+ queue_job = self._get_demo_job("test_started_job")
+ self.assertEqual(len(queue_job), 1)
job_obj = Job.load(self.env, queue_job.uuid)
job_obj.set_started()
@@ -71,19 +55,10 @@ def test_add_lock_record(self):
self.assertEqual(1, len(locks))
def test_lock(self):
- queue_job = self.create_dummy_job("test_lock")
+ queue_job = self._get_demo_job("test_started_job")
job_obj = Job.load(self.env, queue_job.uuid)
job_obj.set_started()
- job_obj.store()
-
- locks = self.get_locks(job_obj.uuid)
-
- self.assertEqual(1, len(locks))
-
- # commit to update queue_job records in DB
- self.env.cr.commit() # pylint: disable=E8102
-
job_obj.lock()
with closing(self.env.registry.cursor()) as new_cr:
@@ -92,42 +67,34 @@ def test_lock(self):
# Row should be locked
self.assertEqual(0, len(locks))
- # clean up
- queue_job.unlink()
-
- self.env.cr.commit() # pylint: disable=E8102
-
- # because we committed the cursor, the savepoint of the test method is
- # gone, and this would break TransactionCase cleanups
- self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
-
def test_requeue_dead_jobs(self):
- uuid = "test_requeue_dead_jobs"
-
- queue_job = self.create_dummy_job(uuid)
+ queue_job = self._get_demo_job("test_enqueued_job")
job_obj = Job.load(self.env, queue_job.uuid)
job_obj.set_enqueued()
- # simulate enqueuing was in the past
- job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
job_obj.set_started()
-
+ job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
job_obj.store()
- self.env.cr.commit() # pylint: disable=E8102
# requeue dead jobs using current cursor
query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
self.env.cr.execute(query)
uuids_requeued = self.env.cr.fetchall()
+ self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)
- self.assertEqual(len(uuids_requeued), 1)
- self.assertEqual(uuids_requeued[0][0], uuid)
+ def test_requeue_orphaned_jobs(self):
+ queue_job = self._get_demo_job("test_enqueued_job")
+ job_obj = Job.load(self.env, queue_job.uuid)
- # clean up
- queue_job.unlink()
- self.env.cr.commit() # pylint: disable=E8102
+ # Only enqueue the job; don't set it to started, to simulate the scenario
+ # where the system shut down before the job started
+ job_obj.set_enqueued()
+ job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
+ job_obj.store()
- # because we committed the cursor, the savepoint of the test method is
- # gone, and this would break TransactionCase cleanups
- self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
+ # job is now picked up by the requeue query (which includes orphaned jobs)
+ query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
+ self.env.cr.execute(query)
+ uuids_requeued = self.env.cr.fetchall()
+ self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)