feat(redis): add SDK-managed PEL reclaimer via retry_on_idle_ms#189
Open
rocky-jaiswal wants to merge 5 commits intomainfrom
Open
feat(redis): add SDK-managed PEL reclaimer via retry_on_idle_ms#189rocky-jaiswal wants to merge 5 commits intomainfrom
rocky-jaiswal wants to merge 5 commits intomainfrom
Conversation
pontino
approved these changes
Mar 10, 2026
Collaborator
pontino
left a comment
There was a problem hiding this comment.
LGTM — solid work fixing the four PEL reclaimer bugs. The dual-reclaimer + retry stream architecture is well thought out and the test suite gives good confidence.
A few findings worth discussing (shared separately), but nothing blocking:
- Silent parse error swallowing in
_inject_retry_metadata— suggest logging at warning level when metadata injection falls back to raw bytes - No max_retries / dead-letter — poison messages retry forever (already flagged, natural follow-up)
ack_policynot forwarded to retry subscription — if user setsACK, retry stream silently defaults toNACK_ON_ERROR- Redis client orphaned in
__init__—start()recreates it, init should beNone - Private API risk —
BinaryWriterimport fromfaststream.redis.parser.binaryis not part of FastStream's public API (works today, fragile long-term)
Approving as-is — items 2 and 3 are worth addressing before or shortly after merge.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Adds SDK-managed PEL reclaiming for Redis Streams via a new
retry_on_idle_msopt-in kwarg onagent.subscribe(). Without this, any message handler that raises an exception underNACK_ON_ERRORleaves the message stuck in the Redis Pending Entries List (PEL) forever — FastStream only reads withXREADGROUP … >(new messages only) and never revisits the PEL.Type of Change
Related Issue
Fixes #(issue number)
Changes Made
sdk/eggai/transport/pending_reclaimer.py(new):PendingReclaimerManagerruns a background task per subscription that pages through
XPENDING, claimsidle entries under a distinct
-reclaimerconsumer, re-publishes them to adedicated
.retrystream, andXACKs the original. Injects_retry_countand
_original_message_idinto the FastStream binary payload body forhandler-level idempotency checks. Restart-safe: Redis client is recreated on
each
start()call.sdk/eggai/transport/redis.py: Wiresretry_on_idle_msandretry_reclaim_interval_skwargs intosubscribe(); auto-subscribes thesame handler to
{channel}.retry; starts a second reclaimer on the retrystream that re-queues back to itself (no
.retry.retrychain). Adds_get_stream_key()to normalise channel names to full Redis keys. Forwardsall subscriber kwargs to the retry subscription.
sdk/eggai/agent.py: Validatesmin_idle_time/retry_on_idle_msmutual exclusion at decoration time (not deferred to
start()).sdk/README.md: Documents the newretry_on_idle_msfeature, deliverysemantics, injected metadata fields, and constraints.
sdk/tests/test_redis.py: Five new integration tests covering basic retryflow, no
.retry.retrystream creation, conflict guard, stream key prefixing,and retry metadata injection.
Testing
make test)Manual Testing Steps
N/A — covered by integration tests against a live Redis instance.
Changelog
sdk/CHANGELOG.mdunder[Unreleased]sectionskip-changeloglabel (docs, CI, etc.)Checklist
make lintpasses)make formatapplied)Additional Notes
retry_on_idle_msare completely unaffected.
XADDandXACKare not atomic;handlers must be idempotent. Use
_original_message_idfor deduplication.uv.lockis an untracked artefact from local test runs withuv; it is notpart of this PR and should not be committed.