From 919f1e389845e41b2dfedebb8e6e92d038d076cc Mon Sep 17 00:00:00 2001 From: level09 Date: Wed, 6 May 2026 19:09:44 +0300 Subject: [PATCH] fix: prevent imports stuck in Pending on stale DB connections etl_process_file's except handler called db.session.get(DataImport, ...) without rolling back first. When an upstream error poisoned the session (e.g. Postgres dropping an idle connection), the fail-marker query raised PendingRollbackError, masked the original exception, and left the import row stuck in 'Pending' forever. - Roll back the session before recovering, and guard the fail-marker so a follow-up failure doesn't replace the original exception. - Enable pool_pre_ping and pool_recycle on the SQLAlchemy engine so workers don't get handed dropped connections in the first place. --- enferno/settings.py | 7 +++++++ enferno/tasks/data_import.py | 13 +++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/enferno/settings.py b/enferno/settings.py index 0cb2dca5e..ea4bedd29 100644 --- a/enferno/settings.py +++ b/enferno/settings.py @@ -52,6 +52,13 @@ class Config(object): SQLALCHEMY_DATABASE_URI = f"postgresql:///{POSTGRES_DB}" SQLALCHEMY_TRACK_MODIFICATIONS = False + # Validate pooled connections before use and recycle them periodically so + # workers aren't handed a dropped connection (Postgres idle timeouts, + # NAT/conntrack drops, restarts). + SQLALCHEMY_ENGINE_OPTIONS = { + "pool_pre_ping": True, + "pool_recycle": int(os.environ.get("SQLALCHEMY_POOL_RECYCLE", 300)), + } # Redis REDIS_HOST = os.environ.get("REDIS_HOST", "localhost") diff --git a/enferno/tasks/data_import.py b/enferno/tasks/data_import.py index 50a00a5e4..4d4e75ff0 100644 --- a/enferno/tasks/data_import.py +++ b/enferno/tasks/data_import.py @@ -30,8 +30,17 @@ def etl_process_file( di.process(file) return "done" except Exception as e: - log = db.session.get(DataImport, data_import_id) - log.fail(e) + # Roll back any half-applied transaction so the session is usable below. 
+ # Without this, errors that poison the session (e.g. dropped DB connection) + # cause the fail-marker query to raise PendingRollbackError, masking the + # original exception and leaving the import stuck in "Pending". + db.session.rollback() + try: + log = db.session.get(DataImport, data_import_id) + if log: + log.fail(e) + except Exception: + logger.exception("Could not mark data import %s as failed", data_import_id) raise # Re-raise for chord coordination