From 498c91f95d8e12d7659b7d3cd58fbb3e8cde744b Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Mon, 27 Apr 2026 15:27:23 -0700 Subject: [PATCH 1/6] Adds additional task parameter to silence swaths of exceptions --- .../python/src/taskbroker_client/registry.py | 9 +++++++ clients/python/src/taskbroker_client/task.py | 2 ++ .../taskbroker_client/worker/workerchild.py | 25 +++++++++++-------- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index 686b1c5a..0c4d1e2d 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -43,6 +43,7 @@ def __init__( processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, app_feature: str | None = None, context_hooks: list[ContextHook] | None = None, + report_timeout_errors: bool = True, ): self.name = name self.application = application @@ -88,6 +89,8 @@ def register( at_most_once: bool = False, wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, + report_timeout_errors: bool = True, + exceptions_to_silence: tuple[type[BaseException], ...] | None = None, ) -> Callable[[Callable[P, R]], Task[P, R]]: """ Register a task. @@ -133,6 +136,8 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: at_most_once=at_most_once, wait_for_delivery=wait_for_delivery, compression_type=compression_type, + report_timeout_errors=report_timeout_errors, + exceptions_to_silence=exceptions_to_silence, ) # TODO(taskworker) tasks should be registered into the registry # so that we can ensure task names are globally unique @@ -224,6 +229,8 @@ def register( at_most_once: bool = False, wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, + report_timeout_errors: bool = True, + exceptions_to_silence: tuple[type[BaseException], ...] 
| None = None, ) -> Callable[[Callable[P, R]], ExternalTask[P, R]]: """ Register an external task stub. @@ -269,6 +276,8 @@ def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]: at_most_once=at_most_once, wait_for_delivery=wait_for_delivery, compression_type=compression_type, + report_timeout_errors=report_timeout_errors, + exceptions_to_silence=exceptions_to_silence, ) self._registered_tasks[name] = task return task diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index d9493517..d536cc3d 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -67,6 +67,7 @@ def __init__( wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, report_timeout_errors: bool = True, + exceptions_to_silence: tuple[type[BaseException], ...] | None = None, ): self.name = name self._func = func @@ -86,6 +87,7 @@ def __init__( self.wait_for_delivery = wait_for_delivery self.compression_type = compression_type self.report_timeout_errors = report_timeout_errors + self.exceptions_to_silence = exceptions_to_silence or () update_wrapper(self, func) @property diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index cf8162a9..2796bd82 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -280,17 +280,20 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: next_state = TASK_ACTIVATION_STATUS_RETRY elif retry.max_attempts_reached(inflight.activation.retry_state): with sentry_sdk.isolation_scope() as scope: - retry_error = NoRetriesRemainingError( - f"{inflight.activation.taskname} has consumed all of its retries" - ) - retry_error.__cause__ = err - scope.fingerprint = [ - "taskworker.no_retries_remaining", - inflight.activation.namespace, - 
inflight.activation.taskname, - ] - scope.set_transaction_name(inflight.activation.taskname) - sentry_sdk.capture_exception(retry_error) + if not isinstance(err, task_func.exceptions_to_silence): + retry_error = NoRetriesRemainingError( + f"{inflight.activation.taskname} has consumed all of its retries" + ) + retry_error.__cause__ = err + scope.fingerprint = [ + "taskworker.no_retries_remaining", + inflight.activation.namespace, + inflight.activation.taskname, + ] + scope.set_transaction_name(inflight.activation.taskname) + sentry_sdk.capture_exception(retry_error) + # In this branch, all exceptions should be either + # captured or silenced. captured_error = True if not captured_error and next_state != TASK_ACTIVATION_STATUS_RETRY: From f37ed8fd4d29cffc20e3a9a0c065f96ca9cfbb70 Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Mon, 27 Apr 2026 16:13:04 -0700 Subject: [PATCH 2/6] removes extraneous init param --- clients/python/src/taskbroker_client/registry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index 0c4d1e2d..e47b0294 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -43,7 +43,6 @@ def __init__( processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE, app_feature: str | None = None, context_hooks: list[ContextHook] | None = None, - report_timeout_errors: bool = True, ): self.name = name self.application = application From 7f580b493621a30818ed90c09b3ccc5dae8868cb Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Tue, 28 Apr 2026 13:48:23 -0700 Subject: [PATCH 3/6] Addresses comment from cursorbot --- .../python/src/taskbroker_client/worker/workerchild.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 2796bd82..8e54f28c 
100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -266,6 +266,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: except Exception as err: retry = task_func.retry captured_error = False + should_capture_error = not isinstance(err, task_func.exceptions_to_silence) if retry: if retry.should_retry(inflight.activation.retry_state, err): logger.info( @@ -280,7 +281,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: next_state = TASK_ACTIVATION_STATUS_RETRY elif retry.max_attempts_reached(inflight.activation.retry_state): with sentry_sdk.isolation_scope() as scope: - if not isinstance(err, task_func.exceptions_to_silence): + if should_capture_error: retry_error = NoRetriesRemainingError( f"{inflight.activation.taskname} has consumed all of its retries" ) @@ -296,7 +297,11 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: # captured or silenced. captured_error = True - if not captured_error and next_state != TASK_ACTIVATION_STATUS_RETRY: + if ( + should_capture_error + and not captured_error + and next_state != TASK_ACTIVATION_STATUS_RETRY + ): sentry_sdk.capture_exception(err) clear_current_task() From 3e53af715ddfbf5dc837bd7e7bd51a4a13a274c9 Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Wed, 29 Apr 2026 11:33:22 -0700 Subject: [PATCH 4/6] Updates silence param name, adds docstring comments for new parameters --- clients/python/src/taskbroker_client/registry.py | 16 ++++++++++++---- clients/python/src/taskbroker_client/task.py | 4 ++-- .../src/taskbroker_client/worker/workerchild.py | 2 +- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/clients/python/src/taskbroker_client/registry.py b/clients/python/src/taskbroker_client/registry.py index e47b0294..83a3fae6 100644 --- a/clients/python/src/taskbroker_client/registry.py +++ b/clients/python/src/taskbroker_client/registry.py @@ -89,7 +89,7 @@ def register(
wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, report_timeout_errors: bool = True, - exceptions_to_silence: tuple[type[BaseException], ...] | None = None, + expected_exceptions: tuple[type[BaseException], ...] | None = None, ) -> Callable[[Callable[P, R]], Task[P, R]]: """ Register a task. @@ -117,6 +117,10 @@ def register( before returning. compression_type: CompressionType The compression type to use to compress the task parameters. + report_timeout_errors: bool + Enable reporting of ProcessingDeadlineExceededError to Sentry. + expected_exceptions: tuple[type[BaseException], ...] | None + A tuple of exception types that will not be reported by Sentry. """ def wrapped(func: Callable[P, R]) -> Task[P, R]: @@ -136,7 +140,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]: wait_for_delivery=wait_for_delivery, compression_type=compression_type, report_timeout_errors=report_timeout_errors, - exceptions_to_silence=exceptions_to_silence, + expected_exceptions=expected_exceptions, ) # TODO(taskworker) tasks should be registered into the registry # so that we can ensure task names are globally unique @@ -229,7 +233,7 @@ def register( wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, report_timeout_errors: bool = True, - exceptions_to_silence: tuple[type[BaseException], ...] | None = None, + expected_exceptions: tuple[type[BaseException], ...] | None = None, ) -> Callable[[Callable[P, R]], ExternalTask[P, R]]: """ Register an external task stub. @@ -257,6 +261,10 @@ def register( before returning. compression_type: CompressionType The compression type to use to compress the task parameters. + report_timeout_errors: bool + Enable reporting of ProcessingDeadlineExceededError to Sentry. + expected_exceptions: tuple[type[BaseException], ...] | None + A tuple of exception types that will not be reported by Sentry. 
""" def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]: @@ -276,7 +284,7 @@ def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]: wait_for_delivery=wait_for_delivery, compression_type=compression_type, report_timeout_errors=report_timeout_errors, - exceptions_to_silence=exceptions_to_silence, + expected_exceptions=expected_exceptions, ) self._registered_tasks[name] = task return task diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index d536cc3d..efbcd6fa 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -67,7 +67,7 @@ def __init__( wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, report_timeout_errors: bool = True, - exceptions_to_silence: tuple[type[BaseException], ...] | None = None, + expected_exceptions: tuple[type[BaseException], ...] | None = None, ): self.name = name self._func = func @@ -87,7 +87,7 @@ def __init__( self.wait_for_delivery = wait_for_delivery self.compression_type = compression_type self.report_timeout_errors = report_timeout_errors - self.exceptions_to_silence = exceptions_to_silence or () + self.expected_exceptions = expected_exceptions or () update_wrapper(self, func) @property diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 8e54f28c..c9937352 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -266,7 +266,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: except Exception as err: retry = task_func.retry captured_error = False - should_capture_error = not isinstance(err, task_func.exceptions_to_silence) + should_capture_error = not isinstance(err, task_func.expected_exceptions) if retry: if retry.should_retry(inflight.activation.retry_state, err): logger.info( From 
a2800de006f327960e47499cf6ca309640d1f268 Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Thu, 30 Apr 2026 14:55:38 -0700 Subject: [PATCH 5/6] Adds task stubs, new example tasks --- clients/python/src/examples/tasks.py | 27 ++++++++++ clients/python/tests/worker/test_worker.py | 57 ++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/clients/python/src/examples/tasks.py b/clients/python/src/examples/tasks.py index 86eb312a..28cf43c9 100644 --- a/clients/python/src/examples/tasks.py +++ b/clients/python/src/examples/tasks.py @@ -72,3 +72,30 @@ def will_retry(failure: str) -> None: def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, Any]) -> None: sleep(float(sleep_seconds)) logger.debug("timed_task complete") + + +@exampletasks.register( + name="examples.will_timeout_without_reporting", + processing_deadline_duration=1, + report_timeout_errors=False, +) +def will_timeout_without_reporting() -> None: + timed_task(sleep_seconds=2) + + +@exampletasks.register( + name="examples.will_fail_with_expected_exception", + retry=Retry(times=2, times_exceeded=LastAction.Discard), + expected_exceptions=(RuntimeError,), +) +def will_fail_with_expected_exception() -> None: + raise RuntimeError("oh no") + + +@exampletasks.register( + name="examples.will_fail_with_expected_ignored_exception", + retry=Retry(times=2, on=(RuntimeError,), times_exceeded=LastAction.Discard), + expected_exceptions=(RuntimeError,), +) +def will_fail_with_expected_ignored_exception() -> None: + raise RuntimeError("oh no") diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 8ec90e50..db6e3827 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -193,6 +193,63 @@ ), ) +# Task with Retry logic, expected exceptions to silence reporting +RETRY_TASK_WITH_SILENCED_TIMEOUT = InflightTaskActivation( + host="localhost:50051", + receive_timestamp=0, + 
activation=TaskActivation( + id="654", + taskname="examples.will_timeout_without_reporting", + namespace="examples", + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), + processing_deadline_duration=1, + retry_state=RetryState( + # no more attempts left + attempts=1, + max_attempts=2, + on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, + ), + ), +) + +# Task with Retry logic, expected exceptions to silence reporting +RETRY_TASK_WITH_EXPECTED_EXCEPTION = InflightTaskActivation( + host="localhost:50051", + receive_timestamp=0, + activation=TaskActivation( + id="654", + taskname="examples.will_fail_with_expected_exception", + namespace="examples", + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), + processing_deadline_duration=2, + retry_state=RetryState( + # One retry left + attempts=0, + max_attempts=2, + on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, + ), + ), +) + +# Task set to retry on deadline exceeded exceptions +RETRY_TASK_WITH_EXPECTED_IGNORED_EXCEPTION = InflightTaskActivation( + host="localhost:50051", + receive_timestamp=0, + activation=TaskActivation( + id="654", + taskname="examples.will_fail_with_expected_ignored_exception", + namespace="examples", + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), + processing_deadline_duration=2, + retry_state=RetryState( + # no more attempts left + attempts=1, + max_attempts=2, + on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, + ), + ), +) + class TestTaskWorker(TestCase): def test_fetch_task(self) -> None: From 24f66a697c269487a04811d98258a1564b0547a9 Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Thu, 30 Apr 2026 15:35:19 -0700 Subject: [PATCH 6/6] Adds tests to cover different cases --- clients/python/src/examples/tasks.py | 10 ++ clients/python/tests/worker/test_worker.py | 128 ++++++++++++++++++++- 2 files changed, 135 insertions(+), 3 deletions(-) diff --git a/clients/python/src/examples/tasks.py 
b/clients/python/src/examples/tasks.py index 28cf43c9..660f2213 100644 --- a/clients/python/src/examples/tasks.py +++ b/clients/python/src/examples/tasks.py @@ -13,6 +13,7 @@ from examples.app import app from taskbroker_client.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError from taskbroker_client.retry import retry_task as retry_task_helper +from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded logger = logging.getLogger(__name__) @@ -99,3 +100,12 @@ def will_fail_with_expected_exception() -> None: ) def will_fail_with_expected_ignored_exception() -> None: raise RuntimeError("oh no") + + +@exampletasks.register( + name="examples.will_retry_on_deadline_exceeded", + processing_deadline_duration=1, + retry=Retry(times=2, on=(ProcessingDeadlineExceeded,), times_exceeded=LastAction.Discard), +) +def will_retry_on_deadline_exceeded() -> None: + timed_task(sleep_seconds=2) diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index db6e3827..24adf5c6 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -213,7 +213,7 @@ ) # Task with Retry logic, expected exceptions to silence reporting -RETRY_TASK_WITH_EXPECTED_EXCEPTION = InflightTaskActivation( +RETRY_TASK_WITH_EXPECTED_UNHANDLED_EXCEPTION = InflightTaskActivation( host="localhost:50051", receive_timestamp=0, activation=TaskActivation( @@ -242,8 +242,27 @@ parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), processing_deadline_duration=2, retry_state=RetryState( - # no more attempts left - attempts=1, + # One retry left + attempts=0, + max_attempts=2, + on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, + ), + ), +) + +# Task set to retry on deadline exceeded exceptions +RETRY_TASK_ON_DEADLINE_EXCEEDED = InflightTaskActivation( + host="localhost:50051", + receive_timestamp=0, + activation=TaskActivation( + id="654", + 
taskname="examples.will_retry_on_deadline_exceeded", + namespace="examples", + parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True), + processing_deadline_duration=1, + retry_state=RetryState( + # One retry left + attempts=0, max_attempts=2, on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, ), @@ -958,3 +977,106 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag assert executed_headers[0]["x-viewer-user"] == "7" finally: app.context_hooks.remove(hook) + + +@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") +def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None: + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + todo.put(RETRY_TASK_WITH_SILENCED_TIMEOUT) + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + assert todo.empty() + result = processed.get() + assert result.task_id == RETRY_TASK_WITH_SILENCED_TIMEOUT.activation.id + assert result.status == TASK_ACTIVATION_STATUS_FAILURE + assert mock_capture.call_count == 0 + + +@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") +def test_child_process_expected_exception_with_retries(mock_capture: mock.Mock) -> None: + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + todo.put(RETRY_TASK_WITH_EXPECTED_UNHANDLED_EXCEPTION) + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + assert todo.empty() + result = processed.get() + assert result.task_id == RETRY_TASK_WITH_EXPECTED_UNHANDLED_EXCEPTION.activation.id + + # No reporting, but the task still raised an unhandled exception + assert result.status == 
TASK_ACTIVATION_STATUS_FAILURE + assert mock_capture.call_count == 0 + + +@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") +def test_child_process_expected_ignored_exception_max_attempts(mock_capture: mock.Mock) -> None: + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + # Task has more retries left, but is set to ignore the raised error type + todo.put(RETRY_TASK_WITH_EXPECTED_IGNORED_EXCEPTION) + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + # No reporting, but exception type is retriable + assert todo.empty() + result = processed.get() + assert result.task_id == RETRY_TASK_WITH_EXPECTED_IGNORED_EXCEPTION.activation.id + assert result.status == TASK_ACTIVATION_STATUS_RETRY + assert mock_capture.call_count == 0 + + +@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") +def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> None: + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + # Task will timeout, but should retry, because ProcessingDeadlineExceeded is + # in the Retry.on list + todo.put(RETRY_TASK_ON_DEADLINE_EXCEEDED) + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + assert todo.empty() + result = processed.get() + assert result.task_id == RETRY_TASK_ON_DEADLINE_EXCEEDED.activation.id + assert result.status == TASK_ACTIVATION_STATUS_RETRY + assert mock_capture.call_count == 1 + assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded