diff --git a/opentakserver/blueprints/ots_api/__init__.py b/opentakserver/blueprints/ots_api/__init__.py index d142d7fd..44dc6ed9 100644 --- a/opentakserver/blueprints/ots_api/__init__.py +++ b/opentakserver/blueprints/ots_api/__init__.py @@ -15,6 +15,7 @@ from opentakserver.blueprints.ots_api.group_api import group_api from opentakserver.blueprints.ots_api.eud_stats_api import eud_stats_blueprint from opentakserver.blueprints.ots_api.plugin_api import plugin_blueprint +from opentakserver.blueprints.ots_api.health_api import health_api ots_api = Blueprint("ots_api", __name__) ots_api.register_blueprint(api_blueprint) @@ -33,3 +34,4 @@ ots_api.register_blueprint(eud_stats_blueprint) ots_api.register_blueprint(plugin_blueprint) ots_api.register_blueprint(token_api_blueprint) +ots_api.register_blueprint(health_api) diff --git a/opentakserver/blueprints/ots_api/health_api.py b/opentakserver/blueprints/ots_api/health_api.py new file mode 100644 index 00000000..2c69b7cc --- /dev/null +++ b/opentakserver/blueprints/ots_api/health_api.py @@ -0,0 +1,55 @@ +from flask import Blueprint, jsonify, request +from flask_security import auth_required + +from opentakserver.health import cot_parser, eud_handler + +# Blueprint for health endpoints +health_api = Blueprint("health_api", __name__) + + +@health_api.route("/api/health/ots") +@auth_required() +def health_ots(): + """Placeholder health check for OTS.""" + return jsonify({"status": "ok"}) + + +@health_api.route("/api/health/cot") +@auth_required() +def health_cot(): + """Health check for the CoT parser service.""" + service_state = cot_parser.query_systemd() + log_lines = cot_parser.tail_ots_log_for_cot_parser_entries() + log_errors = cot_parser.find_errors(log_lines) + rabbit_ok = cot_parser.rabbitmq_check() + + status = cot_parser.compute_status(service_state, log_errors, rabbit_ok) + status["timestamp"] = cot_parser.current_timestamp() + + strict = request.args.get("strict", "false").lower() == "true" + code = 200 + if strict and status["overall"] != "healthy": + code = 503 + + return jsonify(status), code + + +@health_api.route("/api/health/eud") +@auth_required() +def health_eud(): + """Health check for the EUD handler service.""" + service_state = eud_handler.query_systemd() + log_lines = eud_handler.tail_ots_log_for_eud_handler_entries() + log_errors = eud_handler.find_errors(log_lines) + rabbit_ok = eud_handler.rabbitmq_check() + + status = eud_handler.compute_status(service_state, log_errors, rabbit_ok) + status["timestamp"] = eud_handler.current_timestamp() + + strict = request.args.get("strict", "false").lower() == "true" + code = 200 + if strict and status["overall"] != "healthy": + code = 503 + + return jsonify(status), code + diff --git a/opentakserver/health/__init__.py b/opentakserver/health/__init__.py new file mode 100644 index 00000000..b48ae5e7 --- /dev/null +++ b/opentakserver/health/__init__.py @@ -0,0 +1,2 @@ +"""Health utilities for OpenTAKServer.""" + diff --git a/opentakserver/health/cot_parser.py b/opentakserver/health/cot_parser.py new file mode 100644 index 00000000..d9782812 --- /dev/null +++ b/opentakserver/health/cot_parser.py @@ -0,0 +1,122 @@ +import os +import re +import socket +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable, List + +COT_PARSER_SERVICE = os.getenv("COT_PARSER_SERVICE", "cot_parser.service") +OTS_DATA_FOLDER = os.getenv("OTS_DATA_FOLDER", os.path.join(Path.home(), "ots")) +COT_PARSER_LOG = os.getenv( + "COT_PARSER_LOG", + os.path.join(OTS_DATA_FOLDER, "logs", "opentakserver.log"), +) +RABBIT_HOST = os.getenv("RABBIT_HOST", "localhost") +RABBIT_PORT = int(os.getenv("RABBIT_PORT", "5672")) +ERROR_PATTERN = os.getenv("COT_PARSER_ERROR_REGEX", r"(ERROR|Exception|Traceback)") +ERROR_REGEX = re.compile(ERROR_PATTERN, re.IGNORECASE) +LOG_TAG = "cot_parser" + + +def query_systemd(service: str = COT_PARSER_SERVICE) -> str: + """ + Returns one of: active, inactive, failed, activating, deactivating, reloading, unknown + """ + # First try: is-active (simplest, stable output) + try: + completed = subprocess.run( + ["systemctl", "is-active", service], + check=False, + capture_output=True, + text=True, + ) + state = completed.stdout.strip() + if state: # active/inactive/failed/... + return state + except Exception: + pass + + # Fallback: show ActiveState + try: + completed = subprocess.run( + ["systemctl", "show", service, "--property=ActiveState", "--value"], + check=True, + capture_output=True, + text=True, + ) + return completed.stdout.strip() + except Exception as exc: + return f"error: {exc}" + + +def tail_ots_log_for_cot_parser_entries( + path: str = COT_PARSER_LOG, lines: int = 100, tag: str = LOG_TAG +) -> List[str]: + """Return the last ``lines`` from the OTS log produced by ``cot_parser``.""" + try: + with open(path, "rb") as fh: + fh.seek(0, os.SEEK_END) + size = fh.tell() + block = 1024 + data = bytearray() + while size > 0 and data.count(b"\n") <= lines: + step = min(block, size) + size -= step + fh.seek(size) + data = fh.read(step) + data + log_lines = data.decode(errors="ignore").splitlines() + return [line for line in log_lines if tag in line][-lines:] + except OSError as exc: # pragma: no cover - exercised via tests + return [f"error: {exc}"] + + +def find_errors(lines: Iterable[str]) -> List[str]: + """Filter log lines that match ``ERROR_REGEX``.""" + return [line for line in lines if ERROR_REGEX.search(line)] + + +def rabbitmq_check(host: str = RABBIT_HOST, port: int = RABBIT_PORT, timeout: float = 1.0) -> bool: + """Attempt a TCP connection to RabbitMQ and return whether it succeeded.""" + try: + with socket.create_connection((host, port), timeout=timeout): + return True + except OSError: # pragma: no cover - exercised via tests + return False + + +def compute_status(service_state: str, log_errors: List[str], rabbitmq_ok: bool) -> dict: + """Compute component and overall health status.""" + components = { + "service": service_state, + "logs": "errors" if log_errors else "error-free", + "rabbitmq": "up" if rabbitmq_ok else "down", + } + problems: List[str] = [] + if service_state != "active": + problems.append("cot_parser service inactive") + if log_errors: + problems.append("errors detected in log") + if not rabbitmq_ok: + problems.append("rabbitmq unreachable") + + # Determine operational status + overall = "non-operational" + if rabbitmq_ok and service_state == "active": + overall = "operational-" + if log_errors: + overall += "errors" + else: + overall += "healthy" + + return { + "status": overall, + "components": components, + "problems": problems, + } + + +def current_timestamp() -> str: + """Return an ISO 8601 UTC timestamp.""" + return datetime.now(timezone.utc).isoformat() + diff --git a/opentakserver/health/eud_handler.py b/opentakserver/health/eud_handler.py new file mode 100644 index 00000000..d4a5f6a9 --- /dev/null +++ b/opentakserver/health/eud_handler.py @@ -0,0 +1,119 @@ +import os +import re +import socket +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable, List + +EUD_HANDLER_SERVICE = os.getenv("EUD_HANDLER_SERVICE", "eud_handler.service") +OTS_DATA_FOLDER = os.getenv("OTS_DATA_FOLDER", os.path.join(Path.home(), "ots")) +EUD_HANDLER_LOG = os.getenv( + "EUD_HANDLER_LOG", + os.path.join(OTS_DATA_FOLDER, "logs", "opentakserver.log"), +) +RABBIT_HOST = os.getenv("RABBIT_HOST", "localhost") +RABBIT_PORT = int(os.getenv("RABBIT_PORT", "5672")) +ERROR_PATTERN = os.getenv("EUD_HANDLER_ERROR_REGEX", r"(ERROR|Exception|Traceback)") +ERROR_REGEX = re.compile(ERROR_PATTERN, re.IGNORECASE) +LOG_TAG = "eud_handler" + + +def query_systemd(service: str = EUD_HANDLER_SERVICE) -> str: + """Returns one of: active, inactive, failed, activating, deactivating, reloading, unknown""" + # First try: is-active (simplest, stable output) + try: + completed = subprocess.run( + ["systemctl", "is-active", service], + check=False, + capture_output=True, + text=True, + ) + state = completed.stdout.strip() + if state: # active/inactive/failed/... + return state + except Exception: + pass + + # Fallback: show ActiveState + try: + completed = subprocess.run( + ["systemctl", "show", service, "--property=ActiveState", "--value"], + check=True, + capture_output=True, + text=True, + ) + return completed.stdout.strip() + except Exception as exc: + return f"error: {exc}" + + +def tail_ots_log_for_eud_handler_entries( + path: str = EUD_HANDLER_LOG, lines: int = 100, tag: str = LOG_TAG +) -> List[str]: + """Return the last ``lines`` from the OTS log produced by ``eud_handler``.""" + try: + with open(path, "rb") as fh: + fh.seek(0, os.SEEK_END) + size = fh.tell() + block = 1024 + data = bytearray() + while size > 0 and data.count(b"\n") <= lines: + step = min(block, size) + size -= step + fh.seek(size) + data = fh.read(step) + data + log_lines = data.decode(errors="ignore").splitlines() + return [line for line in log_lines if tag in line][-lines:] + except OSError as exc: # pragma: no cover - exercised via tests + return [f"error: {exc}"] + + +def find_errors(lines: Iterable[str]) -> List[str]: + """Filter log lines that match ``ERROR_REGEX``.""" + return [line for line in lines if ERROR_REGEX.search(line)] + + +def rabbitmq_check(host: str = RABBIT_HOST, port: int = RABBIT_PORT, timeout: float = 1.0) -> bool: + """Attempt a TCP connection to RabbitMQ and return whether it succeeded.""" + try: + with socket.create_connection((host, port), timeout=timeout): + return True + except OSError: # pragma: no cover - exercised via tests + return False + + +def compute_status(service_state: str, log_errors: List[str], rabbitmq_ok: bool) -> dict: + """Compute component and overall health status.""" + components = { + "service": service_state, + "logs": "errors" if log_errors else "error-free", + "rabbitmq": "up" if rabbitmq_ok else "down", + } + problems: List[str] = [] + if service_state != "active": + problems.append("eud_handler service inactive") + if log_errors: + problems.append("errors detected in log") + if not rabbitmq_ok: + problems.append("rabbitmq unreachable") + + # Determine operational status + overall = "non-operational" + if rabbitmq_ok and service_state == "active": + overall = "operational-" + if log_errors: + overall += "errors" + else: + overall += "healthy" + + return { + "status": overall, + "components": components, + "problems": problems, + } + + +def current_timestamp() -> str: + """Return an ISO 8601 UTC timestamp.""" + return datetime.now(timezone.utc).isoformat() diff --git a/tests/test_health_api.py b/tests/test_health_api.py new file mode 100644 index 00000000..00aad254 --- /dev/null +++ b/tests/test_health_api.py @@ -0,0 +1,75 @@ +from unittest.mock import patch + + +def test_health_endpoints(auth): + for endpoint in ("ots", "eud"): + response = auth.get(f"/api/health/{endpoint}") + assert response.status_code == 200 + + +def test_health_cot_healthy(auth): + with patch("opentakserver.health.cot_parser.query_systemd", return_value="active"), \ + patch( + "opentakserver.health.cot_parser.tail_ots_log_for_cot_parser_entries", + return_value=["all good"], + ), \ + patch("opentakserver.health.cot_parser.find_errors", return_value=[]), \ + patch("opentakserver.health.cot_parser.rabbitmq_check", return_value=True): + response = auth.get("/api/health/cot") + assert response.status_code == 200 + data = response.json + assert data["overall"] == "healthy" + assert data["problems"] == [] + assert "timestamp" in data + + +def test_health_cot_unhealthy_strict(auth): + with patch("opentakserver.health.cot_parser.query_systemd", return_value="inactive"), \ + patch( + "opentakserver.health.cot_parser.tail_ots_log_for_cot_parser_entries", + return_value=["error"], + ), \ + patch("opentakserver.health.cot_parser.find_errors", return_value=["error"]), \ + patch("opentakserver.health.cot_parser.rabbitmq_check", return_value=False): + response = auth.get("/api/health/cot?strict=true") + assert response.status_code == 503 + data = response.json + assert data["overall"] == "unhealthy" + assert data["problems"] + + +def test_health_eud_healthy(auth): + with patch("opentakserver.health.eud_handler.query_systemd", return_value="active"), \ + patch( + "opentakserver.health.eud_handler.tail_ots_log_for_eud_handler_entries", + return_value=["all good"], + ), \ + patch("opentakserver.health.eud_handler.find_errors", return_value=[]), \ + patch("opentakserver.health.eud_handler.rabbitmq_check", return_value=True): + response = auth.get("/api/health/eud") + assert response.status_code == 200 + data = response.json + assert data["overall"] == "healthy" + assert data["problems"] == [] + assert "timestamp" in data + + +def test_health_eud_unhealthy_strict(auth): + with patch("opentakserver.health.eud_handler.query_systemd", return_value="inactive"), \ + patch( + "opentakserver.health.eud_handler.tail_ots_log_for_eud_handler_entries", + return_value=["error"], + ), \ + patch("opentakserver.health.eud_handler.find_errors", return_value=["error"]), \ + patch("opentakserver.health.eud_handler.rabbitmq_check", return_value=False): + response = auth.get("/api/health/eud?strict=true") + assert response.status_code == 503 + data = response.json + assert data["overall"] == "unhealthy" + assert data["problems"] + + +def test_health_requires_auth(client): + for endpoint in ("ots", "cot", "eud"): + response = client.get(f"/api/health/{endpoint}") + assert response.status_code in (401, 302)