37 changes: 37 additions & 0 deletions clients/python/src/examples/tasks.py
@@ -13,6 +13,7 @@
from examples.app import app
from taskbroker_client.retry import LastAction, NoRetriesRemainingError, Retry, RetryTaskError
from taskbroker_client.retry import retry_task as retry_task_helper
from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded

logger = logging.getLogger(__name__)

@@ -72,3 +73,39 @@ def will_retry(failure: str) -> None:
def timed_task(sleep_seconds: float | str, *args: list[Any], **kwargs: dict[str, Any]) -> None:
    sleep(float(sleep_seconds))
    logger.debug("timed_task complete")


@exampletasks.register(
    name="examples.will_timeout_without_reporting",
    processing_deadline_duration=1,
    report_timeout_errors=False,
)
def will_timeout_without_reporting() -> None:
    timed_task(sleep_seconds=2)


@exampletasks.register(
    name="examples.will_fail_with_expected_exception",
    retry=Retry(times=2, times_exceeded=LastAction.Discard),
    expected_exceptions=(RuntimeError,),
)
def will_fail_with_expected_exception() -> None:
    raise RuntimeError("oh no")


@exampletasks.register(
    name="examples.will_fail_with_expected_ignored_exception",
    retry=Retry(times=2, on=(RuntimeError,), times_exceeded=LastAction.Discard),
    expected_exceptions=(RuntimeError,),
)
def will_fail_with_expected_ignored_exception() -> None:
    raise RuntimeError("oh no")


@exampletasks.register(
    name="examples.will_retry_on_deadline_exceeded",
    processing_deadline_duration=1,
    retry=Retry(times=2, on=(ProcessingDeadlineExceeded,), times_exceeded=LastAction.Discard),
)
def will_retry_on_deadline_exceeded() -> None:
    timed_task(sleep_seconds=2)
16 changes: 16 additions & 0 deletions clients/python/src/taskbroker_client/registry.py
@@ -88,6 +88,8 @@ def register(
        at_most_once: bool = False,
        wait_for_delivery: bool = False,
        compression_type: CompressionType = CompressionType.PLAINTEXT,
        report_timeout_errors: bool = True,
        expected_exceptions: tuple[type[BaseException], ...] | None = None,
    ) -> Callable[[Callable[P, R]], Task[P, R]]:
        """
        Register a task.
@@ -115,6 +117,10 @@ def register(
            before returning.
        compression_type: CompressionType
            The compression type to use to compress the task parameters.
        report_timeout_errors: bool
            Enable reporting of ProcessingDeadlineExceededError to Sentry.
        expected_exceptions: tuple[type[BaseException], ...] | None
            A tuple of exception types that will not be reported to Sentry.
        """

        def wrapped(func: Callable[P, R]) -> Task[P, R]:
@@ -133,6 +139,8 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
                at_most_once=at_most_once,
                wait_for_delivery=wait_for_delivery,
                compression_type=compression_type,
                report_timeout_errors=report_timeout_errors,
                expected_exceptions=expected_exceptions,
            )
            # TODO(taskworker) tasks should be registered into the registry
            # so that we can ensure task names are globally unique
@@ -224,6 +232,8 @@ def register(
        at_most_once: bool = False,
        wait_for_delivery: bool = False,
        compression_type: CompressionType = CompressionType.PLAINTEXT,
        report_timeout_errors: bool = True,
        expected_exceptions: tuple[type[BaseException], ...] | None = None,
    ) -> Callable[[Callable[P, R]], ExternalTask[P, R]]:
        """
        Register an external task stub.
@@ -251,6 +261,10 @@ def register(
            before returning.
        compression_type: CompressionType
            The compression type to use to compress the task parameters.
        report_timeout_errors: bool
            Enable reporting of ProcessingDeadlineExceededError to Sentry.
        expected_exceptions: tuple[type[BaseException], ...] | None
            A tuple of exception types that will not be reported to Sentry.
        """

        def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]:
@@ -269,6 +283,8 @@ def wrapped(func: Callable[P, R]) -> ExternalTask[P, R]:
                at_most_once=at_most_once,
                wait_for_delivery=wait_for_delivery,
                compression_type=compression_type,
                report_timeout_errors=report_timeout_errors,
                expected_exceptions=expected_exceptions,
            )
            self._registered_tasks[name] = task
            return task
Comment on lines 283 to 290
Bug: The new expected_exceptions and report_timeout_errors parameters on ExternalNamespace.register() have no effect because ExternalTask instances are not executed locally.
Severity: MEDIUM

Suggested Fix

Raise a TypeError or ValueError if expected_exceptions or report_timeout_errors are passed to ExternalNamespace.register(), as these parameters are not supported for external tasks. Alternatively, document this limitation clearly in the method's docstring to prevent misuse.
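
A minimal sketch of the first suggested option, written as a standalone guard for illustration (the helper name and error message are hypothetical, not part of this PR); ExternalNamespace.register() could call it before constructing the ExternalTask:

# Hypothetical guard illustrating the suggested fix; the helper name and
# message wording are illustrative only.
def _reject_unsupported_external_options(
    expected_exceptions: tuple[type[BaseException], ...] | None,
    report_timeout_errors: bool,
) -> None:
    # ExternalTask stubs are only dispatched, never executed, by this worker,
    # so execution-time options would be silently ignored. Fail fast instead.
    if expected_exceptions is not None or report_timeout_errors is not True:
        raise ValueError(
            "expected_exceptions and report_timeout_errors have no effect on "
            "external tasks; configure them in the application that executes the task."
        )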

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: clients/python/src/taskbroker_client/registry.py#L283-L290

Potential issue: The `ExternalNamespace.register()` method now accepts
`expected_exceptions` and `report_timeout_errors` parameters. However, these are
silently ignored for external tasks. These parameters control task execution behavior,
but `ExternalTask` instances are only dispatched, not executed, by the local worker; the
remote application's task registration governs error handling. A developer setting
`expected_exceptions` on an external task stub will incorrectly believe they have
suppressed certain error reports, when in fact the setting has no effect at runtime.

2 changes: 2 additions & 0 deletions clients/python/src/taskbroker_client/task.py
@@ -67,6 +67,7 @@ def __init__(
        wait_for_delivery: bool = False,
        compression_type: CompressionType = CompressionType.PLAINTEXT,
        report_timeout_errors: bool = True,
        expected_exceptions: tuple[type[BaseException], ...] | None = None,
    ):
        self.name = name
        self._func = func
@@ -86,6 +87,7 @@ def __init__(
        self.wait_for_delivery = wait_for_delivery
        self.compression_type = compression_type
        self.report_timeout_errors = report_timeout_errors
        self.expected_exceptions = expected_exceptions or ()
        update_wrapper(self, func)

    @property
32 changes: 20 additions & 12 deletions clients/python/src/taskbroker_client/worker/workerchild.py
@@ -266,6 +266,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
    except Exception as err:
        retry = task_func.retry
        captured_error = False
        should_capture_error = not isinstance(err, task_func.expected_exceptions)
        if retry:
            if retry.should_retry(inflight.activation.retry_state, err):
                logger.info(
@@ -280,20 +281,27 @@
                next_state = TASK_ACTIVATION_STATUS_RETRY
            elif retry.max_attempts_reached(inflight.activation.retry_state):
                with sentry_sdk.isolation_scope() as scope:
                    retry_error = NoRetriesRemainingError(
                        f"{inflight.activation.taskname} has consumed all of its retries"
                    )
                    retry_error.__cause__ = err
                    scope.fingerprint = [
                        "taskworker.no_retries_remaining",
                        inflight.activation.namespace,
                        inflight.activation.taskname,
                    ]
                    scope.set_transaction_name(inflight.activation.taskname)
                    sentry_sdk.capture_exception(retry_error)
                    if should_capture_error:
                        retry_error = NoRetriesRemainingError(
                            f"{inflight.activation.taskname} has consumed all of its retries"
                        )
                        retry_error.__cause__ = err
                        scope.fingerprint = [
                            "taskworker.no_retries_remaining",
                            inflight.activation.namespace,
                            inflight.activation.taskname,
                        ]
                        scope.set_transaction_name(inflight.activation.taskname)
                        sentry_sdk.capture_exception(retry_error)
                    # In this branch, all exceptions should be either
                    # captured or silenced.
                    captured_error = True

        if not captured_error and next_state != TASK_ACTIVATION_STATUS_RETRY:
        if (
            should_capture_error
            and not captured_error
            and next_state != TASK_ACTIVATION_STATUS_RETRY
        ):
            sentry_sdk.capture_exception(err)

    clear_current_task()
179 changes: 179 additions & 0 deletions clients/python/tests/worker/test_worker.py
@@ -193,6 +193,82 @@
    ),
)

# Task that times out, with timeout error reporting disabled
RETRY_TASK_WITH_SILENCED_TIMEOUT = InflightTaskActivation(
    host="localhost:50051",
    receive_timestamp=0,
    activation=TaskActivation(
        id="654",
        taskname="examples.will_timeout_without_reporting",
        namespace="examples",
        parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
        processing_deadline_duration=1,
        retry_state=RetryState(
            # no more attempts left
            attempts=1,
            max_attempts=2,
            on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
        ),
    ),
)

# Task with Retry logic, expected exceptions to silence reporting
RETRY_TASK_WITH_EXPECTED_UNHANDLED_EXCEPTION = InflightTaskActivation(
    host="localhost:50051",
    receive_timestamp=0,
    activation=TaskActivation(
        id="654",
        taskname="examples.will_fail_with_expected_exception",
        namespace="examples",
        parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
        processing_deadline_duration=2,
        retry_state=RetryState(
            # One retry left
            attempts=0,
            max_attempts=2,
            on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
        ),
    ),
)

# Task set to retry on an exception type that is also in expected_exceptions
RETRY_TASK_WITH_EXPECTED_IGNORED_EXCEPTION = InflightTaskActivation(
    host="localhost:50051",
    receive_timestamp=0,
    activation=TaskActivation(
        id="654",
        taskname="examples.will_fail_with_expected_ignored_exception",
        namespace="examples",
        parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
        processing_deadline_duration=2,
        retry_state=RetryState(
            # One retry left
            attempts=0,
            max_attempts=2,
            on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
        ),
    ),
)

# Task set to retry on deadline exceeded exceptions
RETRY_TASK_ON_DEADLINE_EXCEEDED = InflightTaskActivation(
    host="localhost:50051",
    receive_timestamp=0,
    activation=TaskActivation(
        id="654",
        taskname="examples.will_retry_on_deadline_exceeded",
        namespace="examples",
        parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
        processing_deadline_duration=1,
        retry_state=RetryState(
            # One retry left
            attempts=0,
            max_attempts=2,
            on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
        ),
    ),
)


class TestTaskWorker(TestCase):
    def test_fetch_task(self) -> None:
@@ -901,3 +977,106 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag
        assert executed_headers[0]["x-viewer-user"] == "7"
    finally:
        app.context_hooks.remove(hook)


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None:
    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
    processed: queue.Queue[ProcessingResult] = queue.Queue()
    shutdown = Event()

    todo.put(RETRY_TASK_WITH_SILENCED_TIMEOUT)
    child_process(
        "examples.app:app",
        todo,
        processed,
        shutdown,
        max_task_count=1,
        processing_pool_name="test",
        process_type="fork",
    )

    assert todo.empty()
    result = processed.get()
    assert result.task_id == RETRY_TASK_WITH_SILENCED_TIMEOUT.activation.id
    assert result.status == TASK_ACTIVATION_STATUS_FAILURE
    assert mock_capture.call_count == 0


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_expected_exception_with_retries(mock_capture: mock.Mock) -> None:
    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
    processed: queue.Queue[ProcessingResult] = queue.Queue()
    shutdown = Event()

    todo.put(RETRY_TASK_WITH_EXPECTED_UNHANDLED_EXCEPTION)
    child_process(
        "examples.app:app",
        todo,
        processed,
        shutdown,
        max_task_count=1,
        processing_pool_name="test",
        process_type="fork",
    )

    assert todo.empty()
    result = processed.get()
    assert result.task_id == RETRY_TASK_WITH_EXPECTED_UNHANDLED_EXCEPTION.activation.id

    # No reporting, but the task still raised an unhandled exception
    assert result.status == TASK_ACTIVATION_STATUS_FAILURE
    assert mock_capture.call_count == 0


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_expected_ignored_exception_max_attempts(mock_capture: mock.Mock) -> None:
    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
    processed: queue.Queue[ProcessingResult] = queue.Queue()
    shutdown = Event()

    # Task has more retries left, but is set to ignore the raised error type
    todo.put(RETRY_TASK_WITH_EXPECTED_IGNORED_EXCEPTION)
    child_process(
        "examples.app:app",
        todo,
        processed,
        shutdown,
        max_task_count=1,
        processing_pool_name="test",
        process_type="fork",
    )

    # No reporting, but exception type is retriable
    assert todo.empty()
    result = processed.get()
    assert result.task_id == RETRY_TASK_WITH_EXPECTED_IGNORED_EXCEPTION.activation.id
    assert result.status == TASK_ACTIVATION_STATUS_RETRY
    assert mock_capture.call_count == 0


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> None:
    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
    processed: queue.Queue[ProcessingResult] = queue.Queue()
    shutdown = Event()

    # Task will timeout, but should retry, because ProcessingDeadlineExceeded is
    # in the Retry.on list
    todo.put(RETRY_TASK_ON_DEADLINE_EXCEEDED)
    child_process(
        "examples.app:app",
        todo,
        processed,
        shutdown,
        max_task_count=1,
        processing_pool_name="test",
        process_type="fork",
    )

    assert todo.empty()
    result = processed.get()
    assert result.task_id == RETRY_TASK_ON_DEADLINE_EXCEEDED.activation.id
    assert result.status == TASK_ACTIVATION_STATUS_RETRY
    assert mock_capture.call_count == 1
    assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded