From 14d3b832e8884f0e0119c6aef9861c70ace29c50 Mon Sep 17 00:00:00 2001 From: Maja Massarini Date: Wed, 1 Apr 2026 09:52:13 +0200 Subject: [PATCH] Add periodic cleanup for orphaned Celery pidbox queues Problem: Celery workers create pidbox (control) reply queues for worker management commands (inspect, ping, stats, etc.). These queues accumulate when workers crash or restart improperly, leading to: - 1,693+ orphaned *.reply.celery.pidbox keys in production - Keys with no TTL (TTL = -1) that persist indefinitely Root cause: Celery's Redis transport does not provide a native way to set TTL on pidbox reply queues when they're created. These are internal implementation details of Celery's broadcast/control mechanism, and there's no configuration option to automatically expire them. Solution: Heartbeat cleanup task Since we cannot tell Celery to natively set TTL on pidbox messages, we implement a periodic heartbeat task that: - Runs nightly at 12:30 AM via Celery beat - Scans for *.reply.celery.pidbox keys without TTL - Sets 1-hour expiration on orphaned queues - Tracks total Redis keys via Prometheus for monitoring Related to: https://github.com/packit/deployment/pull/701 Should fix: https://github.com/packit/packit-service/issues/2983 Assisted-By: Claude Sonnet 4.5 Assisted-By: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- packit_service/celerizer.py | 34 +++++++++--- packit_service/celery_config.py | 5 ++ packit_service/constants.py | 10 ++++ packit_service/worker/monitoring.py | 10 +++- packit_service/worker/tasks.py | 83 ++++++++++++++++++++++++++++- tests/unit/test_tasks.py | 43 ++++++++++++++- 6 files changed, 175 insertions(+), 10 deletions(-) diff --git a/packit_service/celerizer.py b/packit_service/celerizer.py index 99ff8b066..f8b3608bc 100644 --- a/packit_service/celerizer.py +++ b/packit_service/celerizer.py @@ -6,9 +6,33 @@ from celery import Celery from lazy_object_proxy import Proxy +from packit_service.constants 
def get_redis_config():
    """
    Read the Redis connection settings from the process environment.

    Every setting falls back to the matching REDIS_DEFAULT_* constant when
    its environment variable is not set.

    Returns:
        dict: mapping with the keys host, password, port, db and
        celery_backend (in that order).
    """
    # The correctly spelled variable wins; the misspelled
    # REDIS_CELERY_BECKEND is still consulted when the former is unset or
    # empty, and only then does the default apply.
    backend_db = getenv("REDIS_CELERY_BACKEND")
    if not backend_db:
        backend_db = getenv("REDIS_CELERY_BECKEND", REDIS_DEFAULT_CELERY_BACKEND)

    return dict(
        host=getenv("REDIS_SERVICE_HOST", REDIS_DEFAULT_HOST),
        password=getenv("REDIS_PASSWORD", REDIS_DEFAULT_PASSWORD),
        port=getenv("REDIS_SERVICE_PORT", REDIS_DEFAULT_PORT),
        db=getenv("REDIS_SERVICE_DB", REDIS_DEFAULT_DB),
        celery_backend=backend_db,
    )
}, + "cleanup-orphaned-pidbox-queues": { + "task": "packit_service.worker.tasks.cleanup_orphaned_pidbox_queues", + "schedule": crontab(minute=30, hour=0), # nightly at 12:30 AM + "options": {"queue": "short-running"}, + }, } # http://mher.github.io/flower/prometheus-integration.html#set-up-your-celery-application diff --git a/packit_service/constants.py b/packit_service/constants.py index 2b05f985e..4f9fedf54 100644 --- a/packit_service/constants.py +++ b/packit_service/constants.py @@ -366,3 +366,13 @@ def from_number(number: int): # Default URL of the logdetective-packit interface server for sending the Log Detective requests. LOGDETECTIVE_PACKIT_SERVER_URL = "https://logdetective01.fedorainfracloud.org" + +# Redis configuration defaults +REDIS_DEFAULT_HOST = "redis" +REDIS_DEFAULT_PORT = "6379" +REDIS_DEFAULT_DB = "0" +REDIS_DEFAULT_PASSWORD = "" +REDIS_DEFAULT_CELERY_BACKEND = "1" + +# TTL for orphaned Celery pidbox reply queues (in seconds) +REDIS_PIDBOX_TTL_SECONDS = 3600 # 1 hour diff --git a/packit_service/worker/monitoring.py b/packit_service/worker/monitoring.py index 7beb8a8f9..ccd36dfed 100644 --- a/packit_service/worker/monitoring.py +++ b/packit_service/worker/monitoring.py @@ -4,7 +4,7 @@ import logging import os -from prometheus_client import CollectorRegistry, Counter, Histogram, push_to_gateway +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway logger = logging.getLogger(__name__) @@ -265,6 +265,14 @@ def __init__(self): buckets=(5, 15, 20, 25, 30, 40, 60, float("inf")), ) + # Redis/Valkey health metrics + self.redis_keys_total = Gauge( + "redis_keys_total", + "Total number of keys in Redis/Valkey database. 
# --- packit_service/worker/tasks.py ---


def _expire_orphaned_pidbox_keys(redis_client) -> tuple:
    """
    Put a TTL on every pidbox reply queue key that currently has none.

    Args:
        redis_client: Redis client; only its scan(), ttl() and expire()
            methods are used.

    Returns:
        tuple: (keys_processed, keys_with_ttl_set) -- how many
        *.reply.celery.pidbox keys were seen and how many of them
        received a new expiry.
    """
    cursor = 0
    keys_processed = 0
    keys_with_ttl_set = 0

    while True:
        cursor, keys = redis_client.scan(
            cursor=cursor,
            match="*.reply.celery.pidbox",
            count=100,
        )

        for key in keys:
            keys_processed += 1

            # TTL of -1 means the key exists but has no expiry; keys that
            # already expire on their own are left untouched.
            if redis_client.ttl(key) == -1:
                redis_client.expire(key, REDIS_PIDBOX_TTL_SECONDS)
                keys_with_ttl_set += 1
                logger.debug(f"Set TTL on pidbox key: {key}")

        # SCAN hands back cursor 0 once the whole keyspace has been visited.
        if cursor == 0:
            break

    return keys_processed, keys_with_ttl_set


@celery_app.task
def cleanup_orphaned_pidbox_queues() -> None:
    """
    Clean up orphaned Celery pidbox reply queues that don't have TTL set.

    Celery workers create pidbox (control) reply queues for control commands
    (inspect, ping, stats, etc.). These queues should be temporary but can be
    orphaned when workers crash or restart improperly.

    This task:
    - Scans for *.reply.celery.pidbox keys
    - Sets a REDIS_PIDBOX_TTL_SECONDS expiry on keys without one (TTL = -1)
    - Counts total keys in the database for monitoring
    - Exports that count to Prometheus through the Pushgateway

    The task is best-effort: any error is logged (with traceback) and
    swallowed, so nothing is raised to the caller. Metrics are pushed only
    when the whole run succeeded.
    """
    logger.info("Starting cleanup of orphaned pidbox reply queues")

    pushgateway = Pushgateway()
    redis_client = None

    try:
        # Open a dedicated client using the same env-derived settings
        # (host/port/db/password) that get_redis_config() provides.
        redis_config = get_redis_config()

        redis_client = redis.Redis(
            host=redis_config["host"],
            port=int(redis_config["port"]),
            db=int(redis_config["db"]),
            password=redis_config["password"],
            decode_responses=True,
        )

        keys_processed, keys_with_ttl_set = _expire_orphaned_pidbox_keys(redis_client)

        # Get total number of keys in database for monitoring
        total_keys = redis_client.dbsize()

        logger.info(
            f"Pidbox cleanup complete: scanned {keys_processed} pidbox keys, "
            f"set TTL on {keys_with_ttl_set} orphaned queues. "
            f"Total keys in DB: {total_keys}"
        )

        # Export metrics to Prometheus
        pushgateway.redis_keys_total.set(total_keys)
        pushgateway.push()

    except redis.RedisError as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Redis error during pidbox cleanup: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error during pidbox cleanup: {e}")
    finally:
        # Release the client explicitly instead of relying on garbage
        # collection; redis_client is None when construction itself failed.
        if redis_client is not None:
            redis_client.close()


# --- tests/unit/test_tasks.py ---


def test_cleanup_orphaned_pidbox_queues():
    """Test that pidbox cleanup scans keys, sets TTL, pushes metrics and closes the client."""
    # Mock Redis client
    redis_client = flexmock()
    redis_client.should_receive("scan").with_args(
        cursor=0,
        match="*.reply.celery.pidbox",
        count=100,
    ).and_return((0, ["key1.reply.celery.pidbox", "key2.reply.celery.pidbox"])).once()

    # key1 has no TTL (-1), key2 already has TTL
    redis_client.should_receive("ttl").with_args("key1.reply.celery.pidbox").and_return(-1).once()
    redis_client.should_receive("ttl").with_args("key2.reply.celery.pidbox").and_return(1800).once()

    # Only key1 should get TTL set
    redis_client.should_receive("expire").with_args(
        "key1.reply.celery.pidbox",
        REDIS_PIDBOX_TTL_SECONDS,
    ).once()

    redis_client.should_receive("dbsize").and_return(42).once()

    # The task must release the client when it is done
    redis_client.should_receive("close").once()

    # Mock Redis constructor
    flexmock(redis).should_receive("Redis").and_return(redis_client).once()

    # Mock Pushgateway
    gauge = flexmock()
    gauge.should_receive("set").with_args(42).once()

    pushgateway = flexmock(redis_keys_total=gauge)
    pushgateway.should_receive("push").once()

    flexmock(Pushgateway).new_instances(pushgateway)
    flexmock(prometheus_client).should_receive("push_to_gateway")

    cleanup_orphaned_pidbox_queues()