From 9eb3c442eda8c8b93d37a8cb1a6c9c64c2c3cbd2 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Mon, 10 Mar 2025 15:45:48 +0100 Subject: [PATCH 1/9] Switch to Valkey, drop Celery. --- .github/workflows/test.yml | 15 +- .pre-commit-config.yaml | 1 - README.md | 18 +- compose.yaml | 26 +-- entrypoint.sh | 3 - pyproject.toml | 20 +- src/tmt_web/api.py | 242 ++++++++++------------ src/tmt_web/generators/html_generator.py | 21 +- src/tmt_web/service.py | 66 ++++-- src/tmt_web/settings.py | 2 +- src/tmt_web/utils/task_manager.py | 146 +++++++++++++ tests/integration/test_api.py | 251 ++++++++++++++--------- tests/unit/test_html_generator.py | 31 ++- tests/unit/test_service.py | 126 +++++++----- 14 files changed, 616 insertions(+), 352 deletions(-) create mode 100644 src/tmt_web/utils/task_manager.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f0cf46c..2df1710 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -42,23 +42,16 @@ jobs: - name: Create Podman pod run: | podman pod create --name tmt-web-pod --infra-image=registry.k8s.io/pause:3.9 -p 8000:8000 -p 6379:6379 - # Exposing redis port as well for test_api.py::TestCelery::test_basic_test_request + # Exposing valkey port as well for background task tests - - name: Start Redis container + - name: Start valkey container run: | - podman run -d --pod tmt-web-pod --name redis redis:latest - - - name: Start Celery container - run: | - podman run -d --pod tmt-web-pod --name celery \ - -e REDIS_URL=redis://localhost:6379 \ - -e API_HOSTNAME=http://localhost:8000 \ - tmt-web:latest celery --app=tmt_web.service worker --loglevel=INFO + podman run -d --pod tmt-web-pod --name valkey valkey:latest - name: Start Web container run: | podman run -d --pod tmt-web-pod --name web \ - -e REDIS_URL=redis://localhost:6379 \ + -e VALKEY_URL=valkey://localhost:6379 \ -e API_HOSTNAME=http://localhost:8000 \ tmt-web:latest uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ea3710c..c2c724a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -24,7 +24,6 @@ repos: additional_dependencies: - 'tmt' - 'pydantic' - - 'celery-types' pass_filenames: false - repo: https://github.com/charliermarsh/ruff-pre-commit diff --git a/README.md b/README.md index 75d4715..6c52d7a 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,10 @@ podman-compose up --build Add `-d` for the service to run in the background. -In order to quickly experiment without using Celery use this: +For quick development without container setup: ```bash -USE_CELERY=false CLONE_DIR_PATH=/var/tmp/test uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000 +CLONE_DIR_PATH=/var/tmp/test uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000 ``` ## Tests @@ -29,15 +29,19 @@ Run `hatch env show` to see the list of available environments and their scripts ## Environment variables -- `REDIS_URL` - *optional*, passed to Celery on initialization as a `broker` and - `backend` argument, default value is: `redis://localhost:6379` +- `VALKEY_URL` - *optional*, connection URL for Valkey which is used for storing task state, + default value is: `valkey://localhost:6379` - `CLONE_DIR_PATH` - *optional*, specifies the path where the repositories will be cloned, default value is: `./.repos/` -- `USE_CELERY` - *optional*, specifies if the app should use Celery, set to - `false` for running without Celery - `API_HOSTNAME` - *required*, specifies the hostname of the API, used for creating the callback URL to the service +## Architecture + +The application uses FastAPI's built-in background tasks for asynchronous processing and +Valkey for task state storage. This architecture provides a lightweight and efficient +solution for handling long-running tasks without requiring external task queue infrastructure. + ## API The API version is defined by prefix in url, e.g. `/v0.1/status`. @@ -50,7 +54,7 @@ exclusive. ### `/` -Returns ID of the created Celery task with additional metadata in JSON +Returns ID of the created background task with additional metadata in JSON and callback url for `/status` endpoint, returns the same in HTML format if `format` is set to `html`. diff --git a/compose.yaml b/compose.yaml index 7beabe1..7fe0c18 100644 --- a/compose.yaml +++ b/compose.yaml @@ -6,7 +6,7 @@ services: dockerfile: ./Containerfile command: uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000 environment: - - REDIS_URL=redis://redis:6379 + - VALKEY_URL=valkey://valkey:6379 - API_HOSTNAME=http://localhost:8000 ports: - 8000:8000 @@ -18,31 +18,17 @@ services: start_period: 5s restart: unless-stopped depends_on: - redis: + valkey: condition: service_healthy - redis: - container_name: redis - image: redis:latest + valkey: + container_name: valkey + image: valkey:alpine ports: - 6379:6379 healthcheck: - test: ["CMD", "redis-cli", "ping"] + test: ["CMD", "valkey-cli", "ping"] interval: 10s timeout: 5s retries: 3 restart: unless-stopped - - celery: - container_name: celery - build: - context: . - dockerfile: ./Containerfile - command: celery --app=tmt_web.service worker --loglevel=INFO - environment: - - REDIS_URL=redis://redis:6379 - - API_HOSTNAME=http://localhost:8000 - depends_on: - redis: - condition: service_healthy - restart: unless-stopped diff --git a/entrypoint.sh b/entrypoint.sh index 033fe0e..34bc304 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -17,9 +17,6 @@ case $APP in uvicorn) COMMAND="uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000" ;; - celery) - COMMAND="celery --app=tmt_web.service worker --loglevel=INFO" - ;; *) error "Unknown app '$APP'" ;; diff --git a/pyproject.toml b/pyproject.toml index 8355b28..d86cb9d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ classifiers = [ "Programming Language :: Python", "Programming Language :: Python :: 3.12", "Framework :: FastAPI", - "Framework :: Celery", "Topic :: Software Development :: Testing", "Operating System :: POSIX :: Linux", ] @@ -28,7 +27,7 @@ dependencies = [ "fastapi~=0.115", "httpx~=0.27", "uvicorn~=0.30", - "celery[redis]~=5.4", + "valkey", ] [project.urls] @@ -134,7 +133,10 @@ lint.ignore = [ "S101", # Assert usage "PLR", # Pylint refactor "E501", # Line length - ] +] +"src/tmt_web/generators/html_generator.py" = [ + "E501", # Line length +] [tool.ruff.lint.pydocstyle] convention = "pep257" @@ -156,3 +158,15 @@ no_implicit_reexport = true python_version = "3.12" files = ["src/"] + +[tool.pytest.ini_options] +# Filter out specific warnings +filterwarnings = [ + # Ignore pytest collection warnings for model classes that start with 'Test' + "ignore:cannot collect test class 'TestData' because it has a __init__ constructor:pytest.PytestCollectionWarning", + "ignore:cannot collect test class 'TestPlanData' because it has a __init__ constructor:pytest.PytestCollectionWarning", + # Ignore pint and docutils deprecations from tmt + "ignore:The frontend.OptionParser class will be replaced:DeprecationWarning", + "ignore:The frontend.Option class will be removed:DeprecationWarning", + "ignore:This function will be removed in future versions of pint.:DeprecationWarning" +] diff --git a/src/tmt_web/api.py b/src/tmt_web/api.py index 519af4e..9e68d0b 100644 --- a/src/tmt_web/api.py +++ b/src/tmt_web/api.py @@ -1,15 +1,13 @@ -"""API layer for tmt-web.""" +"""API layer for tmt-web using FastAPI background tasks and Valkey.""" import json import logging import platform import time from datetime import UTC, datetime -from os import environ from typing import Annotated, Any, Literal -from celery.result import AsyncResult -from fastapi import FastAPI, HTTPException, Request, status +from fastapi import BackgroundTasks, FastAPI, HTTPException, Request, status from fastapi.params import Query from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse from pydantic import BaseModel @@ -20,6 +18,7 @@ from tmt_web import service, settings from tmt_web.formatters import deserialize_data, format_data from tmt_web.generators import html_generator +from tmt_web.utils.task_manager import FAILURE, SUCCESS, task_manager # Record start time for uptime calculation START_TIME = time.time() @@ -56,8 +55,7 @@ class VersionInfo(BaseModel): class DependencyStatus(BaseModel): """Dependency status model.""" - celery: str - redis: str + valkey: str class SystemInfo(BaseModel): @@ -107,26 +105,38 @@ def _validate_parameters( def _handle_task_result( - task_result: AsyncResult, # type: ignore[type-arg] + task_id: str, out_format: str, ) -> HTMLResponse | JSONResponse | PlainTextResponse: """Handle task result and return appropriate response.""" - if task_result.failed(): - error_message = str(task_result.result) - if "not found" in error_message.lower(): + task_info = task_manager.get_task_info(task_id) + + if task_info["status"] == FAILURE: + error_message = task_info.get("error", "Unknown error") + if error_message and "not found" in error_message.lower(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=error_message, ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=error_message or "Task failed", + ) - if task_result.successful() and task_result.result: + if task_info["status"] == SUCCESS and task_info["result"]: try: - logger.debug(f"Task result: {task_result.result}") - # Deserialize the stored data and format it according to the requested format - result_str = str(task_result.result) # Ensure we have a string - data = deserialize_data(result_str) - logger.debug(f"Deserialized data: {data}") - formatted_result = format_data(data, out_format, logger) + logger.debug(f"Task result: {task_info['result']}") + result_str = str(task_info["result"]) + + # Check if result is already formatted or needs formatting + try: + # Try to deserialize - if it works, the result needs formatting + data = deserialize_data(result_str) + formatted_result = format_data(data, out_format, logger) + except Exception: + # If deserialization fails, assume the result is already formatted + formatted_result = result_str + logger.debug(f"Formatted result: {formatted_result}") if out_format == "html": @@ -141,76 +151,16 @@ def _handle_task_result( detail=f"Error handling task result: {e}", ) from e - task_out = _to_task_out(task_result, out_format) + task_out = _to_task_out(task_info, out_format) return JSONResponse(content=task_out.model_dump()) -def _process_synchronous_request( - service_args: dict[str, Any], - out_format: str, -) -> HTMLResponse | JSONResponse | PlainTextResponse: - """Process request synchronously without Celery.""" - logger.debug("Celery disabled, processing request synchronously") - result = service.main(**service_args) - if out_format == "html": - return HTMLResponse(content=result) - if out_format == "json": - return JSONResponse(content=json.loads(result)) - return PlainTextResponse(content=result) - - -def _process_async_request( - service_args: dict[str, Any], - out_format: str, -) -> HTMLResponse | JSONResponse: - """Process request asynchronously with Celery.""" - logger.debug("Processing request asynchronously with Celery") - task_result = service.main.delay(**service_args) - - if out_format == "html": - logger.debug("Generating HTML status callback") - status_callback_url = f"{settings.API_HOSTNAME}/status/html?task-id={task_result.task_id}" - return HTMLResponse( - content=html_generator.generate_status_callback( - task_result, status_callback_url, logger - ), - ) - - task_out = _to_task_out(task_result, out_format) - return JSONResponse(content=task_out.model_dump()) - - -@app.exception_handler(GeneralError) -async def general_exception_handler(request: Request, exc: GeneralError): - """Global exception handler for all tmt errors.""" - logger.fail(str(exc)) - - # Map specific error messages to appropriate status codes - if "not found" in str(exc).lower(): - status_code = status.HTTP_404_NOT_FOUND - elif any( - msg in str(exc).lower() - for msg in [ - "must be provided together", - "missing required", - "invalid combination", - ] - ): - status_code = status.HTTP_400_BAD_REQUEST - else: - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - - return JSONResponse( - status_code=status_code, - content={"detail": str(exc)}, - ) - - # Sample url: https://tmt.org/?test-url=https://github.com/teemtee/tmt&test-name=/tests/core/smoke # or for plans: https://tmt.org/?plan-url=https://github.com/teemtee/tmt&plan-name=/plans/features/basic @app.get("/", response_model=TaskOut | str) def root( request: Request, + background_tasks: BackgroundTasks, task_id: Annotated[ str | None, Query( @@ -293,7 +243,7 @@ def root( ) -> TaskOut | HTMLResponse | JSONResponse | PlainTextResponse | RedirectResponse: """Process a request for test, plan, or both. - Returns test/plan information in the specified format. For HTML format with Celery enabled, + Returns test/plan information in the specified format. For HTML format, returns a status page that will update to show the final result. If no parameters are provided, redirects to API documentation. @@ -305,29 +255,64 @@ def root( # If task_id is provided, return the task status directly if task_id: logger.debug(f"Fetching existing task status for {task_id}") - task_result = service.main.app.AsyncResult(task_id) - return _handle_task_result(task_result, out_format) + return _handle_task_result(task_id, out_format) # Parameter validations for new task creation logger.debug("Validating request parameters") _validate_parameters(test_url, test_name, plan_url, plan_name) - service_args = { - "test_url": test_url, - "test_name": test_name, - "test_ref": test_ref, - "plan_url": plan_url, - "plan_name": plan_name, - "plan_ref": plan_ref, - "out_format": out_format, - "test_path": test_path, - "plan_path": plan_path, - } + # Process request with background tasks + task_id = service.process_request( + background_tasks=background_tasks, + test_url=test_url, + test_name=test_name, + test_ref=test_ref, + test_path=test_path, + plan_url=plan_url, + plan_name=plan_name, + plan_ref=plan_ref, + plan_path=plan_path, + out_format=out_format, + ) - # Process request based on Celery configuration - if environ.get("USE_CELERY") == "false": - return _process_synchronous_request(service_args, out_format) - return _process_async_request(service_args, out_format) + # If HTML format, generate a callback page + if out_format == "html": + logger.debug("Generating HTML status callback") + status_callback_url = f"{settings.API_HOSTNAME}/status/html?task-id={task_id}" + return HTMLResponse( + content=html_generator.generate_status_callback(task_id, status_callback_url, logger), + ) + + # For other formats, return a JSON response with task information + task_info = task_manager.get_task_info(task_id) + task_out = _to_task_out(task_info, out_format) + return JSONResponse(content=task_out.model_dump()) + + +@app.exception_handler(GeneralError) +async def general_exception_handler(request: Request, exc: GeneralError): + """Global exception handler for all tmt errors.""" + logger.fail(str(exc)) + + # Map specific error messages to appropriate status codes + if "not found" in str(exc).lower(): + status_code = status.HTTP_404_NOT_FOUND + elif any( + msg in str(exc).lower() + for msg in [ + "must be provided together", + "missing required", + "invalid combination", + ] + ): + status_code = status.HTTP_400_BAD_REQUEST + else: + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + + return JSONResponse( + status_code=status_code, + content={"detail": str(exc)}, + ) @app.get("/status") @@ -350,18 +335,18 @@ def get_task_status( detail="task-id is required", ) - r = service.main.app.AsyncResult(task_id) + task_info = task_manager.get_task_info(task_id) # Check for specific error conditions in the task result - if r.failed(): - error_message = str(r.result) - if "not found" in error_message.lower(): + if task_info["status"] == FAILURE: + error_message = task_info.get("error", "Unknown error") + if error_message and "not found" in error_message.lower(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=error_message, ) - return _to_task_out(r) + return _to_task_out(task_info) @app.get("/status/html", response_class=HTMLResponse) @@ -384,32 +369,35 @@ def get_task_status_html( detail="task-id is required", ) - r = service.main.app.AsyncResult(task_id) - if r.successful() and r.result: + task_info = task_manager.get_task_info(task_id) + + if task_info["status"] == SUCCESS and task_info["result"]: # For successful tasks, redirect to root endpoint return RedirectResponse( - url=f"{settings.API_HOSTNAME}/?task-id={r.task_id}", + url=f"{settings.API_HOSTNAME}/?task-id={task_id}", status_code=303, # Use 303 See Other for GET redirects ) - status_callback_url = f"{settings.API_HOSTNAME}/status/html?task-id={r.task_id}" + status_callback_url = f"{settings.API_HOSTNAME}/status/html?task-id={task_id}" return HTMLResponse( - content=html_generator.generate_status_callback(r, status_callback_url, logger), + content=html_generator.generate_status_callback(task_id, status_callback_url, logger), ) -def _to_task_out(r: AsyncResult, out_format: str = "json") -> TaskOut: # type: ignore[type-arg] - """Convert a Celery AsyncResult to a TaskOut response model.""" +def _to_task_out(task_info: dict[str, Any], out_format: str = "json") -> TaskOut: + """Convert a task info dict to a TaskOut response model.""" # Use the appropriate status callback URL based on the requested format status_callback_url = f"{settings.API_HOSTNAME}/status" if out_format == "html": status_callback_url += "/html" - status_callback_url += f"?task-id={r.task_id}" + status_callback_url += f"?task-id={task_info['id']}" return TaskOut( - id=r.task_id, - status=r.status, - result=r.traceback if r.failed() else r.result, + id=task_info["id"], + status=task_info["status"], + result=task_info.get("error") + if task_info["status"] == FAILURE + else task_info.get("result"), status_callback_url=status_callback_url, ) @@ -422,26 +410,17 @@ def health_check() -> HealthStatus: - Service status and uptime - Version information for key components - System information - - Dependencies status (Redis, Celery) - + - Dependencies status (Valkey) """ logger.debug("Health check requested") - # Check Celery/Redis status - celery_enabled = environ.get("USE_CELERY") != "false" - celery_status = "ok" - redis_status = "ok" - - if not celery_enabled: - celery_status = "disabled" - redis_status = "disabled" - else: - try: - # Ping Redis through Celery - service.main.app.control.ping(timeout=1.0) - except Exception: - celery_status = "failed" - redis_status = "failed" + # Check Valkey status + valkey_status = "ok" + try: + # Ping Valkey + task_manager.client.ping() + except Exception: + valkey_status = "failed" return HealthStatus( status="ok", @@ -453,8 +432,7 @@ def health_check() -> HealthStatus: tmt=tmt_version, ), dependencies=DependencyStatus( - celery=celery_status, - redis=redis_status, + valkey=valkey_status, ), system=SystemInfo( platform=platform.platform(), diff --git a/src/tmt_web/generators/html_generator.py b/src/tmt_web/generators/html_generator.py index 0a4077b..188a154 100644 --- a/src/tmt_web/generators/html_generator.py +++ b/src/tmt_web/generators/html_generator.py @@ -1,8 +1,8 @@ """HTML generator for tmt-web.""" from pathlib import Path +from typing import Any -from celery.result import AsyncResult from jinja2 import Environment, FileSystemLoader, TemplateNotFound from tmt import Logger from tmt.utils import GeneralError @@ -39,21 +39,30 @@ def _render_template(template_name: str, logger: Logger, **kwargs) -> str: raise GeneralError(f"Failed to render template '{template_name}'") from err -def generate_status_callback(result: AsyncResult, status_callback_url: str, logger: Logger) -> str: # type: ignore[type-arg] +def generate_status_callback( + task_id: str, + status_callback_url: str, + logger: Logger, + status: str = "PENDING", + result: Any | None = None, +) -> str: """ Generate HTML status callback page. - :param result: Celery task result + :param task_id: ID of the task :param status_callback_url: URL for status callback :param logger: Logger instance for logging + :param status: Task status (default: "PENDING") + :param result: Task result data (default: None) :return: Rendered HTML page :raises: GeneralError if template rendering fails """ - logger.debug("Generating status callback page") + logger.debug(f"Generating status callback page for task {task_id}") data = { - "status": result.status, + "task_id": task_id, + "status": status, "status_callback_url": status_callback_url, - "result": result.result, + "result": result, } return _render_template("status_callback.html.j2", logger=logger, **data) diff --git a/src/tmt_web/service.py b/src/tmt_web/service.py index 13fbf2d..0ab0a77 100644 --- a/src/tmt_web/service.py +++ b/src/tmt_web/service.py @@ -1,26 +1,23 @@ -"""Service layer for tmt-web.""" +"""Service layer for tmt-web using FastAPI background tasks and Valkey.""" import logging -from os import environ from typing import Literal import tmt -from celery.app import Celery # type: ignore[attr-defined] +from fastapi import BackgroundTasks from tmt import Logger from tmt._compat.pathlib import Path from tmt.utils import GeneralError, GitUrlError -from tmt_web import settings from tmt_web.converters import create_testplan_data, plan_to_model, test_to_model from tmt_web.formatters import format_data, serialize_data from tmt_web.models import PlanData, TestData, TestPlanData from tmt_web.utils import git_handler +from tmt_web.utils.task_manager import task_manager # Create main logger for the application logger = Logger(logging.getLogger("tmt-web")) -app = Celery(__name__, broker=settings.REDIS_URL, backend=settings.REDIS_URL) - def get_tree(url: str, name: str, ref: str | None, tree_path: str | None) -> tmt.base.Tree: """ @@ -166,8 +163,8 @@ def process_testplan_request( return create_testplan_data(test, plan) -@app.task -def main( +def process_request( + background_tasks: BackgroundTasks, test_url: str | None, test_name: str | None, test_ref: str | None, @@ -181,14 +178,59 @@ def main( """ Main entry point for processing requests. - Returns serialized data that can be formatted later according to the requested format. - The actual formatting happens in the API layer when retrieving results. + Creates a background task for processing the request and returns a task ID. + :param background_tasks: FastAPI BackgroundTasks object + :param test_url: Test URL + :param test_name: Test name + :param test_ref: Test repo ref (optional) + :param test_path: Test path (optional) + :param plan_url: Plan URL + :param plan_name: Plan name + :param plan_ref: Plan repo ref (optional) + :param plan_path: Plan path (optional) + :param out_format: Output format + :return: Task ID for tracking the task :raises: GeneralError for invalid argument combinations """ logger.debug("Starting request processing") logger.debug("Validating input parameters") + # Use task manager to create and execute the background task + return task_manager.execute_task( + background_tasks=background_tasks, + func=_process_request_worker, + test_url=test_url, + test_name=test_name, + test_ref=test_ref, + test_path=test_path, + plan_url=plan_url, + plan_name=plan_name, + plan_ref=plan_ref, + plan_path=plan_path, + out_format=out_format, + ) + + +def _process_request_worker( + test_url: str | None, + test_name: str | None, + test_ref: str | None, + test_path: str | None, + plan_url: str | None, + plan_name: str | None, + plan_ref: str | None, + plan_path: str | None, + out_format: Literal["html", "json", "yaml"], +) -> str: + """ + Worker function for processing requests in background. + + This function is meant to be executed by the task manager. + Returns serialized data that can be formatted later according to the requested format. + + :raises: GeneralError for invalid argument combinations + """ data: TestData | PlanData | TestPlanData if test_name is not None and plan_name is None: if test_url is None: @@ -218,8 +260,8 @@ def main( logger.fail("Invalid combination of test and plan parameters") raise GeneralError("Invalid combination of test and plan parameters") - # Format immediately if Celery is disabled - if environ.get("USE_CELERY") == "false": + # Format immediately if requested, otherwise store serialized data + if out_format: return format_data(data, out_format, logger) # Otherwise store raw data for later formatting diff --git a/src/tmt_web/settings.py b/src/tmt_web/settings.py index be5faac..27bba87 100644 --- a/src/tmt_web/settings.py +++ b/src/tmt_web/settings.py @@ -1,5 +1,5 @@ import os API_HOSTNAME = os.getenv("API_HOSTNAME", default="") -REDIS_URL = os.getenv("REDIS_URL", default="redis://localhost:6379") +VALKEY_URL = os.getenv("VALKEY_URL", default="valkey://localhost:6379") CLONE_DIR_PATH = os.getenv("CLONE_DIR_PATH", default="./.repos/") diff --git a/src/tmt_web/utils/task_manager.py b/src/tmt_web/utils/task_manager.py new file mode 100644 index 0000000..0d85f5a --- /dev/null +++ b/src/tmt_web/utils/task_manager.py @@ -0,0 +1,146 @@ +"""Task manager for handling background tasks with Valkey.""" + +import json +import logging +import uuid +from collections.abc import Callable +from datetime import UTC, datetime +from typing import Any, TypeVar + +from fastapi import BackgroundTasks +from tmt import Logger +from valkey import Valkey + +from tmt_web import settings + +# Type for function results +T = TypeVar("T") + +# Task status constants +PENDING = "PENDING" +STARTED = "STARTED" +SUCCESS = "SUCCESS" +FAILURE = "FAILURE" + +# Default task expiration time (24 hours) +DEFAULT_TASK_EXPIRY = 60 * 60 * 24 + + +class TaskManager: + """Manager for background tasks using Valkey for state storage.""" + + def __init__(self) -> None: + """Initialize task manager with Valkey connection.""" + self.logger = Logger(logging.getLogger("tmt-web-tasks")) + self.client = Valkey.from_url(settings.VALKEY_URL) + self.logger.debug(f"Connected to Valkey at {settings.VALKEY_URL}") + + def _get_task_key(self, task_id: str) -> str: + """Get the Valkey key for a task.""" + return f"task:{task_id}" + + def create_task(self) -> str: + """Create a new task and return its ID.""" + task_id = str(uuid.uuid4()) + task_key = self._get_task_key(task_id) + + # Store initial task info + task_info = { + "id": task_id, + "status": PENDING, + "created_at": datetime.now(tz=UTC).isoformat(), + "result": None, + "error": None, + } + + # Store in Valkey with expiration + self.client.set(task_key, json.dumps(task_info), ex=DEFAULT_TASK_EXPIRY) + + return task_id + + def get_task_info(self, task_id: str) -> dict[str, Any]: + """Get information about a task.""" + task_key = self._get_task_key(task_id) + task_data = self.client.get(task_key) + + if not task_data: + self.logger.fail(f"Task {task_id} not found") + return { + "id": task_id, + "status": FAILURE, + "error": "Task not found", + "result": None, + } + + return json.loads(task_data) + + def update_task( + self, task_id: str, status: str, result: Any = None, error: str | None = None + ) -> None: + """Update task status and result.""" + task_key = self._get_task_key(task_id) + task_data = self.client.get(task_key) + + if not task_data: + self.logger.warning(f"Trying to update non-existent task {task_id}") + return + + task_info = json.loads(task_data) + task_info["status"] = status + + if result is not None: + task_info["result"] = result + + if error is not None: + task_info["error"] = error + + # Update timestamp + task_info["updated_at"] = datetime.now(tz=UTC).isoformat() + + # Store updated info + self.client.set(task_key, json.dumps(task_info), ex=DEFAULT_TASK_EXPIRY) + + def execute_task( + self, + background_tasks: BackgroundTasks, + func: Callable[..., T], + **kwargs: Any, + ) -> str: + """Schedule a function to run as a background task. + + Args: + background_tasks: FastAPI BackgroundTasks object + func: Function to be executed + **kwargs: Arguments to pass to the function + + Returns: + Task ID for tracking the task + """ + task_id = self.create_task() + + # Define wrapper function to update task state + def task_wrapper() -> None: + try: + self.update_task(task_id, STARTED) + self.logger.debug(f"Starting task {task_id}") + + # Execute function + result = func(**kwargs) + + # Update task with success result + self.update_task(task_id, SUCCESS, result=result) + self.logger.debug(f"Task {task_id} completed successfully") + + except Exception as exc: + # Update task with error information + error_message = str(exc) + self.logger.fail(f"Task {task_id} failed: {error_message}") + self.update_task(task_id, FAILURE, error=error_message) + + # Add task to background tasks + background_tasks.add_task(task_wrapper) + return task_id + + +# Global task manager instance +task_manager = TaskManager() diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index e4af650..66efaa7 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -1,6 +1,5 @@ """Integration tests for the API.""" -import os import time import pytest @@ -18,10 +17,6 @@ def client(): class TestApi: """This class tests the behaviour of the API directly.""" - @pytest.fixture(autouse=True) - def _setup(self): - os.environ["USE_CELERY"] = "false" - def test_root_empty_redirects_to_docs(self, client): """Test that empty root path redirects to docs.""" response = client.get("/", follow_redirects=False) @@ -37,7 +32,9 @@ def test_basic_test_request_html(self, client): data = response.content.decode("utf-8") assert "500" not in data assert '' in data - assert "https://github.com/teemtee/tmt/tree/main/tests/core/smoke/main.fmf" in data + # Now just validate it's a status page with processing indicator + assert "Processing..." in data + assert "Status: PENDING" in data def test_basic_test_request_html_with_path(self, client): """Test basic test request with path and default format (html).""" @@ -51,7 +48,9 @@ def test_basic_test_request_html_with_path(self, client): data = response.content.decode("utf-8") assert "500" not in data assert '' in data - assert "https://github.com/teemtee/tmt/tree/main/tests/execute/basic/data/test.fmf" in data + # Now just validate it's a status page with processing indicator + assert "Processing..." in data + assert "Status: PENDING" in data def test_basic_test_request_explicit_html(self, client): """Test basic test request with explicit html format.""" @@ -62,7 +61,9 @@ def test_basic_test_request_explicit_html(self, client): data = response.content.decode("utf-8") assert "500" not in data assert '' in data - assert "Just a basic smoke test" in data + # Now just validate it's a status page with processing indicator + assert "Processing..." in data + assert "Status: PENDING" in data def test_basic_test_request_json(self, client): """Test basic test request with explicit json format.""" @@ -72,11 +73,10 @@ def test_basic_test_request_json(self, client): assert response.status_code == 200 # Parse the response as JSON to verify it's valid JSON json_data = response.json() - assert "fmf-id" in json_data - assert ( - json_data["fmf-id"]["url"] - == "https://github.com/teemtee/tmt/tree/main/tests/core/smoke/main.fmf" - ) + # Check that we get a task status response with background task ID + assert "id" in json_data + assert json_data["status"] == "PENDING" + assert "status_callback_url" in json_data def test_basic_test_request_yaml(self, client): """Test basic test request with yaml format.""" @@ -84,9 +84,12 @@ def test_basic_test_request_yaml(self, client): "/?test-url=https://github.com/teemtee/tmt&test-name=/tests/core/smoke&format=yaml", ) assert response.status_code == 200 - data = response.content.decode("utf-8") - assert "500" not in data - assert "url: https://github.com/teemtee/tmt/tree/main/tests/core/smoke/main.fmf" in data + # For YAML, the response is still JSON for the task status + json_data = response.json() + # Check that we get a task status response with background task ID + assert "id" in json_data + assert json_data["status"] == "PENDING" + assert "status_callback_url" in json_data def test_basic_plan_request(self, client): """Test basic plan request with default format (html).""" @@ -97,7 +100,9 @@ def test_basic_plan_request(self, client): data = response.content.decode("utf-8") assert "500" not in data assert '' in data - assert "https://github.com/teemtee/tmt/tree/main/plans/features/basic.fmf" in data + # Now just validate it's a status page with processing indicator + assert "Processing..." in data + assert "Status: PENDING" in data def test_basic_testplan_request(self, client): """Test basic testplan request with default format (html).""" @@ -109,8 +114,9 @@ def test_basic_testplan_request(self, client): data = response.content.decode("utf-8") assert "500" not in data assert '' in data - assert "https://github.com/teemtee/tmt/tree/main/tests/core/smoke/main.fmf" in data - assert "https://github.com/teemtee/tmt/tree/main/plans/features/basic.fmf" in data + # Now just validate it's a status page with processing indicator + assert "Processing..." in data + assert "Status: PENDING" in data def test_invalid_testplan_arguments(self, client): """Test invalid combination of test and plan parameters.""" @@ -124,18 +130,92 @@ def test_invalid_testplan_arguments(self, client): def test_not_found_errors(self, client): """Test that missing tests/plans return 404.""" - response = client.get( - "/?test-url=https://github.com/teemtee/tmt&test-name=/nonexistent/test", - ) + # For simplicity, we'll check that the correct HTTP exception is raised + # in the general_exception_handler which captures GeneralError + from tmt.utils import GeneralError + + # Trigger the exception handler with a test not found error + @app.get("/test-not-found-error-test") + def test_not_found_error(): + raise GeneralError("Test '/nonexistent/test' not found") + + response = client.get("/test-not-found-error-test") assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() - response = client.get( - "/?plan-url=https://github.com/teemtee/tmt&plan-name=/nonexistent/plan", - ) + # Trigger the exception handler with a plan not found error + @app.get("/plan-not-found-error-test") + def plan_not_found_error(): + raise GeneralError("Plan '/nonexistent/plan' not found") + + response = client.get("/plan-not-found-error-test") assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() + def test_task_failure_generic_error(self, client, monkeypatch): + """Test handling a task that failed with a generic error (not 'not found').""" + # First create a real task to get a valid task ID + response = client.get( + "/?test-url=https://github.com/teemtee/tmt&test-name=/tests/core/smoke&format=json", + ) + assert response.status_code == 200 + task_data = response.json() + task_id = task_data["id"] + + # Mock the task manager get_task_info to return a generic failure + def mock_get_task_info(tid): + return { + "id": tid, + "status": "FAILURE", + "error": "Some generic error occurred", + "result": None, + } + + monkeypatch.setattr( + "tmt_web.utils.task_manager.task_manager.get_task_info", mock_get_task_info + ) + + # Now try to get the result, should return 500 with the error + response = client.get(f"/?task-id={task_id}&format=json") + assert response.status_code == 500 + assert "Some generic error occurred" in response.json()["detail"] + + def test_task_pending_status(self, client, monkeypatch): + """Test getting PENDING task status (covers the _to_task_out default path).""" + # First create a real task to get a valid task ID + response = client.get( + "/?test-url=https://github.com/teemtee/tmt&test-name=/tests/core/smoke&format=json", + ) + assert response.status_code == 200 + task_data = response.json() + task_id = task_data["id"] + + # Mock the task manager get_task_info to return PENDING status + def mock_get_task_info(tid): + return { + "id": tid, + "status": "PENDING", + "result": None, + } + + monkeypatch.setattr( + "tmt_web.utils.task_manager.task_manager.get_task_info", mock_get_task_info + ) + + # Test with JSON format + response = client.get(f"/status?task-id={task_id}") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "PENDING" + assert "/status?task-id=" in data["status_callback_url"] + + # Test with HTML format to cover line 392 + response = client.get(f"/?task-id={task_id}&format=html") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "PENDING" + assert "/status/html?task-id=" in data["status_callback_url"] + def test_status_endpoint_no_task_id(self, client): """Test /status endpoint with no task_id parameter.""" response = client.get("/status") @@ -159,42 +239,33 @@ def test_health_check(self, client): assert "tmt" in data["version"] # Check dependencies - assert "celery" in data["dependencies"] - assert "redis" in data["dependencies"] - # When Celery is disabled, dependencies should be marked as disabled - assert data["dependencies"]["celery"] == "disabled" - assert data["dependencies"]["redis"] == "disabled" + assert "valkey" in data["dependencies"] + assert data["dependencies"]["valkey"] in ["ok", "failed"] # Check system info assert "platform" in data["system"] assert "hostname" in data["system"] assert "python_implementation" in data["system"] - def test_health_check_redis_error(self, client, monkeypatch): - """Test health check when Redis ping fails.""" - # Enable Celery for this test - os.environ["USE_CELERY"] = "true" + def test_health_check_valkey_error(self, client, monkeypatch): + """Test health check when Valkey ping fails.""" - # Mock Redis ping to fail + # Mock Valkey ping to fail def mock_ping(*args, **kwargs): - raise Exception("Redis connection failed") + raise Exception("Valkey connection failed") - monkeypatch.setattr("tmt_web.service.main.app.control.ping", mock_ping) + monkeypatch.setattr("tmt_web.utils.task_manager.task_manager.client.ping", mock_ping) response = client.get("/health") assert response.status_code == 200 data = response.json() - assert data["dependencies"]["celery"] == "failed" - assert data["dependencies"]["redis"] == "failed" - + assert data["dependencies"]["valkey"] == "failed" + assert data["status"] == "ok" # Overall status should still be ok -class TestCelery: - """This class tests the API with the Celery instance.""" - @pytest.fixture(autouse=True) - def _setup(self): - os.environ["USE_CELERY"] = "true" +class TestBackgroundTasks: + """This class tests the API with background tasks.""" def test_basic_test_request_json(self, client): """Test basic test request with explicit json format.""" @@ -397,13 +468,21 @@ def test_root_endpoint_with_task_id_html(self, client): break time.sleep(0.1) - def test_root_endpoint_with_invalid_task_id(self, client): + def test_root_endpoint_with_invalid_task_id(self, client, monkeypatch): """Test root endpoint with invalid task ID.""" + + # Mock the task manager to handle invalid task IDs + def mock_get_task_info(tid): + return {"id": tid, "status": "FAILURE", "error": "Task not found", "result": None} + + monkeypatch.setattr( + "tmt_web.utils.task_manager.task_manager.get_task_info", mock_get_task_info + ) + response = client.get("/?task-id=invalid-task-id") - assert response.status_code == 200 # Returns 200 with pending status + assert response.status_code == 404 # Should return 404 not found data = response.json() - assert data["status"] == "PENDING" - assert data["id"] == "invalid-task-id" + assert "Task not found" in data["detail"] def test_status_html_endpoint_missing_task_id(self, client): response = client.get("/status/html") @@ -431,8 +510,8 @@ def test_status_html_endpoint_invalid_task_id(self, client): assert "Task Status" in data assert "Status: PENDING" in data - def test_not_found_with_celery(self, client): - """Test that missing tests/plans return 404 with Celery.""" + def test_not_found_with_background_tasks(self, client): + """Test that missing tests/plans return 404 with background tasks.""" # Initial request creates a task - use explicit JSON format response = client.get( "/?test-url=https://github.com/teemtee/tmt&test-name=/nonexistent/test&format=json", @@ -466,15 +545,14 @@ def test_not_found_with_celery(self, client): else: pytest.fail(f"Unexpected status code: {response.status_code}") - def test_health_check_with_celery(self, client): - """Test health check endpoint with Celery enabled.""" + def test_health_check_with_valkey(self, client): + """Test health check endpoint with Valkey.""" response = client.get("/health") assert response.status_code == 200 data = response.json() - # Dependencies should be ok when Celery is enabled and running - assert data["dependencies"]["celery"] == "ok" - assert data["dependencies"]["redis"] == "ok" + # Dependencies should be ok when Valkey is running + assert data["dependencies"]["valkey"] == "ok" def test_task_result_error_handling(self, client, monkeypatch): """Test error handling in _handle_task_result when deserialization fails.""" @@ -486,25 +564,17 @@ def test_task_result_error_handling(self, client, monkeypatch): task_data = response.json() task_id = task_data["id"] - # Create a real AsyncResult first to understand its properties - from celery.result import AsyncResult - - AsyncResult(task_id) + # Mock the task manager get_task_info to return a malformed result + def mock_get_task_info(tid): + return { + "id": tid, + "status": "SUCCESS", + "result": '{"corrupt": "json with malformed structure', # Malformed JSON + } - # Now create a mock that better represents a real task result - from unittest.mock import MagicMock - - mock_result = MagicMock(spec=AsyncResult) - mock_result.task_id = task_id - mock_result.failed.return_value = False - mock_result.successful.return_value = True - - # Set a result that looks like valid serialized data but will cause - # deserialize_data to fail when trying to process it - mock_result.result = '{"corrupt": "json with malformed structure' - - # Use the mock to replace the real AsyncResult when it's fetched by task_id - monkeypatch.setattr("tmt_web.service.main.app.AsyncResult", lambda tid: mock_result) + monkeypatch.setattr( + "tmt_web.utils.task_manager.task_manager.get_task_info", mock_get_task_info + ) # Now try to get the result, which should trigger the error handling response = client.get(f"/?task-id={task_id}&format=json") @@ -521,31 +591,22 @@ def test_task_result_with_not_found_error(self, client, monkeypatch): task_data = response.json() task_id = task_data["id"] - # Create a real AsyncResult first to understand its properties - from celery.result import AsyncResult - - AsyncResult(task_id) - - # Study the actual error format from service.py: - # GeneralError(f"Test '{test_name}' not found") from line 86 - # GeneralError(f"Plan '{plan_name}' not found") from line 118 - - # Setup a task that failed with a "not found" error in the expected format - from unittest.mock import MagicMock - + # Mock the task manager get_task_info to return a not found error from tmt.utils import GeneralError - # Create the exact error format that would be raised in service.py error = GeneralError("Test '/nonexistent/test' not found") - mock_result = MagicMock(spec=AsyncResult) - mock_result.task_id = task_id - mock_result.failed.return_value = True - mock_result.successful.return_value = False - # Use str(error) to get the exact string format that would be seen in the API - mock_result.result = str(error) + def mock_get_task_info(tid): + return { + "id": tid, + "status": "FAILURE", + "error": str(error), + "result": None, + } - monkeypatch.setattr("tmt_web.service.main.app.AsyncResult", lambda tid: mock_result) + monkeypatch.setattr( + "tmt_web.utils.task_manager.task_manager.get_task_info", mock_get_task_info + ) # Now try to get the result, which should trigger a 404 error response = client.get(f"/?task-id={task_id}&format=json") @@ -607,7 +668,7 @@ def test_invalid_parameter_combination(self, client, monkeypatch): """Test the error for invalid combination of test and plan parameters.""" from tmt.utils import GeneralError - # Since we're in TestCelery class and USE_CELERY=true, we need to mock the API's _validate_parameters + # Since we're testing asynchronous behavior, we need to mock the API's _validate_parameters # function instead, which is called before service.main def simulate_invalid_combination(*args, **kwargs): raise GeneralError("Invalid combination of test and plan parameters") diff --git a/tests/unit/test_html_generator.py b/tests/unit/test_html_generator.py index 4d65c3b..539ec09 100644 --- a/tests/unit/test_html_generator.py +++ b/tests/unit/test_html_generator.py @@ -1,7 +1,6 @@ """Unit tests for HTML generator.""" import logging -from unittest.mock import Mock import pytest import tmt @@ -123,10 +122,12 @@ def test_generate_testplan_html(self, test_data, plan_data, logger): def test_generate_status_callback_pending(self, logger): """Test status callback page for pending task.""" - result = Mock(status="PENDING", result=None) + task_id = "123-abc" callback_url = "http://example.com/status" - data = html_generator.generate_status_callback(result, callback_url, logger) + data = html_generator.generate_status_callback( + task_id, callback_url, logger, status="PENDING", result=None + ) assert "" in data assert "

Processing...

" in data @@ -137,10 +138,12 @@ def test_generate_status_callback_pending(self, logger): def test_generate_status_callback_retrying(self, logger): """Test status callback page for retrying task.""" - result = Mock(status="RETRYING", result=None) + task_id = "123-abc" callback_url = "http://example.com/status" - data = html_generator.generate_status_callback(result, callback_url, logger) + data = html_generator.generate_status_callback( + task_id, callback_url, logger, status="RETRYING", result=None + ) assert "" in data assert "

Retrying...

" in data @@ -150,11 +153,13 @@ def test_generate_status_callback_retrying(self, logger): def test_generate_status_callback_success(self, logger): """Test status callback page for successful task.""" + task_id = "123-abc" html_result = "
Test Result
" - result = Mock(status="SUCCESS", result=html_result) callback_url = "http://example.com/status" - data = html_generator.generate_status_callback(result, callback_url, logger) + data = html_generator.generate_status_callback( + task_id, callback_url, logger, status="SUCCESS", result=html_result + ) assert "" in data assert html_result in data # HTML should be included directly @@ -162,11 +167,13 @@ def test_generate_status_callback_success(self, logger): def test_generate_status_callback_failure(self, logger): """Test status callback page for failed task.""" + task_id = "123-abc" error_msg = "Test failed: Something went wrong" - result = Mock(status="FAILURE", result=error_msg) callback_url = "http://example.com/status" - data = html_generator.generate_status_callback(result, callback_url, logger) + data = html_generator.generate_status_callback( + task_id, callback_url, logger, status="FAILURE", result=error_msg + ) assert "" in data assert "

Task Failed

" in data @@ -176,10 +183,12 @@ def test_generate_status_callback_failure(self, logger): def test_generate_status_callback_unknown(self, logger): """Test status callback page for unknown status.""" - result = Mock(status="UNKNOWN", result="Some result") + task_id = "123-abc" callback_url = "http://example.com/status" - data = html_generator.generate_status_callback(result, callback_url, logger) + data = html_generator.generate_status_callback( + task_id, callback_url, logger, status="UNKNOWN", result="Some result" + ) assert "" in data assert "

Task Status

" in data diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 8e06e3e..d58ebae 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -11,11 +11,12 @@ from tmt_web.models import PlanData, TestData, TestPlanData from tmt_web.service import ( get_tree, - main, process_plan_request, + process_request, process_test_request, process_testplan_request, ) +from tmt_web.utils.task_manager import task_manager @pytest.fixture @@ -381,35 +382,32 @@ def test_process_testplan_request_with_all_attributes(mocker): def test_main_invalid_parameters(): - """Test main with invalid parameter combinations.""" + """Test directly the worker function that validates parameters.""" + from tmt_web.service import _process_request_worker + # Missing test URL with pytest.raises(GeneralError, match="Missing required test parameters"): - main(None, "test", None, None, None, None, None, None, "json") + _process_request_worker(None, "test", None, None, None, None, None, None, "json") # Missing plan URL with pytest.raises(GeneralError, match="Missing required plan parameters"): - main(None, None, None, None, None, "plan", None, None, "json") + _process_request_worker(None, None, None, None, None, "plan", None, None, "json") # Missing test URL in combined request with pytest.raises(GeneralError, match="Missing required test/plan parameters"): - main(None, "test", None, None, "url", "plan", None, None, "json") + _process_request_worker(None, "test", None, None, "url", "plan", None, None, "json") # Missing plan URL in combined request with pytest.raises(GeneralError, match="Missing required test/plan parameters"): - main("url", "test", None, None, None, "plan", None, None, "json") + _process_request_worker("url", "test", None, None, None, "plan", None, None, "json") # Invalid combination (neither test nor plan) with pytest.raises(GeneralError, match="Invalid combination of test and plan parameters"): - main(None, None, None, None, None, None, None, None, "json") - + _process_request_worker(None, None, None, None, None, None, None, None, "json") -def test_main_with_celery_disabled(mocker): - """Test main when Celery is disabled.""" - import os - - # Set environment variable to disable Celery - os.environ["USE_CELERY"] = "false" +def test_main_direct_processing(mocker): + """Test main direct processing.""" # Create a mock test test_data = TestData(name="test", summary="Test summary") @@ -417,62 +415,90 @@ def test_main_with_celery_disabled(mocker): mocker.patch("tmt_web.service.process_test_request", return_value=test_data) # Mock format_data to verify it's called directly - mock_format = mocker.patch("tmt_web.service.format_data", return_value="formatted_data") + mocker.patch("tmt_web.service.format_data", return_value="formatted_data") # Mock serialize_data to ensure it's NOT called - mock_serialize = mocker.patch("tmt_web.service.serialize_data") + mocker.patch("tmt_web.service.serialize_data") + + # Create a mock BackgroundTasks object + mock_bg_tasks = mocker.Mock() # Call main with test parameters - result = main("url", "test", None, None, None, None, None, None, "json") + result = process_request( + mock_bg_tasks, "url", "test", None, None, None, None, None, None, "json" + ) - # Verify format_data was called directly - mock_format.assert_called_once_with(test_data, "json", mocker.ANY) + # Verify the task_manager.execute_task was called + # Note: We don't need to verify format_data/serialize_data directly + # as that's now handled inside the task manager + assert isinstance(result, str) # Should return a task ID - # Verify serialize_data was NOT called - mock_serialize.assert_not_called() - # Verify result is from format_data - assert result == "formatted_data" +def test_main_with_background_tasks(mocker): + """Test main with background tasks.""" + # Create a mock test + test_data = TestData(name="test", summary="Test summary") - # Reset environment - os.environ.pop("USE_CELERY", None) + # Mock process_test_request to return our test data + mocker.patch("tmt_web.service.process_test_request", return_value=test_data) + # Mock the task_manager.execute_task + mock_task_execute = mocker.patch( + "tmt_web.service.task_manager.execute_task", return_value="task-123" + ) -def test_main_with_celery_enabled(mocker): - """Test main when Celery is enabled (coverage for line 226).""" - import os + # Create a mock BackgroundTasks object + mock_bg_tasks = mocker.Mock() - # Ensure environment variable is set to enable Celery (or not set at all) - if "USE_CELERY" in os.environ: - old_value = os.environ["USE_CELERY"] - os.environ.pop("USE_CELERY") - else: - old_value = None + # Call main with test parameters + result = process_request( + mock_bg_tasks, "url", "test", None, None, None, None, None, None, "json" + ) - # Create a mock test + # Verify task_manager.execute_task was called with the right parameters + mock_task_execute.assert_called_once() + assert result == "task-123" + + +def test_process_request_worker_no_output_format(mocker): + """Test _process_request_worker when no output format is specified.""" + # Create mock data test_data = TestData(name="test", summary="Test summary") - # Mock process_test_request to return our test data + # Mock process_test_request mocker.patch("tmt_web.service.process_test_request", return_value=test_data) - # Mock format_data to verify it's NOT called directly - mock_format = mocker.patch("tmt_web.service.format_data") - - # Mock serialize_data to ensure it IS called and returns serialized data + # Mock serialize_data mock_serialize = mocker.patch("tmt_web.service.serialize_data", return_value="serialized_data") - # Call main with test parameters - result = main("url", "test", None, None, None, None, None, None, "json") + # Call worker with None as format + from tmt_web.service import _process_request_worker - # Verify format_data was NOT called - mock_format.assert_not_called() + result = _process_request_worker("url", "test", None, None, None, None, None, None, None) - # Verify serialize_data WAS called with the test data + # Verify serialize_data was called instead of format_data mock_serialize.assert_called_once_with(test_data) - - # Verify result is from serialize_data assert result == "serialized_data" - # Restore environment if needed - if old_value is not None: - os.environ["USE_CELERY"] = old_value + +def test_task_manager_update_nonexistent_task(mocker): + """Test updating a non-existent task in the task manager.""" + # Mock Valkey client get method to return None (task doesn't exist) + mocker.patch.object(task_manager.client, "get", return_value=None) + + # Mock the set method to check it's not called + mock_set = mocker.patch.object(task_manager.client, "set") + + # Mock logger to verify warning is called + mock_logger = mocker.patch.object(task_manager, "logger") + + # Try to update a non-existent task + task_manager.update_task("nonexistent-task-id", "SUCCESS", result="test result") + + # Verify warning was logged + mock_logger.warning.assert_called_once_with( + "Trying to update non-existent task nonexistent-task-id" + ) + + # Verify client.set was not called (early return) + mock_set.assert_not_called() From b6e5da3f8c5144ffdf9bd935c2b4608379574f2a Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Tue, 18 Mar 2025 14:03:26 +0100 Subject: [PATCH 2/9] fixup! Switch to Valkey, drop Celery. --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2df1710..8a41dba 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -46,7 +46,7 @@ jobs: - name: Start valkey container run: | - podman run -d --pod tmt-web-pod --name valkey valkey:latest + podman run -d --pod tmt-web-pod --name valkey valkey/valkey:latest - name: Start Web container run: | From d99155b9759663943481ef394b3d5163452c5289 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Tue, 18 Mar 2025 14:11:59 +0100 Subject: [PATCH 3/9] Update src/tmt_web/utils/task_manager.py Co-authored-by: codiumai-pr-agent-free[bot] <138128286+codiumai-pr-agent-free[bot]@users.noreply.github.com> --- src/tmt_web/utils/task_manager.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/tmt_web/utils/task_manager.py b/src/tmt_web/utils/task_manager.py index 0d85f5a..5dd05f5 100644 --- a/src/tmt_web/utils/task_manager.py +++ b/src/tmt_web/utils/task_manager.py @@ -85,20 +85,25 @@ def update_task( self.logger.warning(f"Trying to update non-existent task {task_id}") return - task_info = json.loads(task_data) - task_info["status"] = status + try: + task_info = json.loads(task_data) + task_info["status"] = status - if result is not None: - task_info["result"] = result + if result is not None: + task_info["result"] = result - if error is not None: - task_info["error"] = error + if error is not None: + task_info["error"] = error - # Update timestamp - task_info["updated_at"] = datetime.now(tz=UTC).isoformat() + # Update timestamp + task_info["updated_at"] = datetime.now(tz=UTC).isoformat() - # Store updated info - self.client.set(task_key, json.dumps(task_info), ex=DEFAULT_TASK_EXPIRY) + # Store updated info + self.client.set(task_key, json.dumps(task_info), ex=DEFAULT_TASK_EXPIRY) + except json.JSONDecodeError: + self.logger.fail(f"Failed to decode task data for {task_id}") + except (TypeError, ValueError) as e: + self.logger.fail(f"Failed to encode task data for {task_id}: {e}") def execute_task( self, From 1d7fda6408f61bade112c7e1d608609872ab8ec5 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Tue, 18 Mar 2025 14:12:33 +0100 Subject: [PATCH 4/9] Update src/tmt_web/utils/task_manager.py Co-authored-by: codiumai-pr-agent-free[bot] <138128286+codiumai-pr-agent-free[bot]@users.noreply.github.com> --- src/tmt_web/utils/task_manager.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/tmt_web/utils/task_manager.py b/src/tmt_web/utils/task_manager.py index 5dd05f5..fdfac87 100644 --- a/src/tmt_web/utils/task_manager.py +++ b/src/tmt_web/utils/task_manager.py @@ -72,7 +72,16 @@ def get_task_info(self, task_id: str) -> dict[str, Any]: "result": None, } - return json.loads(task_data) + try: + return json.loads(task_data) + except json.JSONDecodeError: + self.logger.fail(f"Corrupted task data for {task_id}") + return { + "id": task_id, + "status": FAILURE, + "error": "Corrupted task data", + "result": None, + } def update_task( self, task_id: str, status: str, result: Any = None, error: str | None = None From 4ccc5a1c378cc80cfa354f15fa6782e6d21783f0 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Tue, 18 Mar 2025 14:24:45 +0100 Subject: [PATCH 5/9] squash: status callback --- src/tmt_web/api.py | 14 +++++++- tests/integration/test_api.py | 4 ++- tests/unit/test_service.py | 63 +++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/src/tmt_web/api.py b/src/tmt_web/api.py index 9e68d0b..e2b7ce8 100644 --- a/src/tmt_web/api.py +++ b/src/tmt_web/api.py @@ -379,8 +379,20 @@ def get_task_status_html( ) status_callback_url = f"{settings.API_HOSTNAME}/status/html?task-id={task_id}" + + # For FAILURE status, use the error message if available + result = task_info.get("result") + if task_info["status"] == FAILURE and task_info.get("error"): + result = task_info["error"] + return HTMLResponse( - content=html_generator.generate_status_callback(task_id, status_callback_url, logger), + content=html_generator.generate_status_callback( + task_id, + status_callback_url, + logger, + status=task_info["status"], + result=result, + ), ) diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index 66efaa7..0755afd 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -508,7 +508,9 @@ def test_status_html_endpoint_invalid_task_id(self, client): assert response.status_code == 200 # Still returns 200 as it shows a status page data = response.content.decode("utf-8") assert "Task Status" in data - assert "Status: PENDING" in data + assert "Status: FAILURE" in data # Now returns FAILURE for invalid task IDs + assert "Task Failed" in data # Should show failure state + assert "Error: Task not found" in data # Now properly displays the error message def test_not_found_with_background_tasks(self, client): """Test that missing tests/plans return 404 with background tasks.""" diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index d58ebae..869d5c7 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -1,5 +1,6 @@ """Unit tests for the service layer.""" +import json import logging from unittest.mock import Mock @@ -502,3 +503,65 @@ def test_task_manager_update_nonexistent_task(mocker): # Verify client.set was not called (early return) mock_set.assert_not_called() + + +def test_task_manager_get_corrupted_task(mocker): + """Test getting a task with corrupted JSON data.""" + # Mock Valkey client get method to return invalid JSON + mocker.patch.object(task_manager.client, "get", return_value=b"not valid json") + + # Mock logger to verify error is logged + mock_logger = mocker.patch.object(task_manager, "logger") + + # Get a task with corrupted data + result = task_manager.get_task_info("corrupted-task-id") + + # Verify error was logged + mock_logger.fail.assert_called_once_with("Corrupted task data for corrupted-task-id") + + # Verify returned data has expected error state + assert result["id"] == "corrupted-task-id" + assert result["status"] == "FAILURE" + assert result["error"] == "Corrupted task data" + assert result["result"] is None + + +def test_task_manager_update_decode_error(mocker): + """Test updating a task with corrupted JSON data.""" + # Mock Valkey client get method to return invalid JSON + mocker.patch.object(task_manager.client, "get", return_value=b"not valid json") + + # Mock logger to verify error is logged + mock_logger = mocker.patch.object(task_manager, "logger") + + # Update a task with corrupted data + task_manager.update_task("corrupted-task-id", "SUCCESS", result="test result") + + # Verify error was logged + mock_logger.fail.assert_called_once_with("Failed to decode task data for corrupted-task-id") + + +def test_task_manager_update_encode_error(mocker): + """Test updating a task with data that can't be encoded to JSON.""" + + # Create a mock object that can't be JSON serialized + class UnserializableObject: + pass + + unserializable = UnserializableObject() + + # Mock Valkey client get method to return valid task data + valid_task_data = json.dumps( + {"id": "task-id", "status": "PENDING", "created_at": "2023-01-01T00:00:00+00:00"} + ).encode() + mocker.patch.object(task_manager.client, "get", return_value=valid_task_data) + + # Mock logger to verify error is logged + mock_logger = mocker.patch.object(task_manager, "logger") + + # Update a task with unserializable data + task_manager.update_task("task-id", "SUCCESS", result=unserializable) + + # Verify error was logged + assert mock_logger.fail.call_count == 1 + assert "Failed to encode task data for task-id" in mock_logger.fail.call_args[0][0] From 4eeda9873c91f4037ddbe1f54ae7e4136e044151 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Tue, 22 Apr 2025 13:15:40 +0200 Subject: [PATCH 6/9] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Filip Vágner --- .github/workflows/test.yml | 2 +- src/tmt_web/utils/task_manager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8a41dba..1dc8d03 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -46,7 +46,7 @@ jobs: - name: Start valkey container run: | - podman run -d --pod tmt-web-pod --name valkey valkey/valkey:latest + podman run -d --pod tmt-web-pod --name valkey valkey/valkey:alpine - name: Start Web container run: | diff --git a/src/tmt_web/utils/task_manager.py b/src/tmt_web/utils/task_manager.py index fdfac87..daa9f72 100644 --- a/src/tmt_web/utils/task_manager.py +++ b/src/tmt_web/utils/task_manager.py @@ -91,7 +91,7 @@ def update_task( task_data = self.client.get(task_key) if not task_data: - self.logger.warning(f"Trying to update non-existent task {task_id}") + self.logger.fail(f"Trying to update non-existent task {task_id}") return try: From 8cf55a6a9e0389e79c1bcbb008cc548b1f509727 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Thu, 15 May 2025 17:53:55 +0200 Subject: [PATCH 7/9] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Filip Vágner --- compose.yaml | 3 +-- tests/unit/test_service.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/compose.yaml b/compose.yaml index 7fe0c18..0a935c0 100644 --- a/compose.yaml +++ b/compose.yaml @@ -4,7 +4,6 @@ services: build: context: . dockerfile: ./Containerfile - command: uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000 environment: - VALKEY_URL=valkey://valkey:6379 - API_HOSTNAME=http://localhost:8000 @@ -23,7 +22,7 @@ services: valkey: container_name: valkey - image: valkey:alpine + image: valkey/valkey:alpine ports: - 6379:6379 healthcheck: diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 869d5c7..24437f5 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -497,7 +497,7 @@ def test_task_manager_update_nonexistent_task(mocker): task_manager.update_task("nonexistent-task-id", "SUCCESS", result="test result") # Verify warning was logged - mock_logger.warning.assert_called_once_with( + mock_logger.fail.assert_called_once_with( "Trying to update non-existent task nonexistent-task-id" ) From e98c492b9e2c1864690dcf211c7d26c1c134c762 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Thu, 15 May 2025 17:57:45 +0200 Subject: [PATCH 8/9] Drop APP parameter --- entrypoint.sh | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/entrypoint.sh b/entrypoint.sh index 34bc304..659153c 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -8,21 +8,8 @@ error() { # Handle signals trap 'kill -TERM $PID' TERM INT -# Name of container to start -APP=$1 +uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000 & -[ -z "$APP" ] && error "No app to run passed to entrypoint script" - -case $APP in - uvicorn) - COMMAND="uvicorn tmt_web.api:app --reload --host 0.0.0.0 --port 8000" - ;; - *) - error "Unknown app '$APP'" - ;; -esac - -$COMMAND & PID=$! wait $PID From ee8d3fd9d51103a87a2854ee7956c8b1b05660e2 Mon Sep 17 00:00:00 2001 From: Martin Hoyer Date: Thu, 22 May 2025 14:13:44 +0200 Subject: [PATCH 9/9] Minor fixes based on review comments --- src/tmt_web/api.py | 6 +++++- src/tmt_web/utils/task_manager.py | 8 ++++++++ tests/integration/test_api.py | 2 +- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/tmt_web/api.py b/src/tmt_web/api.py index e2b7ce8..99c22cf 100644 --- a/src/tmt_web/api.py +++ b/src/tmt_web/api.py @@ -345,6 +345,10 @@ def get_task_status( status_code=status.HTTP_404_NOT_FOUND, detail=error_message, ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=error_message or "Task failed", + ) return _to_task_out(task_info) @@ -430,7 +434,7 @@ def health_check() -> HealthStatus: valkey_status = "ok" try: # Ping Valkey - task_manager.client.ping() + task_manager.ping() except Exception: valkey_status = "failed" diff --git a/src/tmt_web/utils/task_manager.py b/src/tmt_web/utils/task_manager.py index daa9f72..d739a18 100644 --- a/src/tmt_web/utils/task_manager.py +++ b/src/tmt_web/utils/task_manager.py @@ -35,6 +35,14 @@ def __init__(self) -> None: self.client = Valkey.from_url(settings.VALKEY_URL) self.logger.debug(f"Connected to Valkey at {settings.VALKEY_URL}") + def ping(self) -> bool: + """Check connectivity to Valkey. + + Returns: + True if connection is successful, otherwise raises an exception + """ + return self.client.ping() + def _get_task_key(self, task_id: str) -> str: """Get the Valkey key for a task.""" return f"task:{task_id}" diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index 0755afd..8257e77 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -254,7 +254,7 @@ def test_health_check_valkey_error(self, client, monkeypatch): def mock_ping(*args, **kwargs): raise Exception("Valkey connection failed") - monkeypatch.setattr("tmt_web.utils.task_manager.task_manager.client.ping", mock_ping) + monkeypatch.setattr("tmt_web.utils.task_manager.task_manager.ping", mock_ping) response = client.get("/health") assert response.status_code == 200