feat(run_task): Add better_backpressure flag to avoid re-running function#536
Merged
feat(run_task): Add better_backpressure flag to avoid re-running function#536
Conversation
…tion When RunTask experiences backpressure from next_step, the user function would run multiple times on the same message. This is because MessageRejected propagates up to the consumer, which retries the same message, causing the function to execute again. With better_backpressure=True, RunTask now stores the transformed message and retries submission in poll(), matching the Rust behavior. Ref STREAM-881 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Address review feedback: - join() now flushes __message_carried_over before delegating to next_step - Parametrize existing tests on better_backpressure flag - Add specific tests for backpressure behavior and join() flushing Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…sure=True Address review feedback: close() was closing next_step before join() could flush the carried-over message, causing AssertionError in downstream strategies. Now follows the pattern from RunTaskWithMultiprocessing: - close() only sets __closed flag - join() flushes pending message, then calls next_step.close(), then next_step.join() Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 3c79e2d. Configure here.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
markstory
approved these changes
Apr 22, 2026
| cls, | ||
| function: Callable[[Message[TStrategyPayload]], TResult], | ||
| next_step: ProcessingStrategy[Union[FilteredPayload, TResult]], | ||
| better_backpressure: bool = False, |
Member
There was a problem hiding this comment.
If this works well it seems like it could be the default going forward.
Member
Author
There was a problem hiding this comment.
i think this should be the intent, just didn't want to take the risk now
untitaker
added a commit
to getsentry/sentry
that referenced
this pull request
Apr 22, 2026
Bumps arroyo to 2.39.0 and enables the new `better_backpressure=True` flag in the span buffer's RunTask. When the flusher applies backpressure, the current RunTask implementation re-raises `MessageRejected` and re-runs the user function on each retry attempt. This causes increased CPU load on Redis because `add-buffer.lua` executes multiple times per message. With `better_backpressure=True`, the function runs exactly once per message — transformed messages are stored on backpressure and retried in `poll()` instead. See getsentry/arroyo#536 for the arroyo change. Ref STREAM-881
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Summary
better_backpressure=Trueflag toRunTaskstrategypoll()instead of re-raisingMessageRejectedContext
The span buffer's flusher sometimes applies backpressure, causing increased CPU load on Redis. We suspect this is because
add-buffer.luais being executed multiple times per message when the currentRunTaskimplementation re-runs the function on each retry.With this fix (opt-in via
better_backpressure=True), the function runs exactly once per message, matching the Rust arroyo behavior.Ref STREAM-881
Test plan
🤖 Generated with Claude Code