diff --git a/py_src/taskito/async_support/mixins.py b/py_src/taskito/async_support/mixins.py index 72126ca..7f6579d 100644 --- a/py_src/taskito/async_support/mixins.py +++ b/py_src/taskito/async_support/mixins.py @@ -34,16 +34,65 @@ class AsyncQueueMixin: def stats(self) -> dict[str, int]: ... def stats_by_queue(self, queue_name: str) -> dict[str, int]: ... def stats_all_queues(self) -> dict[str, dict[str, int]]: ... + def get_job(self, job_id: str) -> JobResult | None: ... + def list_jobs( + self, + status: str | None = ..., + queue: str | None = ..., + task_name: str | None = ..., + limit: int = ..., + offset: int = ..., + namespace: Any = ..., + ) -> list[JobResult]: ... + def list_jobs_filtered( + self, + status: str | None = ..., + queue: str | None = ..., + task_name: str | None = ..., + metadata_like: str | None = ..., + error_like: str | None = ..., + created_after: int | None = ..., + created_before: int | None = ..., + limit: int = ..., + offset: int = ..., + namespace: Any = ..., + ) -> list[JobResult]: ... def cancel_job(self, job_id: str) -> bool: ... + def cancel_running_job(self, job_id: str) -> bool: ... def metrics(self, task_name: str | None = ..., since: int = ...) -> dict[str, Any]: ... + def metrics_timeseries( + self, + task_name: str | None = ..., + since: int = ..., + interval: int = ..., + ) -> list[dict]: ... + def job_dag(self, job_id: str) -> dict[str, Any]: ... + def job_errors(self, job_id: str) -> list[dict]: ... + def task_logs(self, job_id: str) -> list[dict]: ... + def query_logs( + self, + task_name: str | None = ..., + level: str | None = ..., + since: int = ..., + limit: int = ..., + ) -> list[dict]: ... def dead_letters(self, limit: int = ..., offset: int = ...) -> list[dict]: ... def retry_dead(self, dead_id: str) -> str: ... def replay(self, job_id: str) -> JobResult: ... + def replay_history(self, job_id: str) -> list[dict]: ... def circuit_breakers(self) -> list[dict]: ... def workers(self) -> list[dict]: ... def run_worker( self, queues: Sequence[str] | None = ..., tags: list[str] | None = ... ) -> None: ... + def purge_completed(self, older_than: int = ...) -> int: ... + def purge_dead(self, older_than: int = ...) -> int: ... + def revoke_task(self, task_name: str) -> int: ... + def archive(self, older_than: int = ...) -> int: ... + def list_archived(self, limit: int = ..., offset: int = ...) -> list[JobResult]: ... + def pause(self, queue_name: str = ...) -> None: ... + def resume(self, queue_name: str = ...) -> None: ... + def paused_queues(self) -> list[str]: ... def resource_status(self) -> list[dict[str, Any]]: ... # ----------------------------------------------------------------- @@ -98,10 +147,88 @@ async def acircuit_breakers(self) -> list[dict]: """Async version of :meth:`circuit_breakers`.""" return await self._run_sync(self.circuit_breakers) + async def aget_job(self, job_id: str) -> JobResult | None: + """Async version of :meth:`get_job`.""" + return await self._run_sync(self.get_job, job_id) + + async def alist_jobs(self, **kwargs: Any) -> list[JobResult]: + """Async version of :meth:`list_jobs`.""" + return await self._run_sync(self.list_jobs, **kwargs) + + async def alist_jobs_filtered(self, **kwargs: Any) -> list[JobResult]: + """Async version of :meth:`list_jobs_filtered`.""" + return await self._run_sync(self.list_jobs_filtered, **kwargs) + + async def ajob_dag(self, job_id: str) -> dict[str, Any]: + """Async version of :meth:`job_dag`.""" + return await self._run_sync(self.job_dag, job_id) + + async def ametrics_timeseries(self, **kwargs: Any) -> list[dict]: + """Async version of :meth:`metrics_timeseries`.""" + return await self._run_sync(self.metrics_timeseries, **kwargs) + + async def ajob_errors(self, job_id: str) -> list[dict]: + """Async version of :meth:`job_errors`.""" + return await self._run_sync(self.job_errors, job_id) + + async def atask_logs(self, job_id: str) -> list[dict]: + """Async version of :meth:`task_logs`.""" + return await self._run_sync(self.task_logs, job_id) + + async def aquery_logs(self, **kwargs: Any) -> list[dict]: + """Async version of :meth:`query_logs`.""" + return await self._run_sync(self.query_logs, **kwargs) + + async def acancel_running_job(self, job_id: str) -> bool: + """Async version of :meth:`cancel_running_job`.""" + return await self._run_sync(self.cancel_running_job, job_id) + async def aworkers(self) -> list[dict]: """Async version of :meth:`workers`.""" return await self._run_sync(self.workers) + # -- Operations -- + + async def aenqueue_many(self, **kwargs: Any) -> list[JobResult]: + """Async version of :meth:`enqueue_many`.""" + return await self._run_sync(self.enqueue_many, **kwargs) # type: ignore[attr-defined] + + async def apurge_completed(self, older_than: int = 86400) -> int: + """Async version of :meth:`purge_completed`.""" + return await self._run_sync(self.purge_completed, older_than=older_than) + + async def apurge_dead(self, older_than: int = 86400) -> int: + """Async version of :meth:`purge_dead`.""" + return await self._run_sync(self.purge_dead, older_than=older_than) + + async def arevoke_task(self, task_name: str) -> int: + """Async version of :meth:`revoke_task`.""" + return await self._run_sync(self.revoke_task, task_name) + + async def areplay_history(self, job_id: str) -> list[dict]: + """Async version of :meth:`replay_history`.""" + return await self._run_sync(self.replay_history, job_id) + + async def aarchive(self, older_than: int = 86400) -> int: + """Async version of :meth:`archive`.""" + return await self._run_sync(self.archive, older_than=older_than) + + async def alist_archived(self, limit: int = 50, offset: int = 0) -> list[JobResult]: + """Async version of :meth:`list_archived`.""" + return await self._run_sync(self.list_archived, limit=limit, offset=offset) + + async def apause(self, queue_name: str = "default") -> None: + """Async version of :meth:`pause`.""" + await self._run_sync(self.pause, queue_name=queue_name) + + async def aresume(self, queue_name: str = "default") -> None: + """Async version of :meth:`resume`.""" + await self._run_sync(self.resume, queue_name=queue_name) + + async def apaused_queues(self) -> list[str]: + """Async version of :meth:`paused_queues`.""" + return await self._run_sync(self.paused_queues) + # -- Locks -- def alock( diff --git a/py_src/taskito/prefork/child.py b/py_src/taskito/prefork/child.py index cde0d9f..2492ff0 100644 --- a/py_src/taskito/prefork/child.py +++ b/py_src/taskito/prefork/child.py @@ -13,6 +13,7 @@ import base64 import importlib import json +import logging import os import sys import time @@ -22,6 +23,8 @@ from taskito.async_support.helpers import run_maybe_async from taskito.exceptions import TaskCancelledError +logger = logging.getLogger("taskito.prefork.child") + def _import_queue(app_path: str) -> Any: """Import and return the Queue instance from a dotted path like 'myapp:queue'.""" @@ -50,6 +53,7 @@ def _execute_job( retry_count = job.get("retry_count", 0) max_retries = job.get("max_retries", 3) + logger.debug("executing %s[%s]", task_name, job_id) wrapper = queue._task_registry.get(task_name) if wrapper is None: return { @@ -101,6 +105,7 @@ def _execute_job( except Exception: wall_time_ns = time.monotonic_ns() - start_ns error_msg = traceback.format_exc() + logger.error("task %s[%s] failed: %s", task_name, job_id, error_msg.splitlines()[-1]) # Check retry filters should_retry = True @@ -160,6 +165,7 @@ def main() -> None: # Signal readiness _write_message({"type": "ready"}) + logger.info("child ready (app=%s, pid=%d)", app_path, os.getpid()) # Main loop: read jobs from stdin, execute, write results to stdout try: @@ -171,6 +177,7 @@ def main() -> None: msg = json.loads(line) if msg.get("type") == "shutdown": + logger.info("shutdown received") break if msg.get("type") == "job": @@ -178,9 +185,12 @@ def main() -> None: _write_message(result) except (BrokenPipeError, EOFError, KeyboardInterrupt): - pass + logger.debug("child pipe closed or interrupted") finally: # Teardown resources if runtime is not None: - runtime.teardown() + try: + runtime.teardown() + except Exception: + logger.warning("resource teardown error", exc_info=True) diff --git a/py_src/taskito/result.py b/py_src/taskito/result.py index 33556cc..20b1089 100644 --- a/py_src/taskito/result.py +++ b/py_src/taskito/result.py @@ -4,6 +4,7 @@ import asyncio import json +import logging import time from collections.abc import AsyncIterator, Iterator from typing import TYPE_CHECKING, Any @@ -20,6 +21,8 @@ from taskito._taskito import PyJob from taskito.app import Queue +log = logging.getLogger("taskito.result") + class JobResult(AsyncJobResultMixin): """ @@ -195,6 +198,7 @@ def stream( try: yield json.loads(extra) except (json.JSONDecodeError, TypeError): + log.warning("failed to deserialize partial result for job %s", self.id) yield extra self.refresh() @@ -237,6 +241,7 @@ async def astream( try: yield json.loads(extra) except (json.JSONDecodeError, TypeError): + log.warning("failed to deserialize partial result for job %s", self.id) yield extra self.refresh() @@ -270,6 +275,7 @@ def to_dict(self) -> dict[str, Any]: "timeout_ms": self._py_job.timeout_ms, "unique_key": self._py_job.unique_key, "metadata": self._py_job.metadata, + "namespace": self._py_job.namespace, } def __repr__(self) -> str: diff --git a/tests/python/test_dashboard.py b/tests/python/test_dashboard.py index 81c1754..a62b98b 100644 --- a/tests/python/test_dashboard.py +++ b/tests/python/test_dashboard.py @@ -143,6 +143,7 @@ def dummy() -> None: "timeout_ms", "unique_key", "metadata", + "namespace", } assert set(d.keys()) == expected_keys assert d["status"] == "pending"