Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def __init__(
result_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
rebalance_after: int = DEFAULT_REBALANCE_AFTER,
processing_pool_name: str | None = None,
pod_name: str | None = None,
process_type: str = "spawn",
health_check_file_path: str | None = None,
health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH,
Comment on lines +130 to +133
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The TaskWorker class does not accept a pod_name parameter, causing queue size metrics to be incorrectly tagged with pod_name="unknown".
Severity: MEDIUM

Suggested Fix

Update the TaskWorker.__init__ method to accept a pod_name parameter and pass it to the TaskWorkerProcessingPool constructor, similar to how PushTaskWorker handles it.

Prompt for AI Agent
Review the code at the location below. An AI agent has flagged a potential bug there.
Verify whether it is a real issue. If it is, propose a fix; if it is not, explain why the
report is invalid.

Location: clients/python/src/taskbroker_client/worker/worker.py#L130-L133

Potential issue: The `TaskWorker` class, used for pull-mode workers, does not accept a
`pod_name` parameter in its `__init__` method. Consequently, it cannot pass the pod name
when it instantiates `TaskWorkerProcessingPool`. The `TaskWorkerProcessingPool` then
defaults the `pod_name` to `"unknown"`. This results in all queue size metrics for
pull-mode workers being tagged with `pod_name="unknown"`, making it difficult to monitor
queue sizes on a per-pod basis for this worker type.

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't care about pull workers.

Expand All @@ -152,6 +153,7 @@ def __init__(
child_tasks_queue_maxsize=child_tasks_queue_maxsize,
result_queue_maxsize=result_queue_maxsize,
processing_pool_name=processing_pool_name,
pod_name=pod_name,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskWorker missing pod_name parameter for metrics tagging

Medium Severity

The new pod_name parameter was added to PushTaskWorker and plumbed through to TaskWorkerProcessingPool, but the TaskWorker class (PULL mode) was not updated. Since TaskWorkerProcessingPool.result_thread now emits gauge metrics tagged with pod_name, all PULL-mode workers will always report pod_name="unknown", defeating the PR's goal of per-pod metric visibility for those workers.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8d8f444. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't care about pull workers.

process_type=process_type,
)

Expand Down Expand Up @@ -512,10 +514,12 @@ def __init__(
child_tasks_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
result_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
processing_pool_name: str | None = None,
pod_name: str | None = None,
process_type: str = "spawn",
) -> None:
self._concurrency = concurrency
self._processing_pool_name = processing_pool_name or "unknown"
self._pod_name = pod_name or "unknown"
self._send_result = send_result_fn
self._max_child_task_count = max_child_task_count
self._app_module = app_module
Expand Down Expand Up @@ -571,6 +575,30 @@ def result_thread() -> None:
iopool = ThreadPoolExecutor(max_workers=self._concurrency)
with iopool as executor:
while not self._shutdown_event.is_set():
tags = {
"processing_pool": self._processing_pool_name,
"pod_name": self._pod_name,
}

try:
# 'qsize' is not implemented on all platforms, such as macOS
self._metrics.gauge(
"taskworker.child_tasks.size",
float(self._child_tasks.qsize()),
tags=tags,
)

self._metrics.gauge(
"taskworker.processed_tasks.size",
float(self._processed_tasks.qsize()),
tags=tags,
)
except Exception as e:
logger.debug(
"taskworker.worker.queue_gauges.error",
extra={"error": e, "processing_pool": self._processing_pool_name},
)

try:
result = self._processed_tasks.get(timeout=1.0)
executor.submit(self.send_result, result, False)
Expand Down Expand Up @@ -632,12 +660,6 @@ def push_task(self, inflight: InflightTaskActivation, timeout: float | None = No
set (e.g. 5.0), waits at most that many seconds and returns `False` if the
queue is still full (worker busy).
"""
try:
self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize())
except Exception as e:
# 'qsize' does not work on macOS
logger.debug("taskworker.child_tasks.size.error", extra={"error": e})

start_time = time.monotonic()
try:
self._child_tasks.put(inflight, timeout=timeout)
Expand Down
Loading