diff --git a/README.md b/README.md index 902aef02e1..4fa5be4e2b 100644 --- a/README.md +++ b/README.md @@ -22,11 +22,11 @@ Available addons addon | version | maintainers | summary --- | --- | --- | --- [base_import_async](base_import_async/) | 17.0.1.0.0 | | Import CSV files in the background -[queue_job](queue_job/) | 17.0.1.4.3 | guewen | Job Queue +[queue_job](queue_job/) | 17.0.1.5.0 | guewen | Job Queue [queue_job_cron](queue_job_cron/) | 17.0.1.1.0 | | Scheduled Actions as Queue Jobs [queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 17.0.1.1.0 | ivantodorovich | Run jobs without a dedicated JobRunner [queue_job_subscribe](queue_job_subscribe/) | 17.0.1.0.0 | | Control which users are subscribed to queue job notifications -[test_queue_job](test_queue_job/) | 17.0.1.1.0 | | Queue Job Tests +[test_queue_job](test_queue_job/) | 17.0.1.2.0 | | Queue Job Tests [//]: # (end addons) diff --git a/queue_job/README.rst b/queue_job/README.rst index 37e9529ff2..c6950adcc9 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -11,7 +11,7 @@ Job Queue !! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - !! source digest: sha256:5c7d6692e11a4d5d03e2fd23cfcf6068d23e685be5e1bc7f760de482512c3083 + !! source digest: sha256:10e03ffe452b93247cdca483f5d4597ae8d6f572bc00de63bcc7f7238d2ce33d !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index c61f234e8c..b552e7d52a 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "17.0.1.4.3", + "version": "17.0.1.5.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index f9d54fed0f..5bebf823ca 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -74,7 +74,13 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route("/queue_job/runjob", type="http", auth="none", save_session=False) + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) def runjob(self, db, job_uuid, **kw): http.request.session.db = db env = http.request.env(user=SUPERUSER_ID) diff --git a/queue_job/i18n/de.po b/queue_job/i18n/de.po index f575a55297..500a6574b3 100644 --- a/queue_job/i18n/de.po +++ b/queue_job/i18n/de.po @@ -661,6 +661,7 @@ msgstr "Ausstehend" #. module: queue_job #: model:ir.model.fields,field_description:queue_job.field_queue_job__priority +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search msgid "Priority" msgstr "Priorität" @@ -901,6 +902,11 @@ msgstr "" msgid "Time required to execute this job in seconds. Average when grouped." msgstr "" +#. module: queue_job +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search +msgid "Tried many times" +msgstr "" + #. module: queue_job #: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration msgid "Type of the exception activity on record." diff --git a/queue_job/i18n/es.po b/queue_job/i18n/es.po index 5d599f54c9..d92889f6c1 100644 --- a/queue_job/i18n/es.po +++ b/queue_job/i18n/es.po @@ -669,6 +669,7 @@ msgstr "Pendiente" #. module: queue_job #: model:ir.model.fields,field_description:queue_job.field_queue_job__priority +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search msgid "Priority" msgstr "Prioridad" @@ -913,6 +914,11 @@ msgstr "" "Tiempo requerido para ejecutar este trabajo en segundos. Promedio cuando se " "agrupa." +#. module: queue_job +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search +msgid "Tried many times" +msgstr "" + #. module: queue_job #: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration msgid "Type of the exception activity on record." diff --git a/queue_job/i18n/it.po b/queue_job/i18n/it.po index 9697a44886..454b235a9d 100644 --- a/queue_job/i18n/it.po +++ b/queue_job/i18n/it.po @@ -668,6 +668,7 @@ msgstr "In attesa" #. module: queue_job #: model:ir.model.fields,field_description:queue_job.field_queue_job__priority +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search msgid "Priority" msgstr "Priorità" @@ -910,6 +911,11 @@ msgid "Time required to execute this job in seconds. Average when grouped." msgstr "" "Tempo in secondi richiesto per eseguire il lavoro. Medio quando raggruppati." +#. module: queue_job +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search +msgid "Tried many times" +msgstr "" + #. module: queue_job #: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration msgid "Type of the exception activity on record." diff --git a/queue_job/i18n/queue_job.pot b/queue_job/i18n/queue_job.pot index fc8e2bbbdb..682dc3d201 100644 --- a/queue_job/i18n/queue_job.pot +++ b/queue_job/i18n/queue_job.pot @@ -651,6 +651,7 @@ msgstr "" #. module: queue_job #: model:ir.model.fields,field_description:queue_job.field_queue_job__priority +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search msgid "Priority" msgstr "" @@ -877,6 +878,11 @@ msgstr "" msgid "Time required to execute this job in seconds. Average when grouped." msgstr "" +#. module: queue_job +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search +msgid "Tried many times" +msgstr "" + #. module: queue_job #: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration msgid "Type of the exception activity on record." diff --git a/queue_job/i18n/zh_CN.po b/queue_job/i18n/zh_CN.po index 804ca86780..af9d046e6a 100644 --- a/queue_job/i18n/zh_CN.po +++ b/queue_job/i18n/zh_CN.po @@ -666,6 +666,7 @@ msgstr "等待" #. module: queue_job #: model:ir.model.fields,field_description:queue_job.field_queue_job__priority +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search msgid "Priority" msgstr "优先级" @@ -904,6 +905,11 @@ msgstr "时间(秒)" msgid "Time required to execute this job in seconds. Average when grouped." msgstr "以秒为单位执行此任务所需的时间。分组时为平均值。" +#. module: queue_job +#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search +msgid "Tried many times" +msgstr "" + #. module: queue_job #: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration msgid "Type of the exception activity on record." diff --git a/queue_job/job.py b/queue_job/job.py index a473be5cd0..594f1948ab 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -9,7 +9,6 @@ import uuid import weakref from datetime import datetime, timedelta -from functools import total_ordering from random import randint import odoo @@ -104,7 +103,6 @@ def identity_exact_hasher(job_): return hasher -@total_ordering class Job: """A Job is a task to execute. It is the in-memory representation of a job. @@ -367,65 +365,6 @@ def job_record_with_same_identity_key(self): ) return existing - # TODO to deprecate (not called anymore) - @classmethod - def enqueue( - cls, - func, - args=None, - kwargs=None, - priority=None, - eta=None, - max_retries=None, - description=None, - channel=None, - identity_key=None, - ): - """Create a Job and enqueue it in the queue. Return the job uuid. - - This expects the arguments specific to the job to be already extracted - from the ones to pass to the job function. - - If the identity key is the same than the one in a pending job, - no job is created and the existing job is returned - - """ - new_job = cls( - func=func, - args=args, - kwargs=kwargs, - priority=priority, - eta=eta, - max_retries=max_retries, - description=description, - channel=channel, - identity_key=identity_key, - ) - return new_job._enqueue_job() - - # TODO to deprecate (not called anymore) - def _enqueue_job(self): - if self.identity_key: - existing = self.job_record_with_same_identity_key() - if existing: - _logger.debug( - "a job has not been enqueued due to having " - "the same identity key (%s) than job %s", - self.identity_key, - existing.uuid, - ) - return Job._load_from_db_record(existing) - self.store() - _logger.debug( - "enqueued %s:%s(*%r, **%r) with uuid: %s", - self.recordset, - self.method_name, - self.args, - self.kwargs, - self.uuid, - ) - return self - @staticmethod def db_record_from_uuid(env, job_uuid): # TODO remove in 15.0 or 16.0 @@ -749,16 +688,6 @@ def __eq__(self, other): def __hash__(self): return self.uuid.__hash__() - def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq - - def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() - def db_record(self): return self.db_records_from_uuids(self.env, [self.uuid]) diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index 6e33a73189..cb0a23f01f 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -2,6 +2,7 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) import logging +from collections import namedtuple from functools import total_ordering from heapq import heappop, heappush from weakref import WeakValueDictionary @@ -10,6 +11,7 @@ from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) +JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq") _logger = logging.getLogger(__name__) @@ -108,7 +110,7 @@ class ChannelJob: job that are necessary to prioritise them. Channel jobs are comparable according to the following rules: - * jobs with an eta come before all other jobs + * jobs with an eta cannot be compared with jobs without * then jobs with a smaller eta come first * then jobs with a smaller priority come first * then jobs with a smaller creation time come first @@ -135,14 +137,18 @@ class ChannelJob: >>> j3 < j1 True - j4 and j5 comes even before j3, because they have an eta + j4 and j5 have an eta, they cannot be compared with j3 >>> j4 = ChannelJob(None, None, 4, ... seq=0, date_created=4, priority=9, eta=9) >>> j5 = ChannelJob(None, None, 5, ... seq=0, date_created=5, priority=9, eta=9) - >>> j4 < j5 < j3 + >>> j4 < j5 True + >>> j4 < j3 + Traceback (most recent call last): + ... + TypeError: '<' not supported between instances of 'int' and 'NoneType' j6 has same date_created and priority as j5 but a smaller eta @@ -153,7 +159,7 @@ class ChannelJob: Here is the complete suite: - >>> j6 < j4 < j5 < j3 < j1 < j2 + >>> j6 < j4 < j5 and j3 < j1 < j2 True j0 has the same properties as j1 but they are not considered @@ -173,14 +179,13 @@ class ChannelJob: """ + __slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__") + def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): self.db_name = db_name self.channel = channel self.uuid = uuid - self.seq = seq - self.date_created = date_created - self.priority = priority - self.eta = eta + self._sorting_key = JobSortingKey(eta, priority, date_created, seq) def __repr__(self): return "" % self.uuid @@ -191,18 +196,36 @@ def __eq__(self, other): def __hash__(self): return id(self) + def set_no_eta(self): + self._sorting_key = JobSortingKey(None, *self._sorting_key[1:]) + + @property + def seq(self): + return self._sorting_key.seq + + @property + def date_created(self): + return self._sorting_key.date_created + + @property + def priority(self): + return self._sorting_key.priority + + @property + def eta(self): + return self._sorting_key.eta + def sorting_key(self): - return self.eta, self.priority, self.date_created, self.seq + # DEPRECATED + return self._sorting_key def sorting_key_ignoring_eta(self): - return self.priority, self.date_created, self.seq + return self._sorting_key[1:] def __lt__(self, other): - if self.eta and not other.eta: - return True - elif not self.eta and other.eta: - return False - return self.sorting_key() < other.sorting_key() + # Do not compare job where ETA is set with job where it is not + # If one job 'eta' is set, and the other is None, it raises TypeError + return self._sorting_key < other._sorting_key class ChannelQueue: @@ -312,7 +335,7 @@ def remove(self, job): def pop(self, now): while self._eta_queue and self._eta_queue[0].eta <= now: eta_job = self._eta_queue.pop() - eta_job.eta = None + eta_job.set_no_eta() self._queue.add(eta_job) if self.sequential and self._eta_queue and self._queue: eta_job = self._eta_queue[0] diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index a0db6751db..bb3556d60f 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -438,6 +438,17 @@ def __init__( self._stop = False self._stop_pipe = os.pipe() + def __del__(self): + # pylint: disable=except-pass + try: + os.close(self._stop_pipe[0]) + except OSError: + pass + try: + os.close(self._stop_pipe[1]) + except OSError: + pass + @classmethod def from_environ_or_config(cls): scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get( diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 55ee7e526c..42ddf8c930 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.tools import config, html_escape +from odoo.tools import config, html_escape, index_exists from odoo.addons.base_sparse_field.models.fields import Serialized @@ -91,7 +91,7 @@ class QueueJob(models.Model): func_string = fields.Char(string="Task", readonly=True) state = fields.Selection(STATES, readonly=True, required=True, index=True) - priority = fields.Integer() + priority = fields.Integer(group_operator=False) exc_name = fields.Char(string="Exception", readonly=True) exc_message = fields.Char(string="Exception Message", readonly=True, tracking=True) exc_info = fields.Text(string="Exception Info", readonly=True) @@ -130,16 +130,21 @@ class QueueJob(models.Model): worker_pid = fields.Integer(readonly=True) def init(self): - self._cr.execute( - "SELECT indexname FROM pg_indexes WHERE indexname = %s ", - ("queue_job_identity_key_state_partial_index",), - ) - if not self._cr.fetchone(): + index_1 = "queue_job_identity_key_state_partial_index" + index_2 = "queue_job_channel_date_done_date_created_index" + if not index_exists(self._cr, index_1): + # Used by Job.job_record_with_same_identity_key self._cr.execute( "CREATE INDEX queue_job_identity_key_state_partial_index " "ON queue_job (identity_key) WHERE state in ('pending', " "'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;" ) + if not index_exists(self._cr, index_2): + # Used by .autovacuum + self._cr.execute( + "CREATE INDEX queue_job_channel_date_done_date_created_index " + "ON queue_job (channel, date_done, date_created);" + ) @api.depends("records") def _compute_record_ids(self): @@ -405,6 +410,7 @@ def autovacuum(self): ("date_cancelled", "<=", deadline), ("channel", "=", channel.complete_name), ], + order="date_done, date_created", limit=1000, ) if jobs: diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 5d64e6a9d2..627abec65a 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -372,7 +372,7 @@

Job Queue

!! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -!! source digest: sha256:5c7d6692e11a4d5d03e2fd23cfcf6068d23e685be5e1bc7f760de482512c3083 +!! source digest: sha256:10e03ffe452b93247cdca483f5d4597ae8d6f572bc00de63bcc7f7238d2ce33d !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py index ccd63cff15..434eadfc34 100644 --- a/queue_job/tests/common.py +++ b/queue_job/tests/common.py @@ -255,6 +255,7 @@ def _add_job(self, *args, **kwargs): if not job.identity_key or all( j.identity_key != job.identity_key for j in self.enqueued_jobs ): + self._prepare_context(job) self.enqueued_jobs.append(job) patcher = mock.patch.object(job, "store") @@ -273,6 +274,13 @@ def _add_job(self, *args, **kwargs): ) return job + def _prepare_context(self, job): + # pylint: disable=context-overridden + job_model = job.job_model.with_context({}) + field_records = job_model._fields["records"] + # Filter the context to simulate store/load of the job + job.recordset = field_records.convert_to_write(job.recordset, job_model) + def __enter__(self): return self diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index c6486e27ef..131ce6322d 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -3,8 +3,57 @@ # pylint: disable=odoo-addons-relative-import # we are testing, we want to test as we were an external consumer of the API +import os + +from odoo.tests import BaseCase, tagged + from odoo.addons.queue_job.jobrunner import runner from .common import load_doctests load_tests = load_doctests(runner) + + +@tagged("-at_install", "post_install") +class TestRunner(BaseCase): + @classmethod + def _is_open_file_descriptor(cls, fd): + try: + os.fstat(fd) + return True + except OSError: + return False + + def test_runner_file_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + self.assertTrue(self._is_open_file_descriptor(read_fd)) + self.assertTrue(self._is_open_file_descriptor(write_fd)) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) + + def test_runner_file_closed_read_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + os.close(read_fd) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) + + def test_runner_file_closed_write_descriptor(self): + a_runner = runner.QueueJobRunner.from_environ_or_config() + + read_fd, write_fd = a_runner._stop_pipe + os.close(write_fd) + + del a_runner + + self.assertFalse(self._is_open_file_descriptor(read_fd)) + self.assertFalse(self._is_open_file_descriptor(write_fd)) diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index be12b4294b..d39d5aeb0a 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -25,7 +25,7 @@ />