From b7f98fc4b782d4ed92a72d5fe44290cf899bed09 Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Mon, 13 Apr 2026 14:48:40 -0700 Subject: [PATCH 1/3] ref(eco): Adds retry_on_timeout, report_timeout_errors options to Retry and Task definitions respectively --- clients/python/src/taskbroker_client/retry.py | 15 ++++++++---- clients/python/src/taskbroker_client/task.py | 2 ++ .../taskbroker_client/worker/workerchild.py | 23 +++++++++++-------- clients/python/tests/test_retry.py | 18 +++++++-------- 4 files changed, 34 insertions(+), 24 deletions(-) diff --git a/clients/python/src/taskbroker_client/retry.py b/clients/python/src/taskbroker_client/retry.py index ca77d3b5..23f15ded 100644 --- a/clients/python/src/taskbroker_client/retry.py +++ b/clients/python/src/taskbroker_client/retry.py @@ -2,7 +2,6 @@ import logging from enum import Enum -from multiprocessing.context import TimeoutError from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( ON_ATTEMPTS_EXCEEDED_DEADLETTER, @@ -69,19 +68,21 @@ def __init__( ignore: tuple[type[BaseException], ...] | None = None, times_exceeded: LastAction = LastAction.Discard, delay: int | None = None, + retry_on_timeout: bool = True, ): self._times = times self._allowed_exception_types: tuple[type[BaseException], ...] = on or () self._denied_exception_types: tuple[type[BaseException], ...] = ignore or () self._times_exceeded = times_exceeded self._delay = delay + self._retry_on_timeout = retry_on_timeout def max_attempts_reached(self, state: RetryState) -> bool: # We subtract one, as attempts starts at 0, but `times` # starts at 1. return state.attempts >= (self._times - 1) - def should_retry(self, state: RetryState, exc: Exception) -> bool: + def should_retry(self, state: RetryState, exc: BaseException) -> bool: # If there are no retries remaining we should not retry if self.max_attempts_reached(state): return False @@ -94,9 +95,13 @@ def should_retry(self, state: RetryState, exc: Exception) -> bool: if isinstance(exc, self._denied_exception_types): return False - # In the retry allow list or processing deadline is exceeded - # When processing deadline is exceeded, the subprocess raises a TimeoutError - if isinstance(exc, (TimeoutError, self._allowed_exception_types)): + # Retry for allowed exception types + if isinstance(exc, self._allowed_exception_types): + return True + + from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded + + if self._retry_on_timeout and isinstance(exc, ProcessingDeadlineExceeded): return True return False diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 5dfa9ead..da9e1988 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -51,6 +51,7 @@ def __init__( at_most_once: bool = False, wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, + report_timeout_errors: bool = True, ): self.name = name self._func = func @@ -71,6 +72,7 @@ def __init__( self.at_most_once = at_most_once self.wait_for_delivery = wait_for_delivery self.compression_type = compression_type + self.report_timeout_errors = report_timeout_errors update_wrapper(self, func) @property diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 18e2ad68..f64502cf 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -229,14 +229,15 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: _execute_activation(task_func, inflight.activation, app.context_hooks) next_state = TASK_ACTIVATION_STATUS_COMPLETE except ProcessingDeadlineExceeded as err: - with sentry_sdk.isolation_scope() as scope: - scope.fingerprint = [ - "taskworker.processing_deadline_exceeded", - inflight.activation.namespace, - inflight.activation.taskname, - ] - scope.set_transaction_name(inflight.activation.taskname) - sentry_sdk.capture_exception(err) + if task_func.report_timeout_errors: + with sentry_sdk.isolation_scope() as scope: + scope.fingerprint = [ + "taskworker.processing_deadline_exceeded", + inflight.activation.namespace, + inflight.activation.taskname, + ] + scope.set_transaction_name(inflight.activation.taskname) + sentry_sdk.capture_exception(err) metrics.incr( "taskworker.worker.processing_deadline_exceeded", tags={ @@ -245,7 +246,11 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: "taskname": inflight.activation.taskname, }, ) - next_state = TASK_ACTIVATION_STATUS_FAILURE + retry = task_func.retry + if retry and retry.should_retry(inflight.activation.retry_state, err): + next_state = TASK_ACTIVATION_STATUS_RETRY + else: + next_state = TASK_ACTIVATION_STATUS_FAILURE except Exception as err: retry = task_func.retry captured_error = False diff --git a/clients/python/tests/test_retry.py b/clients/python/tests/test_retry.py index 624efa96..001e0a1b 100644 --- a/clients/python/tests/test_retry.py +++ b/clients/python/tests/test_retry.py @@ -1,13 +1,12 @@ from __future__ import annotations -from multiprocessing.context import TimeoutError - from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( ON_ATTEMPTS_EXCEEDED_DEADLETTER, ON_ATTEMPTS_EXCEEDED_DISCARD, ) from taskbroker_client.retry import LastAction, Retry, RetryTaskError +from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded class RuntimeChildError(RuntimeError): @@ -71,22 +70,21 @@ def test_should_retry_retryerror() -> None: assert not retry.should_retry(state, err) -def test_should_retry_multiprocessing_timeout() -> None: - retry = Retry(times=3) +def test_should_retry_processing_deadline_exceeded() -> None: + retry = Retry(times=3, retry_on_timeout=True) state = retry.initial_state() - timeout = TimeoutError("timeouts should retry if there are attempts left") - assert retry.should_retry(state, timeout) + deadline_exceeded = ProcessingDeadlineExceeded("processing deadline exceeded") + assert retry.should_retry(state, deadline_exceeded) state.attempts = 1 - assert retry.should_retry(state, timeout) + assert retry.should_retry(state, deadline_exceeded) - # attempt = 2 is actually the third attempt. state.attempts = 2 - assert not retry.should_retry(state, timeout) + assert not retry.should_retry(state, deadline_exceeded) state.attempts = 3 - assert not retry.should_retry(state, timeout) + assert not retry.should_retry(state, deadline_exceeded) def test_should_retry_error_allow_list() -> None: From 548242399cefd2faa1b9f5e92f12a7899c2253ef Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Mon, 13 Apr 2026 15:27:13 -0700 Subject: [PATCH 2/3] Removes special handling for processingDeadlineExceeded --- clients/python/src/taskbroker_client/retry.py | 5 ----- clients/python/tests/test_retry.py | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/clients/python/src/taskbroker_client/retry.py b/clients/python/src/taskbroker_client/retry.py index 23f15ded..1836b319 100644 --- a/clients/python/src/taskbroker_client/retry.py +++ b/clients/python/src/taskbroker_client/retry.py @@ -99,11 +99,6 @@ def should_retry(self, state: RetryState, exc: BaseException) -> bool: if isinstance(exc, self._allowed_exception_types): return True - from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded - - if self._retry_on_timeout and isinstance(exc, ProcessingDeadlineExceeded): - return True - return False def initial_state(self) -> RetryState: diff --git a/clients/python/tests/test_retry.py b/clients/python/tests/test_retry.py index 001e0a1b..11d9b8b6 100644 --- a/clients/python/tests/test_retry.py +++ b/clients/python/tests/test_retry.py @@ -71,7 +71,7 @@ def test_should_retry_retryerror() -> None: def test_should_retry_processing_deadline_exceeded() -> None: - retry = Retry(times=3, retry_on_timeout=True) + retry = Retry(times=3, on=(ProcessingDeadlineExceeded,)) state = retry.initial_state() deadline_exceeded = ProcessingDeadlineExceeded("processing deadline exceeded") From 91c9dfffb6dc8f335307eab69de8c2e53647f20e Mon Sep 17 00:00:00 2001 From: Gabe Villalobos Date: Mon, 20 Apr 2026 11:28:23 -0700 Subject: [PATCH 3/3] Updates retry logic to implicitly allow ProcessingDeadlineException as retriable exception --- clients/python/src/taskbroker_client/retry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/python/src/taskbroker_client/retry.py b/clients/python/src/taskbroker_client/retry.py index 1836b319..213248d5 100644 --- a/clients/python/src/taskbroker_client/retry.py +++ b/clients/python/src/taskbroker_client/retry.py @@ -2,6 +2,7 @@ import logging from enum import Enum +from multiprocessing.context import TimeoutError from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( ON_ATTEMPTS_EXCEEDED_DEADLETTER, @@ -68,14 +69,12 @@ def __init__( ignore: tuple[type[BaseException], ...] | None = None, times_exceeded: LastAction = LastAction.Discard, delay: int | None = None, - retry_on_timeout: bool = True, ): self._times = times self._allowed_exception_types: tuple[type[BaseException], ...] = on or () self._denied_exception_types: tuple[type[BaseException], ...] = ignore or () self._times_exceeded = times_exceeded self._delay = delay - self._retry_on_timeout = retry_on_timeout def max_attempts_reached(self, state: RetryState) -> bool: # We subtract one, as attempts starts at 0, but `times` @@ -95,8 +94,9 @@ def should_retry(self, state: RetryState, exc: BaseException) -> bool: if isinstance(exc, self._denied_exception_types): return False - # Retry for allowed exception types - if isinstance(exc, self._allowed_exception_types): + # In the retry allow list or processing deadline is exceeded + # When processing deadline is exceeded, the subprocess raises a TimeoutError + if isinstance(exc, (TimeoutError, self._allowed_exception_types)): return True return False