diff --git a/clients/python/src/taskbroker_client/retry.py b/clients/python/src/taskbroker_client/retry.py index ca77d3b5..213248d5 100644 --- a/clients/python/src/taskbroker_client/retry.py +++ b/clients/python/src/taskbroker_client/retry.py @@ -81,7 +81,7 @@ def max_attempts_reached(self, state: RetryState) -> bool: # starts at 1. return state.attempts >= (self._times - 1) - def should_retry(self, state: RetryState, exc: Exception) -> bool: + def should_retry(self, state: RetryState, exc: BaseException) -> bool: # If there are no retries remaining we should not retry if self.max_attempts_reached(state): return False diff --git a/clients/python/src/taskbroker_client/task.py b/clients/python/src/taskbroker_client/task.py index 5dfa9ead..da9e1988 100644 --- a/clients/python/src/taskbroker_client/task.py +++ b/clients/python/src/taskbroker_client/task.py @@ -51,6 +51,7 @@ def __init__( at_most_once: bool = False, wait_for_delivery: bool = False, compression_type: CompressionType = CompressionType.PLAINTEXT, + report_timeout_errors: bool = True, ): self.name = name self._func = func @@ -71,6 +72,7 @@ def __init__( self.at_most_once = at_most_once self.wait_for_delivery = wait_for_delivery self.compression_type = compression_type + self.report_timeout_errors = report_timeout_errors update_wrapper(self, func) @property diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 18e2ad68..f64502cf 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -229,14 +229,15 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: _execute_activation(task_func, inflight.activation, app.context_hooks) next_state = TASK_ACTIVATION_STATUS_COMPLETE except ProcessingDeadlineExceeded as err: - with sentry_sdk.isolation_scope() as scope: - scope.fingerprint = [ - "taskworker.processing_deadline_exceeded", - inflight.activation.namespace, - inflight.activation.taskname, - ] - scope.set_transaction_name(inflight.activation.taskname) - sentry_sdk.capture_exception(err) + if task_func.report_timeout_errors: + with sentry_sdk.isolation_scope() as scope: + scope.fingerprint = [ + "taskworker.processing_deadline_exceeded", + inflight.activation.namespace, + inflight.activation.taskname, + ] + scope.set_transaction_name(inflight.activation.taskname) + sentry_sdk.capture_exception(err) metrics.incr( "taskworker.worker.processing_deadline_exceeded", tags={ @@ -245,7 +246,11 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: "taskname": inflight.activation.taskname, }, ) - next_state = TASK_ACTIVATION_STATUS_FAILURE + retry = task_func.retry + if retry and retry.should_retry(inflight.activation.retry_state, err): + next_state = TASK_ACTIVATION_STATUS_RETRY + else: + next_state = TASK_ACTIVATION_STATUS_FAILURE except Exception as err: retry = task_func.retry captured_error = False diff --git a/clients/python/tests/test_retry.py b/clients/python/tests/test_retry.py index 624efa96..11d9b8b6 100644 --- a/clients/python/tests/test_retry.py +++ b/clients/python/tests/test_retry.py @@ -1,13 +1,12 @@ from __future__ import annotations -from multiprocessing.context import TimeoutError - from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( ON_ATTEMPTS_EXCEEDED_DEADLETTER, ON_ATTEMPTS_EXCEEDED_DISCARD, ) from taskbroker_client.retry import LastAction, Retry, RetryTaskError +from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded class RuntimeChildError(RuntimeError): @@ -71,22 +70,21 @@ def test_should_retry_retryerror() -> None: assert not retry.should_retry(state, err) -def test_should_retry_multiprocessing_timeout() -> None: - retry = Retry(times=3) +def test_should_retry_processing_deadline_exceeded() -> None: + retry = Retry(times=3, on=(ProcessingDeadlineExceeded,)) state = retry.initial_state() - timeout = TimeoutError("timeouts should retry if there are attempts left") - assert retry.should_retry(state, timeout) + deadline_exceeded = ProcessingDeadlineExceeded("processing deadline exceeded") + assert retry.should_retry(state, deadline_exceeded) state.attempts = 1 - assert retry.should_retry(state, timeout) + assert retry.should_retry(state, deadline_exceeded) - # attempt = 2 is actually the third attempt. state.attempts = 2 - assert not retry.should_retry(state, timeout) + assert not retry.should_retry(state, deadline_exceeded) state.attempts = 3 - assert not retry.should_retry(state, timeout) + assert not retry.should_retry(state, deadline_exceeded) def test_should_retry_error_allow_list() -> None: