diff --git a/README.md b/README.md index e56aeb4292..57ff3177fd 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Available addons ---------------- addon | version | maintainers | summary --- | --- | --- | --- -[queue_job](queue_job/) | 18.0.1.2.2 | [![guewen](https://github.com/guewen.png?size=30px)](https://github.com/guewen) | Job Queue +[queue_job](queue_job/) | 18.0.1.3.0 | [![guewen](https://github.com/guewen.png?size=30px)](https://github.com/guewen) | Job Queue [queue_job_batch](queue_job_batch/) | 18.0.1.0.0 | | Job Queue Batch [queue_job_cron](queue_job_cron/) | 18.0.1.1.0 | | Scheduled Actions as Queue Jobs [queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 18.0.1.0.0 | [![ivantodorovich](https://github.com/ivantodorovich.png?size=30px)](https://github.com/ivantodorovich) | Run jobs without a dedicated JobRunner diff --git a/queue_job/README.rst b/queue_job/README.rst index ce1acbcf1d..18aecbe3c9 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -7,7 +7,7 @@ Job Queue !! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - !! source digest: sha256:457b3cda8ba43e54d1aa390b5e3be8291b6f58bb375d2fde4bc07b97634432df + !! source digest: sha256:3fac52655772f6aa01506c81cba64db296a7d130745d8fa74e6d9ace0d8124c7 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 1e66cd92d4..e10fe95a92 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "18.0.1.2.2", + "version": "18.0.1.3.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index c1565c1767..46cb3b3d82 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -159,12 +159,17 @@ SELECT_TIMEOUT = 60 ERROR_RECOVERY_DELAY = 5 +PG_ADVISORY_LOCK_ID = 2293787760715711918 _logger = logging.getLogger(__name__) select = selectors.DefaultSelector +class MasterElectionLost(Exception): + pass + + # Unfortunately, it is not possible to extend the Odoo # server command line arguments, so we resort to environment variables # to configure the runner (channels mostly). @@ -262,10 +267,15 @@ def __init__(self, db_name): self.db_name = db_name connection_info = _connection_info_for(db_name) self.conn = psycopg2.connect(**connection_info) - self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - self.has_queue_job = self._has_queue_job() - if self.has_queue_job: - self._initialize() + try: + self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + self.has_queue_job = self._has_queue_job() + if self.has_queue_job: + self._acquire_master_lock() + self._initialize() + except BaseException: + self.close() + raise def close(self): # pylint: disable=except-pass @@ -278,6 +288,14 @@ def close(self): pass self.conn = None + def _acquire_master_lock(self): + """Acquire the master runner lock or raise MasterElectionLost""" + with closing(self.conn.cursor()) as cr: + cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,)) + if not cr.fetchone()[0]: + msg = f"could not acquire master runner lock on {self.db_name}" + raise MasterElectionLost(msg) + def _has_queue_job(self): with closing(self.conn.cursor()) as cr: cr.execute( @@ -413,7 +431,8 @@ def close_databases(self, remove_jobs=True): self.db_by_name = {} def initialize_databases(self): - for db_name in self.get_db_names(): + for db_name in sorted(self.get_db_names()): + # sorting is important to avoid deadlocks in acquiring the master lock db = Database(db_name) if db.has_queue_job: self.db_by_name[db_name] = db @@ -421,6 +440,8 @@ def initialize_databases(self): for job_data in cr: self.channel_manager.notify(db_name, *job_data) _logger.info("queue job runner ready for db %s", db_name) + else: + db.close() def run_jobs(self): now = _odoo_now() @@ -507,7 +528,7 @@ def run(self): while not self._stop: # outer loop does exception recovery try: - _logger.info("initializing database connections") + _logger.debug("initializing database connections") # TODO: how to detect new databases or databases # on which queue_job is installed after server start? self.initialize_databases() @@ -522,6 +543,14 @@ def run(self): except InterruptedError: # Interrupted system call, i.e. KeyboardInterrupt during select self.stop() + except MasterElectionLost as e: + _logger.debug( + "master election lost: %s, sleeping %ds and retrying", + e, + ERROR_RECOVERY_DELAY, + ) + self.close_databases() + time.sleep(ERROR_RECOVERY_DELAY) except Exception: _logger.exception( "exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index fe7239f15a..673cdc1e6a 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -367,7 +367,7 @@

Job Queue

!! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -!! source digest: sha256:457b3cda8ba43e54d1aa390b5e3be8291b6f58bb375d2fde4bc07b97634432df +!! source digest: sha256:3fac52655772f6aa01506c81cba64db296a7d130745d8fa74e6d9ace0d8124c7 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.