2 changes: 1 addition & 1 deletion clients/python/src/taskbroker_client/retry.py
@@ -81,7 +81,7 @@ def max_attempts_reached(self, state: RetryState) -> bool:
         # starts at 1.
         return state.attempts >= (self._times - 1)
 
-    def should_retry(self, state: RetryState, exc: Exception) -> bool:
+    def should_retry(self, state: RetryState, exc: BaseException) -> bool:
         # If there are no retries remaining we should not retry
         if self.max_attempts_reached(state):
             return False
Comment on lines 81 to 87

Bug: The retry logic does not implicitly handle ProcessingDeadlineExceeded as intended. It requires an explicit on=(ProcessingDeadlineExceeded,) configuration, causing tasks to fail instead of retrying on timeout.
Severity: HIGH

Suggested Fix

Update the should_retry method in retry.py to also check for isinstance(exc, ProcessingDeadlineExceeded). This will make the exception implicitly retriable, aligning the code's behavior with the pull request's description. Also, update the stale comment that incorrectly refers to TimeoutError being raised.
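
For illustration, a minimal sketch of the method after that suggested change; the body is paraphrased from the description in this comment, and `self._allowed` is a placeholder for however the `on` tuple is stored, not the real attribute name:

```python
def should_retry(self, state: RetryState, exc: BaseException) -> bool:
    # If there are no retries remaining we should not retry.
    if self.max_attempts_reached(state):
        return False
    # Suggested change: treat processing-deadline timeouts as implicitly
    # retriable, alongside the existing TimeoutError check.
    if isinstance(exc, (TimeoutError, ProcessingDeadlineExceeded)):
        return True
    # Placeholder name for the stored `on` allowlist.
    return bool(self._allowed and isinstance(exc, self._allowed))
```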

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: clients/python/src/taskbroker_client/retry.py#L81-L87

Potential issue: The pull request's stated goal is to make `ProcessingDeadlineExceeded` exceptions implicitly retriable. However, the implementation does not achieve this. The `should_retry` method checks for `TimeoutError` or exceptions explicitly passed in the `on` parameter of the `Retry` object. Since `ProcessingDeadlineExceeded` does not inherit from `TimeoutError`, it is not retried by default. Tasks that exceed their processing deadline will fail immediately instead of being retried, contrary to the feature's intent. This requires users to explicitly opt in via `retry=Retry(on=(ProcessingDeadlineExceeded,))`, which contradicts the documented behavior.


Member: I think that's what we want now.

2 changes: 2 additions & 0 deletions clients/python/src/taskbroker_client/task.py
@@ -51,6 +51,7 @@ def __init__(
         at_most_once: bool = False,
         wait_for_delivery: bool = False,
         compression_type: CompressionType = CompressionType.PLAINTEXT,
+        report_timeout_errors: bool = True,
     ):
         self.name = name
         self._func = func
@@ -71,6 +72,7 @@ def __init__(
         self.at_most_once = at_most_once
         self.wait_for_delivery = wait_for_delivery
         self.compression_type = compression_type
+        self.report_timeout_errors = report_timeout_errors
         update_wrapper(self, func)
 
     @property
23 changes: 14 additions & 9 deletions clients/python/src/taskbroker_client/worker/workerchild.py
@@ -229,14 +229,15 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
             _execute_activation(task_func, inflight.activation, app.context_hooks)
             next_state = TASK_ACTIVATION_STATUS_COMPLETE
         except ProcessingDeadlineExceeded as err:
-            with sentry_sdk.isolation_scope() as scope:
-                scope.fingerprint = [
-                    "taskworker.processing_deadline_exceeded",
-                    inflight.activation.namespace,
-                    inflight.activation.taskname,
-                ]
-                scope.set_transaction_name(inflight.activation.taskname)
-                sentry_sdk.capture_exception(err)
+            if task_func.report_timeout_errors:
+                with sentry_sdk.isolation_scope() as scope:
+                    scope.fingerprint = [
+                        "taskworker.processing_deadline_exceeded",
+                        inflight.activation.namespace,
+                        inflight.activation.taskname,
+                    ]
+                    scope.set_transaction_name(inflight.activation.taskname)
+                    sentry_sdk.capture_exception(err)
             metrics.incr(
                 "taskworker.worker.processing_deadline_exceeded",
                 tags={
@@ -245,7 +246,11 @@
                     "taskname": inflight.activation.taskname,
                 },
             )
-            next_state = TASK_ACTIVATION_STATUS_FAILURE
+            retry = task_func.retry
+            if retry and retry.should_retry(inflight.activation.retry_state, err):

Member: Would you have tasks configured to do retries on ProcessingDeadlineExceeded?

Member Author (@GabeVillalobos, Apr 14, 2026):
The use case for us would be: we try a webhook dispatch, it fails with a timeout, so we retry x number of times total before bailing on the webhook attempt entirely. If there's another way to achieve this without hooking into the ProcessingDeadlineExceeded except block here, I'm game for that.
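
As a rough sketch of that use case under the behavior in this PR: the `namespace.register` decorator and `send_webhook` helper below are hypothetical, while `Retry` and `ProcessingDeadlineExceeded` are the real names from the diff.

```python
from taskbroker_client.retry import Retry
from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded


@namespace.register(  # hypothetical registration decorator
    name="webhooks.dispatch",
    retry=Retry(times=3, on=(ProcessingDeadlineExceeded,)),
)
def dispatch_webhook(payload: dict) -> None:
    # If delivery hangs past the processing deadline, the worker can now mark
    # the activation for retry (up to the configured attempts) instead of
    # failing it outright.
    send_webhook(payload)  # hypothetical delivery helper
```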

Member: You could have the task processing deadline be longer than the cumulative timeouts of your HTTP request (and retries). That would let you do some retries within the task, handle those failures, and request a full task retry without having to catch processing deadlines.
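
A sketch of that alternative, assuming `requests` as the HTTP client; the timeout budget and attempt count are illustrative, and the task's processing deadline would be configured comfortably above their product.

```python
import requests

HTTP_TIMEOUT = 10  # seconds per attempt
HTTP_ATTEMPTS = 3  # in-task retries; keep the processing deadline above 3 * 10s


def dispatch_with_inner_retries(url: str, payload: dict) -> None:
    last_error: Exception | None = None
    for _ in range(HTTP_ATTEMPTS):
        try:
            requests.post(url, json=payload, timeout=HTTP_TIMEOUT)
            return
        except requests.RequestException as exc:
            last_error = exc
    # All inner attempts failed: re-raise so the ordinary `except Exception`
    # path in workerchild.py decides whether to retry the whole task.
    if last_error is not None:
        raise last_error
```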

Member Author: Discussed off-issue, but the problem we're hitting is DNS resolution in certain cases (ngrok is typically a culprit here), which we can't meaningfully apply a timeout to. This was a way for us to allow retries without reimplementing the same logic we already have here.

+                next_state = TASK_ACTIVATION_STATUS_RETRY
+            else:
+                next_state = TASK_ACTIVATION_STATUS_FAILURE

Cursor Bugbot (commit 91c9dff): Sentry error captured before retry check causes duplicate reports

Medium Severity

In the ProcessingDeadlineExceeded handler, sentry_sdk.capture_exception fires before should_retry is checked. When report_timeout_errors is True (the default) and retries are configured, every timeout attempt reports to Sentry — even those that will be retried. This is inconsistent with the Exception handler, which only captures errors when the task is not being retried.


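
One possible shape of a fix, sketched as a fragment of the handler above using the same names as the diff (the unchanged metrics.incr call is omitted): decide the retry outcome first, and only report to Sentry when the activation will not be retried.

```python
        except ProcessingDeadlineExceeded as err:
            retry = task_func.retry
            will_retry = bool(
                retry and retry.should_retry(inflight.activation.retry_state, err)
            )
            if task_func.report_timeout_errors and not will_retry:
                with sentry_sdk.isolation_scope() as scope:
                    scope.fingerprint = [
                        "taskworker.processing_deadline_exceeded",
                        inflight.activation.namespace,
                        inflight.activation.taskname,
                    ]
                    scope.set_transaction_name(inflight.activation.taskname)
                    sentry_sdk.capture_exception(err)
            # metrics.incr(...) stays as in the diff, omitted here for brevity
            next_state = (
                TASK_ACTIVATION_STATUS_RETRY
                if will_retry
                else TASK_ACTIVATION_STATUS_FAILURE
            )
```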

         except Exception as err:
             retry = task_func.retry
             captured_error = False
18 changes: 8 additions & 10 deletions clients/python/tests/test_retry.py
@@ -1,13 +1,12 @@
 from __future__ import annotations
 
-from multiprocessing.context import TimeoutError
-
 from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
     ON_ATTEMPTS_EXCEEDED_DEADLETTER,
     ON_ATTEMPTS_EXCEEDED_DISCARD,
 )
 
 from taskbroker_client.retry import LastAction, Retry, RetryTaskError
+from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded
 
 
 class RuntimeChildError(RuntimeError):
@@ -71,22 +70,21 @@ def test_should_retry_retryerror() -> None:
     assert not retry.should_retry(state, err)
 
 
-def test_should_retry_multiprocessing_timeout() -> None:
-    retry = Retry(times=3)
+def test_should_retry_processing_deadline_exceeded() -> None:
+    retry = Retry(times=3, on=(ProcessingDeadlineExceeded,))
     state = retry.initial_state()
 
-    timeout = TimeoutError("timeouts should retry if there are attempts left")
-    assert retry.should_retry(state, timeout)
+    deadline_exceeded = ProcessingDeadlineExceeded("processing deadline exceeded")
+    assert retry.should_retry(state, deadline_exceeded)
 
     state.attempts = 1
-    assert retry.should_retry(state, timeout)
+    assert retry.should_retry(state, deadline_exceeded)
 
     # attempt = 2 is actually the third attempt.
     state.attempts = 2
-    assert not retry.should_retry(state, timeout)
+    assert not retry.should_retry(state, deadline_exceeded)
 
     state.attempts = 3
-    assert not retry.should_retry(state, timeout)
+    assert not retry.should_retry(state, deadline_exceeded)
 
 
 def test_should_retry_error_allow_list() -> None: