Skip to content

feat(run_task): Add better_backpressure flag to avoid re-running function#536

Merged
untitaker merged 4 commits intomainfrom
fix-runtask-backpressure
Apr 22, 2026
Merged

feat(run_task): Add better_backpressure flag to avoid re-running function#536
untitaker merged 4 commits intomainfrom
fix-runtask-backpressure

Conversation

@untitaker
Copy link
Copy Markdown
Member

Summary

  • Adds better_backpressure=True flag to RunTask strategy
  • When enabled, stores transformed messages on backpressure and retries in poll() instead of re-raising MessageRejected
  • Prevents user function from running multiple times on the same message during backpressure

Context

The span buffer's flusher sometimes applies backpressure, causing increased CPU load on Redis. We suspect this is because add-buffer.lua is being executed multiple times per message when the current RunTask implementation re-runs the function on each retry.

With this fix (opt-in via better_backpressure=True), the function runs exactly once per message, matching the Rust arroyo behavior.

Ref STREAM-881

Test plan

  • Existing tests pass
  • mypy clean
  • Enable in span buffer consumer and monitor Redis CPU

🤖 Generated with Claude Code

…tion

When RunTask experiences backpressure from next_step, the user function
would run multiple times on the same message. This is because
MessageRejected propagates up to the consumer, which retries the same
message, causing the function to execute again.

With better_backpressure=True, RunTask now stores the transformed
message and retries submission in poll(), matching the Rust behavior.

Ref STREAM-881

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@untitaker untitaker requested review from a team as code owners April 21, 2026 15:15
@linear-code
Copy link
Copy Markdown

linear-code Bot commented Apr 21, 2026

Comment thread arroyo/processing/strategies/run_task.py Outdated
Address review feedback:
- join() now flushes __message_carried_over before delegating to next_step
- Parametrize existing tests on better_backpressure flag
- Add specific tests for backpressure behavior and join() flushing

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Comment thread arroyo/processing/strategies/run_task.py Outdated
…sure=True

Address review feedback: close() was closing next_step before join() could
flush the carried-over message, causing AssertionError in downstream strategies.

Now follows the pattern from RunTaskWithMultiprocessing:
- close() only sets __closed flag
- join() flushes pending message, then calls next_step.close(), then next_step.join()

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 3c79e2d. Configure here.

Comment thread arroyo/processing/strategies/run_task.py Outdated
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
cls,
function: Callable[[Message[TStrategyPayload]], TResult],
next_step: ProcessingStrategy[Union[FilteredPayload, TResult]],
better_backpressure: bool = False,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this works well it seems like it could be the default going forward.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this should be the intent, just didn't want to take the risk now

@untitaker untitaker merged commit 5210932 into main Apr 22, 2026
17 checks passed
@untitaker untitaker deleted the fix-runtask-backpressure branch April 22, 2026 13:53
untitaker added a commit to getsentry/sentry that referenced this pull request Apr 22, 2026
Bumps arroyo to 2.39.0 and enables the new `better_backpressure=True`
flag in the span buffer's RunTask.

When the flusher applies backpressure, the current RunTask
implementation re-raises `MessageRejected` and re-runs the user function
on each retry attempt. This causes increased CPU load on Redis because
`add-buffer.lua` executes multiple times per message.

With `better_backpressure=True`, the function runs exactly once per
message — transformed messages are stored on backpressure and retried in
`poll()` instead.

See getsentry/arroyo#536 for the arroyo change.

Ref STREAM-881
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants