diff --git a/CHANGES.rst b/CHANGES.rst index 75ceeef..b4a596a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,14 @@ Changelog 4.3 (unreleased) ---------------- +- Ensure connections always reconnect deterministically after a server + disconnect (or crash). + Previously the pool may have harboured long-running connections that only + got reconnected when the pool felt like returning it to the application. + Those connections can easily have lingered for a long time, causing + user-visible errors long after the original problem was fixed, e.g. when + the server has restarted a couple of hours ago. + - Add support for Python 3.13. - Drop support for Python 3.7 and 3.8. diff --git a/src/Products/ZPsycopgDA/db.py b/src/Products/ZPsycopgDA/db.py index 267a56e..f58dc9a 100644 --- a/src/Products/ZPsycopgDA/db.py +++ b/src/Products/ZPsycopgDA/db.py @@ -45,6 +45,7 @@ class DB(TM): _p_oid = _p_changed = None _registered = False _sort_key = '1' + _conn = None def __init__(self, dsn, tilevel, typecasts, enc='utf-8'): self.dsn = dsn @@ -58,31 +59,52 @@ def __init__(self, dsn, tilevel, typecasts, enc='utf-8'): self.calls = 0 self.make_mappings() - def getconn(self, init='ignored', retry=100): - conn = pool.getconn(self.dsn) - _pool = pool.getpool(self.dsn, create=False) - if id(conn) not in _pool._initialized: + def getconn(self, init='ignored'): + if self._conn: + # As the TM is a short-lived object, keep a reference to the + # connection - we really expect the pool's getconn() to reliably + # return the same connection anyway. This is needed to + # differentiate between an initial connection where its fine to + # keep going through the pool and fetching a new one versus an + # existing connection in a running transaction where e.g. a failure + # on the server side has aborted the transaction and we can't e.g. + # run `select 1` any longer but we also need the original + # connection to clean up correctly within Zope's transaction + # management. Theoretically this could be placed in `_register()` + # but I'm not 100% sure someone might be using getconn() without + # the TM integration thus going around `_register()` ... + return self._conn + + _pool = pool.getpool(self.dsn, create=True) + + # Loop to support cleaning up potentially all `maxconn` faulty + # connections. Add 1 more to force a fresh connection at least once. + tries = max([_pool.maxconn + 1, 1]) + for _ in range(tries): + conn = pool.getconn(self.dsn, create=False) try: - conn.set_session(isolation_level=int(self.tilevel)) - except psycopg2.InterfaceError: - # we got a closed connection from a poisoned pool -> - # close it and retry: - pool.putconn(self.dsn, conn, True) - if retry <= 0: - raise ConflictError("InterfaceError from psycopg2") - return self.getconn(retry=retry - 1) + cursor = conn.cursor() + cursor.execute("SELECT 1") + conn.rollback() + break + except (psycopg2.InterfaceError, psycopg2.OperationalError): + pool.putconn(self.dsn, conn, close=True) + + if id(conn) not in _pool._initialized: + conn.set_session(isolation_level=int(self.tilevel)) conn.set_client_encoding(self.encoding) for tc in self.typecasts: register_type(tc, conn) _pool._initialized.add(id(conn)) + + self._conn = conn return conn def putconn(self, close=False): - try: - conn = pool.getconn(self.dsn, False) - except AttributeError: - pass - pool.putconn(self.dsn, conn, close) + if not self._conn: + return + pool.putconn(self.dsn, self._conn, close) + self._conn = None def getcursor(self): conn = self.getconn()