diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index b4441f6a..12360032 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -127,6 +127,7 @@ def __init__( result_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE, rebalance_after: int = DEFAULT_REBALANCE_AFTER, processing_pool_name: str | None = None, + pod_name: str | None = None, process_type: str = "spawn", health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, @@ -152,6 +153,7 @@ def __init__( child_tasks_queue_maxsize=child_tasks_queue_maxsize, result_queue_maxsize=result_queue_maxsize, processing_pool_name=processing_pool_name, + pod_name=pod_name, process_type=process_type, ) @@ -512,10 +514,12 @@ def __init__( child_tasks_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE, result_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE, processing_pool_name: str | None = None, + pod_name: str | None = None, process_type: str = "spawn", ) -> None: self._concurrency = concurrency self._processing_pool_name = processing_pool_name or "unknown" + self._pod_name = pod_name or "unknown" self._send_result = send_result_fn self._max_child_task_count = max_child_task_count self._app_module = app_module @@ -571,6 +575,30 @@ def result_thread() -> None: iopool = ThreadPoolExecutor(max_workers=self._concurrency) with iopool as executor: while not self._shutdown_event.is_set(): + tags = { + "processing_pool": self._processing_pool_name, + "pod_name": self._pod_name, + } + + try: + # 'qsize' is not implemented on all platforms, such as macOS + self._metrics.gauge( + "taskworker.child_tasks.size", + float(self._child_tasks.qsize()), + tags=tags, + ) + + self._metrics.gauge( + "taskworker.processed_tasks.size", + float(self._processed_tasks.qsize()), + tags=tags, + ) + except Exception as e: + logger.debug( + "taskworker.worker.queue_gauges.error", + extra={"error": e, "processing_pool": self._processing_pool_name}, + ) + try: result = self._processed_tasks.get(timeout=1.0) executor.submit(self.send_result, result, False) @@ -632,12 +660,6 @@ def push_task(self, inflight: InflightTaskActivation, timeout: float | None = No set (e.g. 5.0), waits at most that many seconds and returns `False` if the queue is still full (worker busy). """ - try: - self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize()) - except Exception as e: - # 'qsize' does not work on macOS - logger.debug("taskworker.child_tasks.size.error", extra={"error": e}) - start_time = time.monotonic() try: self._child_tasks.put(inflight, timeout=timeout)