Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ Available addons
addon | version | maintainers | summary
--- | --- | --- | ---
[base_import_async](base_import_async/) | 17.0.1.0.0 | | Import CSV files in the background
[queue_job](queue_job/) | 17.0.1.4.3 | <a href='https://github.com/guewen'><img src='https://github.com/guewen.png' width='32' height='32' style='border-radius:50%;' alt='guewen'/></a> | Job Queue
[queue_job](queue_job/) | 17.0.1.5.0 | <a href='https://github.com/guewen'><img src='https://github.com/guewen.png' width='32' height='32' style='border-radius:50%;' alt='guewen'/></a> | Job Queue
[queue_job_cron](queue_job_cron/) | 17.0.1.1.0 | | Scheduled Actions as Queue Jobs
[queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 17.0.1.1.0 | <a href='https://github.com/ivantodorovich'><img src='https://github.com/ivantodorovich.png' width='32' height='32' style='border-radius:50%;' alt='ivantodorovich'/></a> | Run jobs without a dedicated JobRunner
[queue_job_subscribe](queue_job_subscribe/) | 17.0.1.0.0 | | Control which users are subscribed to queue job notifications
[test_queue_job](test_queue_job/) | 17.0.1.1.0 | | Queue Job Tests
[test_queue_job](test_queue_job/) | 17.0.1.2.0 | | Queue Job Tests

[//]: # (end addons)

Expand Down
2 changes: 1 addition & 1 deletion queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Job Queue
!! This file is generated by oca-gen-addon-readme !!
!! changes will be overwritten. !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!! source digest: sha256:5c7d6692e11a4d5d03e2fd23cfcf6068d23e685be5e1bc7f760de482512c3083
!! source digest: sha256:10e03ffe452b93247cdca483f5d4597ae8d6f572bc00de63bcc7f7238d2ce33d
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

.. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png
Expand Down
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{
"name": "Job Queue",
"version": "17.0.1.4.3",
"version": "17.0.1.5.0",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
Expand Down
8 changes: 7 additions & 1 deletion queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ def _enqueue_dependent_jobs(self, env, job):
else:
break

@http.route("/queue_job/runjob", type="http", auth="none", save_session=False)
@http.route(
"/queue_job/runjob",
type="http",
auth="none",
save_session=False,
readonly=False,
)
def runjob(self, db, job_uuid, **kw):
http.request.session.db = db
env = http.request.env(user=SUPERUSER_ID)
Expand Down
6 changes: 6 additions & 0 deletions queue_job/i18n/de.po
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ msgstr "Ausstehend"

#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job__priority
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Priority"
msgstr "Priorität"

Expand Down Expand Up @@ -901,6 +902,11 @@ msgstr ""
msgid "Time required to execute this job in seconds. Average when grouped."
msgstr ""

#. module: queue_job
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Tried many times"
msgstr ""

#. module: queue_job
#: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration
msgid "Type of the exception activity on record."
Expand Down
6 changes: 6 additions & 0 deletions queue_job/i18n/es.po
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ msgstr "Pendiente"

#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job__priority
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Priority"
msgstr "Prioridad"

Expand Down Expand Up @@ -913,6 +914,11 @@ msgstr ""
"Tiempo requerido para ejecutar este trabajo en segundos. Promedio cuando se "
"agrupa."

#. module: queue_job
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Tried many times"
msgstr ""

#. module: queue_job
#: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration
msgid "Type of the exception activity on record."
Expand Down
6 changes: 6 additions & 0 deletions queue_job/i18n/it.po
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ msgstr "In attesa"

#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job__priority
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Priority"
msgstr "Priorità"

Expand Down Expand Up @@ -910,6 +911,11 @@ msgid "Time required to execute this job in seconds. Average when grouped."
msgstr ""
"Tempo in secondi richiesto per eseguire il lavoro. Medio quando raggruppati."

#. module: queue_job
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Tried many times"
msgstr ""

#. module: queue_job
#: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration
msgid "Type of the exception activity on record."
Expand Down
6 changes: 6 additions & 0 deletions queue_job/i18n/queue_job.pot
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ msgstr ""

#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job__priority
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Priority"
msgstr ""

Expand Down Expand Up @@ -877,6 +878,11 @@ msgstr ""
msgid "Time required to execute this job in seconds. Average when grouped."
msgstr ""

#. module: queue_job
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Tried many times"
msgstr ""

#. module: queue_job
#: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration
msgid "Type of the exception activity on record."
Expand Down
6 changes: 6 additions & 0 deletions queue_job/i18n/zh_CN.po
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ msgstr "等待"

#. module: queue_job
#: model:ir.model.fields,field_description:queue_job.field_queue_job__priority
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Priority"
msgstr "优先级"

Expand Down Expand Up @@ -904,6 +905,11 @@ msgstr "时间(秒)"
msgid "Time required to execute this job in seconds. Average when grouped."
msgstr "以秒为单位执行此任务所需的时间。分组时为平均值。"

#. module: queue_job
#: model_terms:ir.ui.view,arch_db:queue_job.view_queue_job_search
msgid "Tried many times"
msgstr ""

#. module: queue_job
#: model:ir.model.fields,help:queue_job.field_queue_job__activity_exception_decoration
msgid "Type of the exception activity on record."
Expand Down
71 changes: 0 additions & 71 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import uuid
import weakref
from datetime import datetime, timedelta
from functools import total_ordering
from random import randint

import odoo
Expand Down Expand Up @@ -104,7 +103,6 @@ def identity_exact_hasher(job_):
return hasher


@total_ordering
class Job:
"""A Job is a task to execute. It is the in-memory representation of a job.

Expand Down Expand Up @@ -367,65 +365,6 @@ def job_record_with_same_identity_key(self):
)
return existing

# TODO to deprecate (not called anymore)
@classmethod
def enqueue(
cls,
func,
args=None,
kwargs=None,
priority=None,
eta=None,
max_retries=None,
description=None,
channel=None,
identity_key=None,
):
"""Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted
from the ones to pass to the job function.

If the identity key is the same than the one in a pending job,
no job is created and the existing job is returned

"""
new_job = cls(
func=func,
args=args,
kwargs=kwargs,
priority=priority,
eta=eta,
max_retries=max_retries,
description=description,
channel=channel,
identity_key=identity_key,
)
return new_job._enqueue_job()

# TODO to deprecate (not called anymore)
def _enqueue_job(self):
if self.identity_key:
existing = self.job_record_with_same_identity_key()
if existing:
_logger.debug(
"a job has not been enqueued due to having "
"the same identity key (%s) than job %s",
self.identity_key,
existing.uuid,
)
return Job._load_from_db_record(existing)
self.store()
_logger.debug(
"enqueued %s:%s(*%r, **%r) with uuid: %s",
self.recordset,
self.method_name,
self.args,
self.kwargs,
self.uuid,
)
return self

@staticmethod
def db_record_from_uuid(env, job_uuid):
# TODO remove in 15.0 or 16.0
Expand Down Expand Up @@ -749,16 +688,6 @@ def __eq__(self, other):
def __hash__(self):
return self.uuid.__hash__()

def sorting_key(self):
return self.eta, self.priority, self.date_created, self.seq

def __lt__(self, other):
if self.eta and not other.eta:
return True
elif not self.eta and other.eta:
return False
return self.sorting_key() < other.sorting_key()

def db_record(self):
return self.db_records_from_uuids(self.env, [self.uuid])

Expand Down
55 changes: 39 additions & 16 deletions queue_job/jobrunner/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import logging
from collections import namedtuple
from functools import total_ordering
from heapq import heappop, heappush
from weakref import WeakValueDictionary
Expand All @@ -10,6 +11,7 @@
from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES

NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED)
JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq")

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,7 +110,7 @@ class ChannelJob:
job that are necessary to prioritise them.

Channel jobs are comparable according to the following rules:
* jobs with an eta come before all other jobs
* jobs with an eta cannot be compared with jobs without
* then jobs with a smaller eta come first
* then jobs with a smaller priority come first
* then jobs with a smaller creation time come first
Expand All @@ -135,14 +137,18 @@ class ChannelJob:
>>> j3 < j1
True

j4 and j5 comes even before j3, because they have an eta
j4 and j5 have an eta, they cannot be compared with j3

>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=9, eta=9)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=9, eta=9)
>>> j4 < j5 < j3
>>> j4 < j5
True
>>> j4 < j3
Traceback (most recent call last):
...
TypeError: '<' not supported between instances of 'int' and 'NoneType'

j6 has same date_created and priority as j5 but a smaller eta

Expand All @@ -153,7 +159,7 @@ class ChannelJob:

Here is the complete suite:

>>> j6 < j4 < j5 < j3 < j1 < j2
>>> j6 < j4 < j5 and j3 < j1 < j2
True

j0 has the same properties as j1 but they are not considered
Expand All @@ -173,14 +179,13 @@ class ChannelJob:

"""

__slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__")

def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta):
self.db_name = db_name
self.channel = channel
self.uuid = uuid
self.seq = seq
self.date_created = date_created
self.priority = priority
self.eta = eta
self._sorting_key = JobSortingKey(eta, priority, date_created, seq)

def __repr__(self):
return "<ChannelJob %s>" % self.uuid
Expand All @@ -191,18 +196,36 @@ def __eq__(self, other):
def __hash__(self):
return id(self)

def set_no_eta(self):
self._sorting_key = JobSortingKey(None, *self._sorting_key[1:])

@property
def seq(self):
return self._sorting_key.seq

@property
def date_created(self):
return self._sorting_key.date_created

@property
def priority(self):
return self._sorting_key.priority

@property
def eta(self):
return self._sorting_key.eta

def sorting_key(self):
return self.eta, self.priority, self.date_created, self.seq
# DEPRECATED
return self._sorting_key

def sorting_key_ignoring_eta(self):
return self.priority, self.date_created, self.seq
return self._sorting_key[1:]

def __lt__(self, other):
if self.eta and not other.eta:
return True
elif not self.eta and other.eta:
return False
return self.sorting_key() < other.sorting_key()
# Do not compare job where ETA is set with job where it is not
# If one job 'eta' is set, and the other is None, it raises TypeError
return self._sorting_key < other._sorting_key


class ChannelQueue:
Expand Down Expand Up @@ -312,7 +335,7 @@ def remove(self, job):
def pop(self, now):
while self._eta_queue and self._eta_queue[0].eta <= now:
eta_job = self._eta_queue.pop()
eta_job.eta = None
eta_job.set_no_eta()
self._queue.add(eta_job)
if self.sequential and self._eta_queue and self._queue:
eta_job = self._eta_queue[0]
Expand Down
11 changes: 11 additions & 0 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,17 @@ def __init__(
self._stop = False
self._stop_pipe = os.pipe()

def __del__(self):
# pylint: disable=except-pass
try:
os.close(self._stop_pipe[0])
except OSError:
pass
try:
os.close(self._stop_pipe[1])
except OSError:
pass

@classmethod
def from_environ_or_config(cls):
scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get(
Expand Down
Loading