Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions py_src/taskito/async_support/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,65 @@ class AsyncQueueMixin:
def stats(self) -> dict[str, int]: ...
def stats_by_queue(self, queue_name: str) -> dict[str, int]: ...
def stats_all_queues(self) -> dict[str, dict[str, int]]: ...
def get_job(self, job_id: str) -> JobResult | None: ...
def list_jobs(
self,
status: str | None = ...,
queue: str | None = ...,
task_name: str | None = ...,
limit: int = ...,
offset: int = ...,
namespace: Any = ...,
) -> list[JobResult]: ...
def list_jobs_filtered(
self,
status: str | None = ...,
queue: str | None = ...,
task_name: str | None = ...,
metadata_like: str | None = ...,
error_like: str | None = ...,
created_after: int | None = ...,
created_before: int | None = ...,
limit: int = ...,
offset: int = ...,
namespace: Any = ...,
) -> list[JobResult]: ...
def cancel_job(self, job_id: str) -> bool: ...
def cancel_running_job(self, job_id: str) -> bool: ...
def metrics(self, task_name: str | None = ..., since: int = ...) -> dict[str, Any]: ...
def metrics_timeseries(
self,
task_name: str | None = ...,
since: int = ...,
interval: int = ...,
) -> list[dict]: ...
def job_dag(self, job_id: str) -> dict[str, Any]: ...
def job_errors(self, job_id: str) -> list[dict]: ...
def task_logs(self, job_id: str) -> list[dict]: ...
def query_logs(
self,
task_name: str | None = ...,
level: str | None = ...,
since: int = ...,
limit: int = ...,
) -> list[dict]: ...
def dead_letters(self, limit: int = ..., offset: int = ...) -> list[dict]: ...
def retry_dead(self, dead_id: str) -> str: ...
def replay(self, job_id: str) -> JobResult: ...
def replay_history(self, job_id: str) -> list[dict]: ...
def circuit_breakers(self) -> list[dict]: ...
def workers(self) -> list[dict]: ...
def run_worker(
self, queues: Sequence[str] | None = ..., tags: list[str] | None = ...
) -> None: ...
def purge_completed(self, older_than: int = ...) -> int: ...
def purge_dead(self, older_than: int = ...) -> int: ...
def revoke_task(self, task_name: str) -> int: ...
def archive(self, older_than: int = ...) -> int: ...
def list_archived(self, limit: int = ..., offset: int = ...) -> list[JobResult]: ...
def pause(self, queue_name: str = ...) -> None: ...
def resume(self, queue_name: str = ...) -> None: ...
def paused_queues(self) -> list[str]: ...
def resource_status(self) -> list[dict[str, Any]]: ...

# -----------------------------------------------------------------
Expand Down Expand Up @@ -98,10 +147,88 @@ async def acircuit_breakers(self) -> list[dict]:
"""Async version of :meth:`circuit_breakers`."""
return await self._run_sync(self.circuit_breakers)

async def aget_job(self, job_id: str) -> JobResult | None:
"""Async version of :meth:`get_job`."""
return await self._run_sync(self.get_job, job_id)

async def alist_jobs(self, **kwargs: Any) -> list[JobResult]:
"""Async version of :meth:`list_jobs`."""
return await self._run_sync(self.list_jobs, **kwargs)

async def alist_jobs_filtered(self, **kwargs: Any) -> list[JobResult]:
"""Async version of :meth:`list_jobs_filtered`."""
return await self._run_sync(self.list_jobs_filtered, **kwargs)

async def ajob_dag(self, job_id: str) -> dict[str, Any]:
"""Async version of :meth:`job_dag`."""
return await self._run_sync(self.job_dag, job_id)

async def ametrics_timeseries(self, **kwargs: Any) -> list[dict]:
"""Async version of :meth:`metrics_timeseries`."""
return await self._run_sync(self.metrics_timeseries, **kwargs)

async def ajob_errors(self, job_id: str) -> list[dict]:
"""Async version of :meth:`job_errors`."""
return await self._run_sync(self.job_errors, job_id)

async def atask_logs(self, job_id: str) -> list[dict]:
"""Async version of :meth:`task_logs`."""
return await self._run_sync(self.task_logs, job_id)

async def aquery_logs(self, **kwargs: Any) -> list[dict]:
"""Async version of :meth:`query_logs`."""
return await self._run_sync(self.query_logs, **kwargs)

async def acancel_running_job(self, job_id: str) -> bool:
"""Async version of :meth:`cancel_running_job`."""
return await self._run_sync(self.cancel_running_job, job_id)

async def aworkers(self) -> list[dict]:
"""Async version of :meth:`workers`."""
return await self._run_sync(self.workers)

# -- Operations --

async def aenqueue_many(self, **kwargs: Any) -> list[JobResult]:
"""Async version of :meth:`enqueue_many`."""
return await self._run_sync(self.enqueue_many, **kwargs) # type: ignore[attr-defined]

async def apurge_completed(self, older_than: int = 86400) -> int:
"""Async version of :meth:`purge_completed`."""
return await self._run_sync(self.purge_completed, older_than=older_than)

async def apurge_dead(self, older_than: int = 86400) -> int:
"""Async version of :meth:`purge_dead`."""
return await self._run_sync(self.purge_dead, older_than=older_than)

async def arevoke_task(self, task_name: str) -> int:
"""Async version of :meth:`revoke_task`."""
return await self._run_sync(self.revoke_task, task_name)

async def areplay_history(self, job_id: str) -> list[dict]:
"""Async version of :meth:`replay_history`."""
return await self._run_sync(self.replay_history, job_id)

async def aarchive(self, older_than: int = 86400) -> int:
"""Async version of :meth:`archive`."""
return await self._run_sync(self.archive, older_than=older_than)

async def alist_archived(self, limit: int = 50, offset: int = 0) -> list[JobResult]:
"""Async version of :meth:`list_archived`."""
return await self._run_sync(self.list_archived, limit=limit, offset=offset)

async def apause(self, queue_name: str = "default") -> None:
"""Async version of :meth:`pause`."""
await self._run_sync(self.pause, queue_name=queue_name)

async def aresume(self, queue_name: str = "default") -> None:
"""Async version of :meth:`resume`."""
await self._run_sync(self.resume, queue_name=queue_name)

async def apaused_queues(self) -> list[str]:
"""Async version of :meth:`paused_queues`."""
return await self._run_sync(self.paused_queues)

# -- Locks --

def alock(
Expand Down
14 changes: 12 additions & 2 deletions py_src/taskito/prefork/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import base64
import importlib
import json
import logging
import os
import sys
import time
Expand All @@ -22,6 +23,8 @@
from taskito.async_support.helpers import run_maybe_async
from taskito.exceptions import TaskCancelledError

logger = logging.getLogger("taskito.prefork.child")


def _import_queue(app_path: str) -> Any:
"""Import and return the Queue instance from a dotted path like 'myapp:queue'."""
Expand Down Expand Up @@ -50,6 +53,7 @@ def _execute_job(
retry_count = job.get("retry_count", 0)
max_retries = job.get("max_retries", 3)

logger.debug("executing %s[%s]", task_name, job_id)
wrapper = queue._task_registry.get(task_name)
if wrapper is None:
return {
Expand Down Expand Up @@ -101,6 +105,7 @@ def _execute_job(
except Exception:
wall_time_ns = time.monotonic_ns() - start_ns
error_msg = traceback.format_exc()
logger.error("task %s[%s] failed: %s", task_name, job_id, error_msg.splitlines()[-1])

# Check retry filters
should_retry = True
Expand Down Expand Up @@ -160,6 +165,7 @@ def main() -> None:

# Signal readiness
_write_message({"type": "ready"})
logger.info("child ready (app=%s, pid=%d)", app_path, os.getpid())

# Main loop: read jobs from stdin, execute, write results to stdout
try:
Expand All @@ -171,16 +177,20 @@ def main() -> None:
msg = json.loads(line)

if msg.get("type") == "shutdown":
logger.info("shutdown received")
break

if msg.get("type") == "job":
result = _execute_job(queue, msg)
_write_message(result)

except (BrokenPipeError, EOFError, KeyboardInterrupt):
pass
logger.debug("child pipe closed or interrupted")

finally:
# Teardown resources
if runtime is not None:
runtime.teardown()
try:
runtime.teardown()
except Exception:
logger.warning("resource teardown error", exc_info=True)
6 changes: 6 additions & 0 deletions py_src/taskito/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import json
import logging
import time
from collections.abc import AsyncIterator, Iterator
from typing import TYPE_CHECKING, Any
Expand All @@ -20,6 +21,8 @@
from taskito._taskito import PyJob
from taskito.app import Queue

log = logging.getLogger("taskito.result")


class JobResult(AsyncJobResultMixin):
"""
Expand Down Expand Up @@ -195,6 +198,7 @@ def stream(
try:
yield json.loads(extra)
except (json.JSONDecodeError, TypeError):
log.warning("failed to deserialize partial result for job %s", self.id)
yield extra

self.refresh()
Expand Down Expand Up @@ -237,6 +241,7 @@ async def astream(
try:
yield json.loads(extra)
except (json.JSONDecodeError, TypeError):
log.warning("failed to deserialize partial result for job %s", self.id)
yield extra

self.refresh()
Expand Down Expand Up @@ -270,6 +275,7 @@ def to_dict(self) -> dict[str, Any]:
"timeout_ms": self._py_job.timeout_ms,
"unique_key": self._py_job.unique_key,
"metadata": self._py_job.metadata,
"namespace": self._py_job.namespace,
}

def __repr__(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions tests/python/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def dummy() -> None:
"timeout_ms",
"unique_key",
"metadata",
"namespace",
}
assert set(d.keys()) == expected_keys
assert d["status"] == "pending"
Expand Down
Loading