From c9d5d61cf5082285d9a5c8aad6163cb07bf86df5 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 22 Mar 2026 23:30:51 +0530 Subject: [PATCH] refactor: replace if/elif routing chains with declarative route tables Replace 95-line _handle_get() and 29-line _handle_post() if/elif chains with four declarative routing tables: exact-match dicts for O(1) lookup and regex lists for parameterized paths. Extract handler functions for endpoints with validation logic. Add _BadRequest and _NotFound exceptions for clean error responses from handlers. Adding a new endpoint is now a one-liner in the route table instead of another elif branch with manual path parsing. --- py_src/taskito/dashboard.py | 380 +++++++++++++++++++----------------- 1 file changed, 206 insertions(+), 174 deletions(-) diff --git a/py_src/taskito/dashboard.py b/py_src/taskito/dashboard.py index fb58009..a32178c 100644 --- a/py_src/taskito/dashboard.py +++ b/py_src/taskito/dashboard.py @@ -15,6 +15,7 @@ import json import logging +import re import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from importlib import resources @@ -27,6 +28,13 @@ from taskito.app import Queue +class _BadRequest(Exception): + """Raised by route handlers to signal a 400 response.""" + + def __init__(self, message: str) -> None: + self.message = message + + def _read_template(path: str) -> str: """Read a file from the bundled templates directory.""" return resources.files("taskito").joinpath(path).read_text(encoding="utf-8") @@ -50,21 +58,112 @@ def _get_spa_html() -> str: return _SPA_HTML +def _parse_int_qs(qs: dict, key: str, default: int) -> int: + """Parse an integer from query string, raising _BadRequest on invalid input.""" + try: + val = int(qs.get(key, [str(default)])[0]) + except (ValueError, IndexError): + raise _BadRequest(f"{key} must be an integer") from None + if val < 0: + raise _BadRequest(f"{key} must be non-negative") + return val + + +# ── Route handlers ──────────────────────────────────────────────────── +# +# Each handler takes (queue, qs) for GET or (queue, param) for parameterized +# routes and returns JSON-serializable data. Raise _BadRequest for 400s. + + +def _handle_list_jobs(queue: Queue, qs: dict) -> list[dict]: + status = qs.get("status", [None])[0] + q = qs.get("queue", [None])[0] + task = qs.get("task", [None])[0] + metadata_like = qs.get("metadata", [None])[0] + error_like = qs.get("error", [None])[0] + created_after = qs.get("created_after", [None])[0] + created_before = qs.get("created_before", [None])[0] + limit = _parse_int_qs(qs, "limit", 20) + offset = _parse_int_qs(qs, "offset", 0) + + if any(x is not None for x in [metadata_like, error_like, created_after, created_before]): + ca = int(created_after) if created_after else None + cb = int(created_before) if created_before else None + jobs = queue.list_jobs_filtered( + status=status, + queue=q, + task_name=task, + metadata_like=metadata_like, + error_like=error_like, + created_after=ca, + created_before=cb, + limit=limit, + offset=offset, + ) + else: + jobs = queue.list_jobs(status=status, queue=q, task_name=task, limit=limit, offset=offset) + return [j.to_dict() for j in jobs] + + +def _handle_dead_letters(queue: Queue, qs: dict) -> list: + limit = _parse_int_qs(qs, "limit", 20) + offset = _parse_int_qs(qs, "offset", 0) + return queue.dead_letters(limit=limit, offset=offset) + + +def _handle_metrics(queue: Queue, qs: dict) -> dict: + task = qs.get("task", [None])[0] + since = _parse_int_qs(qs, "since", 3600) + return queue.metrics(task_name=task, since=since) + + +def _handle_metrics_timeseries(queue: Queue, qs: dict) -> list: + task = qs.get("task", [None])[0] + since = _parse_int_qs(qs, "since", 3600) + bucket = _parse_int_qs(qs, "bucket", 60) + return queue.metrics_timeseries(task_name=task, since=since, bucket=bucket) + + +def _handle_logs(queue: Queue, qs: dict) -> list: + task = qs.get("task", [None])[0] + level = qs.get("level", [None])[0] + since = _parse_int_qs(qs, "since", 3600) + limit = _parse_int_qs(qs, "limit", 100) + return queue.query_logs(task_name=task, level=level, since=since, limit=limit) + + +def _handle_stats_queues(queue: Queue, qs: dict) -> dict: + q_name = qs.get("queue", [None])[0] + if q_name: + return queue.stats_by_queue(q_name) + return queue.stats_all_queues() + + +class _NotFound(Exception): + """Raised by route handlers to signal a 404 response.""" + + def __init__(self, message: str) -> None: + self.message = message + + +def _handle_get_job(queue: Queue, _qs: dict, job_id: str) -> dict: + job = queue.get_job(job_id) + if job is None: + raise _NotFound("Job not found") + return job.to_dict() + + +def _handle_replay_post(queue: Queue, job_id: str) -> dict: + result = queue.replay(job_id) + return {"replay_job_id": result.id} + + def build_scaler_response( queue: Queue, queue_name: str | None = None, target_queue_depth: int = 10, ) -> dict[str, Any]: - """Build KEDA-compatible scaler payload for a queue. - - Args: - queue: The Queue instance to query. - queue_name: Optional queue name to filter by. - target_queue_depth: Scaling target hint for KEDA. - - Returns: - Dict with metricName, metricValue, isActive, workerUtilization, etc. - """ + """Build KEDA-compatible scaler payload for a queue.""" stats = queue.stats() depth = stats.get("pending", 0) running = stats.get("running", 0) @@ -103,22 +202,6 @@ def build_scaler_response( return response -def _extract_path_segment(path: str, prefix: str, suffix: str = "") -> str: - """Extract the segment between prefix and suffix from a URL path.""" - if suffix: - return path[len(prefix) : -len(suffix)] - return path[len(prefix) :] - - -def _parse_int_qs(qs: dict, key: str, default: int) -> int | None: - """Parse an integer from query string, returning None on invalid input.""" - try: - val = int(qs.get(key, [str(default)])[0]) - return val if val >= 0 else None - except (ValueError, IndexError): - return None - - def serve_dashboard( queue: Queue, host: str = "127.0.0.1", @@ -131,7 +214,6 @@ def serve_dashboard( host: Bind address. port: Bind port. """ - handler = _make_handler(queue) server = ThreadingHTTPServer((host, port), handler) print(f"taskito dashboard → http://{host}:{port}") @@ -148,6 +230,64 @@ def serve_dashboard( def _make_handler(queue: Queue) -> type: """Create a request handler class bound to the given queue.""" + # ── Routing tables ──────────────────────────────────────────── + # + # Exact-match routes: path → handler(queue, qs) → JSON data + get_routes: dict[str, Any] = { + "/api/stats": lambda q, qs: q.stats(), + "/api/jobs": _handle_list_jobs, + "/api/dead-letters": _handle_dead_letters, + "/api/metrics": _handle_metrics, + "/api/metrics/timeseries": _handle_metrics_timeseries, + "/api/logs": _handle_logs, + "/api/circuit-breakers": lambda q, qs: q.circuit_breakers(), + "/api/workers": lambda q, qs: q.workers(), + "/api/resources": lambda q, qs: q.resource_status(), + "/api/proxy-stats": lambda q, qs: q.proxy_stats(), + "/api/interception-stats": lambda q, qs: q.interception_stats(), + "/api/queues/paused": lambda q, qs: q.paused_queues(), + "/api/stats/queues": _handle_stats_queues, + "/api/scaler": lambda q, qs: build_scaler_response( + q, queue_name=qs.get("queue", [None])[0] + ), + } + + # Parameterized routes: regex → handler(queue, qs, captured_id) → JSON data + # Order matters — more specific patterns first. + get_param_routes = [ + (re.compile(r"^/api/jobs/([^/]+)/errors$"), lambda q, qs, jid: q.job_errors(jid)), + (re.compile(r"^/api/jobs/([^/]+)/logs$"), lambda q, qs, jid: q.task_logs(jid)), + ( + re.compile(r"^/api/jobs/([^/]+)/replay-history$"), + lambda q, qs, jid: q.replay_history(jid), + ), + (re.compile(r"^/api/jobs/([^/]+)/dag$"), lambda q, qs, jid: q.job_dag(jid)), + (re.compile(r"^/api/jobs/([^/]+)$"), _handle_get_job), + ] + + # POST exact-match routes: path → handler(queue) → JSON data + post_routes: dict[str, Any] = { + "/api/dead-letters/purge": lambda q: {"purged": q.purge_dead(0)}, + } + + # POST parameterized routes: regex → handler(queue, captured_id) → JSON data + post_param_routes = [ + ( + re.compile(r"^/api/jobs/([^/]+)/cancel$"), + lambda q, jid: {"cancelled": q.cancel_job(jid)}, + ), + (re.compile(r"^/api/jobs/([^/]+)/replay$"), _handle_replay_post), + ( + re.compile(r"^/api/dead-letters/([^/]+)/retry$"), + lambda q, did: {"new_job_id": q.retry_dead(did)}, + ), + (re.compile(r"^/api/queues/([^/]+)/pause$"), lambda q, n: (q.pause(n), {"paused": n})[1]), + ( + re.compile(r"^/api/queues/([^/]+)/resume$"), + lambda q, n: (q.resume(n), {"resumed": n})[1], + ), + ] + class DashboardHandler(BaseHTTPRequestHandler): def do_GET(self) -> None: try: @@ -163,66 +303,31 @@ def _handle_get(self) -> None: path = parsed.path qs = parse_qs(parsed.query) - if path == "/api/stats": - self._json_response(queue.stats()) - elif path == "/api/jobs": - self._handle_list_jobs(qs) - elif path.startswith("/api/jobs/") and path.endswith("/errors"): - job_id = _extract_path_segment(path, "/api/jobs/", "/errors") - self._json_response(queue.job_errors(job_id)) - elif path.startswith("/api/jobs/") and path.endswith("/logs"): - job_id = _extract_path_segment(path, "/api/jobs/", "/logs") - self._json_response(queue.task_logs(job_id)) - elif path.startswith("/api/jobs/") and path.endswith("/replay-history"): - job_id = _extract_path_segment(path, "/api/jobs/", "/replay-history") - self._json_response(queue.replay_history(job_id)) - elif path.startswith("/api/jobs/"): - job_id = _extract_path_segment(path, "/api/jobs/") - job = queue.get_job(job_id) - if job is None: - self._json_response({"error": "Job not found"}, status=404) - else: - self._json_response(job.to_dict()) - elif path == "/api/dead-letters": - limit = _parse_int_qs(qs, "limit", 20) - offset = _parse_int_qs(qs, "offset", 0) - if limit is None or offset is None: - self._json_response({"error": "limit and offset must be integers"}, status=400) - return - self._json_response(queue.dead_letters(limit=limit, offset=offset)) - elif path == "/api/metrics": - task = qs.get("task", [None])[0] - since = _parse_int_qs(qs, "since", 3600) - if since is None: - self._json_response({"error": "since must be an integer"}, status=400) - return - self._json_response(queue.metrics(task_name=task, since=since)) - elif path == "/api/logs": - task = qs.get("task", [None])[0] - level = qs.get("level", [None])[0] - since = _parse_int_qs(qs, "since", 3600) - limit = _parse_int_qs(qs, "limit", 100) - if since is None or limit is None: - self._json_response({"error": "since and limit must be integers"}, status=400) + # Exact-match API routes + handler = get_routes.get(path) + if handler: + try: + self._json_response(handler(queue, qs)) + except _BadRequest as e: + self._json_response({"error": e.message}, status=400) + except _NotFound as e: + self._json_response({"error": e.message}, status=404) + return + + # Parameterized API routes + for pattern, param_handler in get_param_routes: + m = pattern.match(path) + if m: + try: + self._json_response(param_handler(queue, qs, m.group(1))) + except _BadRequest as e: + self._json_response({"error": e.message}, status=400) + except _NotFound as e: + self._json_response({"error": e.message}, status=404) return - self._json_response( - queue.query_logs(task_name=task, level=level, since=since, limit=limit) - ) - elif path == "/api/circuit-breakers": - self._json_response(queue.circuit_breakers()) - elif path == "/api/workers": - self._json_response(queue.workers()) - elif path == "/api/resources": - self._json_response(queue.resource_status()) - elif path == "/api/proxy-stats": - self._json_response(queue.proxy_stats()) - elif path == "/api/interception-stats": - self._json_response(queue.interception_stats()) - elif path == "/api/queues/paused": - self._json_response(queue.paused_queues()) - elif path == "/metrics": - self._serve_prometheus_metrics() - elif path == "/health": + + # Non-JSON routes + if path == "/health": from taskito.health import check_health self._json_response(check_health()) @@ -230,28 +335,8 @@ def _handle_get(self) -> None: from taskito.health import check_readiness self._json_response(check_readiness(queue)) - elif path == "/api/stats/queues": - q_name = qs.get("queue", [None])[0] - if q_name: - self._json_response(queue.stats_by_queue(q_name)) - else: - self._json_response(queue.stats_all_queues()) - elif path.startswith("/api/jobs/") and path.endswith("/dag"): - job_id = _extract_path_segment(path, "/api/jobs/", "/dag") - self._json_response(queue.job_dag(job_id)) - elif path == "/api/metrics/timeseries": - task = qs.get("task", [None])[0] - since = _parse_int_qs(qs, "since", 3600) - bucket = _parse_int_qs(qs, "bucket", 60) - if since is None or bucket is None: - self._json_response({"error": "since and bucket must be integers"}, status=400) - return - self._json_response( - queue.metrics_timeseries(task_name=task, since=since, bucket=bucket) - ) - elif path == "/api/scaler": - q_name = qs.get("queue", [None])[0] - self._json_response(build_scaler_response(queue, queue_name=q_name)) + elif path == "/metrics": + self._serve_prometheus_metrics() else: self._serve_spa() @@ -265,75 +350,22 @@ def do_POST(self) -> None: self._json_response({"error": "Internal server error"}, status=500) def _handle_post(self) -> None: - parsed = urlparse(self.path) - path = parsed.path + path = urlparse(self.path).path - if path.startswith("/api/jobs/") and path.endswith("/cancel"): - job_id = _extract_path_segment(path, "/api/jobs/", "/cancel") - ok = queue.cancel_job(job_id) - self._json_response({"cancelled": ok}) - elif path.startswith("/api/dead-letters/") and path.endswith("/retry"): - dead_id = _extract_path_segment(path, "/api/dead-letters/", "/retry") - new_id = queue.retry_dead(dead_id) - self._json_response({"new_job_id": new_id}) - elif path == "/api/dead-letters/purge": - count = queue.purge_dead(0) - self._json_response({"purged": count}) - elif path.startswith("/api/jobs/") and path.endswith("/replay"): - job_id = _extract_path_segment(path, "/api/jobs/", "/replay") - result = queue.replay(job_id) - self._json_response({"replay_job_id": result.id}) - elif path.startswith("/api/queues/") and path.endswith("/pause"): - name = _extract_path_segment(path, "/api/queues/", "/pause") - queue.pause(name) - self._json_response({"paused": name}) - elif path.startswith("/api/queues/") and path.endswith("/resume"): - name = _extract_path_segment(path, "/api/queues/", "/resume") - queue.resume(name) - self._json_response({"resumed": name}) - else: - self._json_response({"error": "Not found"}, status=404) - - def _handle_list_jobs(self, qs: dict) -> None: - status = qs.get("status", [None])[0] - q = qs.get("queue", [None])[0] - task = qs.get("task", [None])[0] - metadata_like = qs.get("metadata", [None])[0] - error_like = qs.get("error", [None])[0] - created_after = qs.get("created_after", [None])[0] - created_before = qs.get("created_before", [None])[0] - limit = _parse_int_qs(qs, "limit", 20) - offset = _parse_int_qs(qs, "offset", 0) - if limit is None or offset is None: - self._json_response({"error": "limit and offset must be integers"}, status=400) + # Exact-match POST routes + handler = post_routes.get(path) + if handler: + self._json_response(handler(queue)) return - # Use filtered listing if any advanced filters are provided - if any( - x is not None for x in [metadata_like, error_like, created_after, created_before] - ): - ca = int(created_after) if created_after else None - cb = int(created_before) if created_before else None - jobs = queue.list_jobs_filtered( - status=status, - queue=q, - task_name=task, - metadata_like=metadata_like, - error_like=error_like, - created_after=ca, - created_before=cb, - limit=limit, - offset=offset, - ) - else: - jobs = queue.list_jobs( - status=status, - queue=q, - task_name=task, - limit=limit, - offset=offset, - ) - self._json_response([j.to_dict() for j in jobs]) + # Parameterized POST routes + for pattern, param_handler in post_param_routes: + m = pattern.match(path) + if m: + self._json_response(param_handler(queue, m.group(1))) + return + + self._json_response({"error": "Not found"}, status=404) def _json_response(self, data: Any, status: int = 200) -> None: body = json.dumps(data, default=str).encode()