Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions packit_service/celerizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,43 @@
from celery import Celery
from lazy_object_proxy import Proxy

from packit_service.constants import (
REDIS_DEFAULT_CELERY_BACKEND,
REDIS_DEFAULT_DB,
REDIS_DEFAULT_HOST,
REDIS_DEFAULT_PASSWORD,
REDIS_DEFAULT_PORT,
)
from packit_service.sentry_integration import configure_sentry


def get_redis_config():
    """
    Build the Redis connection settings from environment variables.

    Each value falls back to the corresponding REDIS_DEFAULT_* constant
    when its environment variable is unset. The Celery backend also
    honours the historical misspelled variable ``REDIS_CELERY_BECKEND``
    as a fallback for backward compatibility.

    Returns:
        dict: Redis configuration with keys: host, password, port, db, celery_backend
    """
    # Prefer the correctly spelled variable; fall back to the legacy typo.
    backend = getenv("REDIS_CELERY_BACKEND") or getenv(
        "REDIS_CELERY_BECKEND", REDIS_DEFAULT_CELERY_BACKEND
    )
    config = {
        "host": getenv("REDIS_SERVICE_HOST", REDIS_DEFAULT_HOST),
        "password": getenv("REDIS_PASSWORD", REDIS_DEFAULT_PASSWORD),
        "port": getenv("REDIS_SERVICE_PORT", REDIS_DEFAULT_PORT),
        "db": getenv("REDIS_SERVICE_DB", REDIS_DEFAULT_DB),
        "celery_backend": backend,
    }
    return config


class Celerizer:
def __init__(self):
self._celery_app = None

@property
def celery_app(self):
if self._celery_app is None:
host = getenv("REDIS_SERVICE_HOST", "redis")
password = getenv("REDIS_PASSWORD", "")
port = getenv("REDIS_SERVICE_PORT", "6379")
db = getenv("REDIS_SERVICE_DB", "0")
celery_beckend = getenv("REDIS_CELERY_BECKEND", "1")
broker_url = f"redis://:{password}@{host}:{port}/{db}"
backend_url = f"redis://:{password}@{host}:{port}/{celery_beckend}"
redis_config = get_redis_config()
broker_url = f"redis://:{redis_config['password']}@{redis_config['host']}:{redis_config['port']}/{redis_config['db']}"
backend_url = f"redis://:{redis_config['password']}@{redis_config['host']}:{redis_config['port']}/{redis_config['celery_backend']}"

# http://docs.celeryq.dev/en/stable/reference/celery.html#celery.Celery
self._celery_app = Celery(backend=backend_url, broker=broker_url)
Expand Down
5 changes: 5 additions & 0 deletions packit_service/celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
"schedule": 10800.0,
"options": {"queue": "long-running", "time_limit": 3600},
},
"cleanup-orphaned-pidbox-queues": {
"task": "packit_service.worker.tasks.cleanup_orphaned_pidbox_queues",
"schedule": crontab(minute=30, hour=0), # nightly at 12:30 AM
"options": {"queue": "short-running"},
},
}

# http://mher.github.io/flower/prometheus-integration.html#set-up-your-celery-application
Expand Down
10 changes: 10 additions & 0 deletions packit_service/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,13 @@ def from_number(number: int):

# Default URL of the logdetective-packit interface server for sending the Log Detective requests.
LOGDETECTIVE_PACKIT_SERVER_URL = "https://logdetective01.fedorainfracloud.org"

# Redis configuration defaults.
# Values are strings because they serve as os.getenv() fallbacks and are
# interpolated into the broker/backend URLs as-is.
REDIS_DEFAULT_HOST = "redis"
REDIS_DEFAULT_PORT = "6379"
REDIS_DEFAULT_DB = "0"
REDIS_DEFAULT_PASSWORD = ""
# Redis DB number used for the Celery result backend (distinct from the broker DB).
REDIS_DEFAULT_CELERY_BACKEND = "1"

# TTL for orphaned Celery pidbox reply queues (in seconds)
REDIS_PIDBOX_TTL_SECONDS = 3600  # 1 hour
10 changes: 9 additions & 1 deletion packit_service/worker/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os

from prometheus_client import CollectorRegistry, Counter, Histogram, push_to_gateway
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, push_to_gateway

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -265,6 +265,14 @@ def __init__(self):
buckets=(5, 15, 20, 25, 30, 40, 60, float("inf")),
)

# Redis/Valkey health metrics
self.redis_keys_total = Gauge(
"redis_keys_total",
"Total number of keys in Redis/Valkey database. "
"If this grows indefinitely, there's a key accumulation/leak issue.",
registry=self.registry,
)

def push(self):
if not (self.pushgateway_address and self.worker_name):
logger.debug("Pushgateway address or worker name not defined.")
Expand Down
83 changes: 82 additions & 1 deletion packit_service/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from os import getenv
from typing import ClassVar, Optional

import redis
from celery import Task
from celery._state import get_current_task
from celery.signals import after_setup_logger
Expand All @@ -19,11 +20,12 @@
from syslog_rfc5424_formatter import RFC5424Formatter

from packit_service import __version__ as ps_version
from packit_service.celerizer import celery_app
from packit_service.celerizer import celery_app, get_redis_config
from packit_service.constants import (
CELERY_DEFAULT_MAIN_TASK_NAME,
DEFAULT_RETRY_BACKOFF,
DEFAULT_RETRY_LIMIT,
REDIS_PIDBOX_TTL_SECONDS,
USAGE_CURRENT_DATE,
USAGE_DATE_IN_THE_PAST,
USAGE_DATE_IN_THE_PAST_STR,
Expand Down Expand Up @@ -102,6 +104,7 @@
update_vm_image_build,
)
from packit_service.worker.jobs import SteveJobs
from packit_service.worker.monitoring import Pushgateway
from packit_service.worker.result import TaskResults

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -994,3 +997,81 @@ def get_usage_statistics() -> None:
logger.debug(f"Getting usage data from datetime_from {day}.")
get_usage_data(datetime_from=day)
logger.debug("Got usage data.")


@celery_app.task
def cleanup_orphaned_pidbox_queues() -> None:
    """
    Clean up orphaned Celery pidbox reply queues that don't have TTL set.

    Celery workers create pidbox (control) reply queues for control commands
    (inspect, ping, stats, etc.). These queues should be temporary but can be
    orphaned when workers crash or restart improperly.

    This task:
    - Scans for *.reply.celery.pidbox keys
    - Sets a 1-hour TTL on keys without expiry (TTL = -1)
    - Counts total keys in database for monitoring
    - Exports metrics to Prometheus

    Runs periodically via Celery beat to prevent disk/memory leaks.
    """
    logger.info("Starting cleanup of orphaned pidbox reply queues")

    pushgateway = Pushgateway()
    redis_client = None

    try:
        # Build the connection from the same env-driven config Celery uses.
        redis_config = get_redis_config()

        redis_client = redis.Redis(
            host=redis_config["host"],
            port=int(redis_config["port"]),
            db=int(redis_config["db"]),
            password=redis_config["password"],
            decode_responses=True,
        )

        # Incrementally SCAN for pidbox reply queue keys; SCAN does not
        # block the server the way KEYS would.
        cursor = 0
        keys_processed = 0
        keys_with_ttl_set = 0
        pattern = "*.reply.celery.pidbox"

        while True:
            cursor, keys = redis_client.scan(
                cursor=cursor,
                match=pattern,
                count=100,
            )

            for key in keys:
                keys_processed += 1

                # TTL == -1 means the key exists but has no expiry;
                # -2 (key already gone) and positive TTLs are left alone.
                if redis_client.ttl(key) == -1:
                    redis_client.expire(key, REDIS_PIDBOX_TTL_SECONDS)
                    keys_with_ttl_set += 1
                    logger.debug(f"Set TTL on pidbox key: {key}")

            # SCAN is complete when the cursor wraps back to 0.
            if cursor == 0:
                break

        # Total key count is exported so key accumulation/leaks can be alerted on.
        total_keys = redis_client.dbsize()

        logger.info(
            f"Pidbox cleanup complete: scanned {keys_processed} pidbox keys, "
            f"set TTL on {keys_with_ttl_set} orphaned queues. "
            f"Total keys in DB: {total_keys}"
        )

        # Export metrics to Prometheus
        pushgateway.redis_keys_total.set(total_keys)
        pushgateway.push()

    except redis.RedisError:
        # Best-effort maintenance task: log with traceback, don't raise/retry.
        logger.exception("Redis error during pidbox cleanup")
    except Exception:
        logger.exception("Unexpected error during pidbox cleanup")
    finally:
        # Don't leak a connection between periodic runs.
        if redis_client is not None:
            redis_client.close()
43 changes: 42 additions & 1 deletion tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

import prometheus_client
import pytest
import redis
from celery.app.task import Task
from flexmock import flexmock
from packit.exceptions import PackitException

from packit_service.constants import REDIS_PIDBOX_TTL_SECONDS
from packit_service.worker.handlers import CoprBuildHandler
from packit_service.worker.tasks import run_copr_build_handler
from packit_service.worker.monitoring import Pushgateway
from packit_service.worker.tasks import cleanup_orphaned_pidbox_queues, run_copr_build_handler


def test_autoretry():
Expand All @@ -21,3 +24,41 @@ def test_autoretry():
flexmock(Task).should_receive("retry").and_raise(PackitException).once()
with pytest.raises(PackitException):
run_copr_build_handler({}, {}, {})


def test_cleanup_orphaned_pidbox_queues():
    """Test that pidbox cleanup scans keys, sets TTL, and pushes metrics."""
    # Mock Redis client: a single SCAN page returning two pidbox keys,
    # with cursor 0 signalling the scan is complete.
    redis_client = flexmock()
    redis_client.should_receive("scan").with_args(
        cursor=0,
        match="*.reply.celery.pidbox",
        count=100,
    ).and_return((0, ["key1.reply.celery.pidbox", "key2.reply.celery.pidbox"])).once()

    # key1 has no TTL (-1), key2 already has TTL
    redis_client.should_receive("ttl").with_args("key1.reply.celery.pidbox").and_return(-1).once()
    redis_client.should_receive("ttl").with_args("key2.reply.celery.pidbox").and_return(1800).once()

    # Only key1 should get TTL set
    redis_client.should_receive("expire").with_args(
        "key1.reply.celery.pidbox",
        REDIS_PIDBOX_TTL_SECONDS,
    ).once()

    redis_client.should_receive("dbsize").and_return(42).once()

    # The task may close the connection when it finishes; allow (but don't
    # require) the call so the test isn't coupled to connection handling.
    redis_client.should_receive("close")

    # Mock Redis constructor
    flexmock(redis).should_receive("Redis").and_return(redis_client).once()

    # Mock Pushgateway: the DB key total must be exported and pushed.
    gauge = flexmock()
    gauge.should_receive("set").with_args(42).once()

    pushgateway = flexmock(redis_keys_total=gauge)
    pushgateway.should_receive("push").once()

    flexmock(Pushgateway).new_instances(pushgateway)
    flexmock(prometheus_client).should_receive("push_to_gateway")

    cleanup_orphaned_pidbox_queues()
Loading