diff --git a/README.md b/README.md index 7f7cf199d8..d6bcef339c 100644 --- a/README.md +++ b/README.md @@ -23,13 +23,13 @@ addon | version | maintainers | summary --- | --- | --- | --- [base_export_async](base_export_async/) | 16.0.1.2.0 | | Asynchronous export with job queue [base_import_async](base_import_async/) | 16.0.1.2.1 | | Import CSV files in the background -[queue_job](queue_job/) | 16.0.2.12.0 | guewen | Job Queue +[queue_job](queue_job/) | 16.0.2.13.0 | guewen sbidoul | Job Queue [queue_job_batch](queue_job_batch/) | 16.0.1.0.1 | | Job Queue Batch [queue_job_cron](queue_job_cron/) | 16.0.2.1.0 | | Scheduled Actions as Queue Jobs [queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 16.0.1.1.0 | ivantodorovich | Run jobs without a dedicated JobRunner [queue_job_subscribe](queue_job_subscribe/) | 16.0.1.1.0 | | Control which users are subscribed to queue job notifications [queue_job_web_notify](queue_job_web_notify/) | 16.0.1.0.0 | | This module allows to display a notification to the related user of a failed job. It uses the web_notify notification feature. -[test_queue_job](test_queue_job/) | 16.0.2.4.0 | | Queue Job Tests +[test_queue_job](test_queue_job/) | 16.0.2.5.0 | sbidoul | Queue Job Tests [test_queue_job_batch](test_queue_job_batch/) | 16.0.1.0.0 | | Test Job Queue Batch diff --git a/queue_job/README.rst b/queue_job/README.rst index f22fd7bc10..ec1ea67d01 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -11,7 +11,7 @@ Job Queue !! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - !! source digest: sha256:b92d06dbbf161572f2bf02e0c6a59282cea11cc5e903378094bead986f0125de + !! source digest: sha256:6846860017b2a564dba3eb31b31d701c7c08c9f2bda739e3e0c3a03e07ef22f9 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png @@ -697,10 +697,13 @@ promote its widespread use. .. |maintainer-guewen| image:: https://github.com/guewen.png?size=40px :target: https://github.com/guewen :alt: guewen +.. |maintainer-sbidoul| image:: https://github.com/sbidoul.png?size=40px + :target: https://github.com/sbidoul + :alt: sbidoul -Current `maintainer `__: +Current `maintainers `__: -|maintainer-guewen| +|maintainer-guewen| |maintainer-sbidoul| This module is part of the `OCA/queue `_ project on GitHub. diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index f32b20e2e2..33e2e68ed8 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "16.0.2.12.0", + "version": "16.0.2.13.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", @@ -29,7 +29,7 @@ }, "installable": True, "development_status": "Mature", - "maintainers": ["guewen"], + "maintainers": ["guewen", "sbidoul"], "post_init_hook": "post_init_hook", "post_load": "post_load", } diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 4addf1be23..adc450d52d 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -26,15 +26,48 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. + """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR NO KEY UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + @classmethod + def _try_perform_job(cls, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -45,18 +78,20 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: - job.enqueue_waiting() + with job.env.cr.savepoint(): + job.enqueue_waiting() except OperationalError as err: # Automatically retry the typical transaction serialization # errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: - _logger.info( + _logger.error( "%s, maximum number of tries reached to update dependencies", errorcodes.lookup(err.pgcode), ) @@ -74,17 +109,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: @@ -93,26 +119,9 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) - return "" - - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -141,7 +150,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -151,19 +159,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"): @@ -177,6 +184,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( @@ -187,6 +210,7 @@ def create_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): """Create test jobs @@ -207,6 +231,12 @@ def create_test_job( except (ValueError, TypeError): failure_rate = 0 + if job_duration is not None: + try: + job_duration = float(job_duration) + except (ValueError, TypeError): + job_duration = 0 + if not (0 <= failure_rate <= 1): raise BadRequest("failure_rate must be between 0 and 1") @@ -235,6 +265,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) if size > 1: @@ -245,6 +276,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) return "" @@ -256,6 +288,7 @@ def _create_single_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): delayed = ( http.request.env["queue.job"] @@ -265,7 +298,7 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate) + ._test_job(failure_rate=failure_rate, job_duration=job_duration) ) return "job uuid: %s" % (delayed.db_record().uuid,) @@ -279,6 +312,7 @@ def _create_graph_test_jobs( channel=None, description="Test job", failure_rate=0, + job_duration=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -301,7 +335,7 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description="%s #%d" % (description, current_count), - )._test_job(failure_rate=failure_rate) + )._test_job(failure_rate=failure_rate, job_duration=job_duration) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/job.py b/queue_job/job.py index 790e07d90e..86407be3bb 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -236,7 +236,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. """ @@ -256,13 +256,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -278,18 +276,15 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) - FOR UPDATE; + FOR NO KEY UPDATE SKIP LOCKED; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 846682a666..586c251128 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -361,23 +361,26 @@ def _query_requeue_dead_jobs(self): ELSE exc_info END) WHERE - id in ( - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id in ( - SELECT - id - FROM - queue_job - WHERE - state IN ('enqueued','started') - AND date_enqueued < - (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - ) - FOR UPDATE SKIP LOCKED + state IN ('enqueued','started') + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND ( + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + FOR NO KEY UPDATE SKIP LOCKED + ) + OR NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + ) ) RETURNING uuid """ @@ -400,6 +403,12 @@ def requeue_dead_jobs(self): However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know they have been aborted. + + This also handles orphaned jobs (enqueued but never started, no lock). + This edge case occurs when the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). """ with closing(self.conn.cursor()) as cr: diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index df33e2c7c5..d538a2a75c 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,6 +3,7 @@ import logging import random +import time from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models @@ -104,6 +105,7 @@ class QueueJob(models.Model): exec_time = fields.Float( string="Execution Time (avg)", group_operator="avg", + readonly=True, help="Time required to execute this job in seconds. Average when grouped.", ) date_cancelled = fields.Datetime(readonly=True) @@ -457,7 +459,9 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0): + def _test_job(self, failure_rate=0, job_duration=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + if job_duration: + time.sleep(job_duration) diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 82bed11d0f..1f90212907 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -372,7 +372,7 @@

Job Queue

!! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -!! source digest: sha256:b92d06dbbf161572f2bf02e0c6a59282cea11cc5e903378094bead986f0125de +!! source digest: sha256:6846860017b2a564dba3eb31b31d701c7c08c9f2bda739e3e0c3a03e07ef22f9 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

@@ -986,8 +986,8 @@

Maintainers

OCA, or the Odoo Community Association, is a nonprofit organization whose mission is to support the collaborative development of Odoo features and promote its widespread use.

-

Current maintainer:

-

guewen

+

Current maintainers:

+

guewen sbidoul

This module is part of the OCA/queue project on GitHub.

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py index 1062acdc25..16bcdff96b 100644 --- a/queue_job/tests/__init__.py +++ b/queue_job/tests/__init__.py @@ -8,4 +8,3 @@ from . import test_model_job_function from . import test_queue_job_protected_write from . import test_wizards -from . import test_requeue_dead_job diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index c3a29bf0c5..42c5851dbe 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -3,7 +3,7 @@ { "name": "Queue Job Tests", - "version": "16.0.2.4.0", + "version": "16.0.2.5.0", "author": "Camptocamp,Odoo Community Association (OCA)", "license": "LGPL-3", "category": "Generic Modules", @@ -13,6 +13,8 @@ "data/queue_job_channel_data.xml", "data/queue_job_function_data.xml", "security/ir.model.access.csv", + "data/queue_job_test_job.xml", ], + "maintainers": ["sbidoul"], "installable": True, } diff --git a/test_queue_job/data/queue_job_test_job.xml b/test_queue_job/data/queue_job_test_job.xml new file mode 100644 index 0000000000..8a28ab70a0 --- /dev/null +++ b/test_queue_job/data/queue_job_test_job.xml @@ -0,0 +1,18 @@ + + + + + + diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index 03fa792137..585e2fd593 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -1,6 +1,8 @@ # Copyright 2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from datetime import datetime, timedelta + from odoo import api, fields, models from odoo.addons.queue_job.delay import chain @@ -29,6 +31,35 @@ def testing_related__url(self, **kwargs): "url": kwargs["url"].format(subject=subject), } + @api.model + def _create_test_started_job(self, uuid): + """Create started jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "started", + "model_name": "queue.job", + "method_name": "write", + } + ) + + @api.model + def _create_test_enqueued_job(self, uuid): + """Create enqueued jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "enqueued", + "model_name": "queue.job", + "method_name": "write", + "date_enqueued": datetime.now() - timedelta(minutes=1), + } + ) + class ModelTestQueueJob(models.Model): @@ -110,6 +141,13 @@ def _register_hook(self): ) return super()._register_hook() + def _unregister_hook(self): + """Remove the patches installed by _register_hook()""" + self._revert_method("delay_me") + self._revert_method("delay_me_options") + self._revert_method("delay_me_context_key") + return super()._unregister_hook() + def _job_store_values(self, job): value = "JUST_TESTING" if job.state == "failed": diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 0405022ce0..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies @@ -7,3 +8,4 @@ from . import test_job_function from . import test_related_actions from . import test_delay_mocks +from . import test_requeue_dead_job diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index a32fcc380a..c1f7d88ca0 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_record_from_uuid(self.env, test_job.uuid) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_autovacuum.py b/test_queue_job/tests/test_autovacuum.py index 09730a4fea..97aebcba1e 100644 --- a/test_queue_job/tests/test_autovacuum.py +++ b/test_queue_job/tests/test_autovacuum.py @@ -28,12 +28,16 @@ def test_autovacuum(self): date_done = datetime.now() - timedelta(days=29) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=31) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) def test_autovacuum_multi_channel(self): root_channel = self.env.ref("queue_job.channel_root") @@ -48,11 +52,17 @@ def test_autovacuum_multi_channel(self): {"channel": channel_60days.complete_name, "date_done": date_done} ) - self.assertEqual(len(self.env["queue.job"].search([])), 2) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 2 + ) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=61) job_60days.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) diff --git a/queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py similarity index 50% rename from queue_job/tests/test_requeue_dead_job.py rename to test_queue_job/tests/test_requeue_dead_job.py index c6c82a2f4d..a267c43c87 100644 --- a/queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -3,33 +3,16 @@ from contextlib import closing from datetime import datetime, timedelta -from odoo.tests.common import TransactionCase +from odoo.tests import tagged from odoo.addons.queue_job.job import Job from odoo.addons.queue_job.jobrunner.runner import Database +from .common import JobCommonCase -class TestRequeueDeadJob(TransactionCase): - def create_dummy_job(self, uuid): - """ - Create dummy job for tests - """ - return ( - self.env["queue.job"] - .with_context( - _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, - ) - .create( - { - "uuid": uuid, - "user_id": self.env.user.id, - "state": "pending", - "model_name": "queue.job", - "method_name": "write", - } - ) - ) +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): def get_locks(self, uuid, cr=None): """ Retrieve lock rows @@ -52,7 +35,7 @@ def get_locks(self, uuid, cr=None): WHERE uuid = %s ) - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED """, [uuid], ) @@ -60,7 +43,8 @@ def get_locks(self, uuid, cr=None): return cr.fetchall() def test_add_lock_record(self): - queue_job = self.create_dummy_job("test_add_lock") + queue_job = self._get_demo_job("test_started_job") + self.assertEqual(len(queue_job), 1) job_obj = Job.load(self.env, queue_job.uuid) job_obj.set_started() @@ -71,19 +55,10 @@ def test_add_lock_record(self): self.assertEqual(1, len(locks)) def test_lock(self): - queue_job = self.create_dummy_job("test_lock") + queue_job = self._get_demo_job("test_started_job") job_obj = Job.load(self.env, queue_job.uuid) job_obj.set_started() - job_obj.store() - - locks = self.get_locks(job_obj.uuid) - - self.assertEqual(1, len(locks)) - - # commit to update queue_job records in DB - self.env.cr.commit() # pylint: disable=E8102 - job_obj.lock() with closing(self.env.registry.cursor()) as new_cr: @@ -92,42 +67,34 @@ def test_lock(self): # Row should be locked self.assertEqual(0, len(locks)) - # clean up - queue_job.unlink() - - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) - def test_requeue_dead_jobs(self): - uuid = "test_requeue_dead_jobs" - - queue_job = self.create_dummy_job(uuid) + queue_job = self._get_demo_job("test_enqueued_job") job_obj = Job.load(self.env, queue_job.uuid) job_obj.set_enqueued() - # simulate enqueuing was in the past - job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) job_obj.set_started() - + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) job_obj.store() - self.env.cr.commit() # pylint: disable=E8102 # requeue dead jobs using current cursor query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() self.env.cr.execute(query) uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) - self.assertEqual(len(uuids_requeued), 1) - self.assertEqual(uuids_requeued[0][0], uuid) + def test_requeue_orphaned_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) - # clean up - queue_job.unlink() - self.env.cr.commit() # pylint: disable=E8102 + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) + # job is now picked up by the requeue query (which includes orphaned jobs) + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)