Skip to content

feat(redis): add SDK-managed PEL reclaimer via retry_on_idle_ms#189

Open
rocky-jaiswal wants to merge 5 commits intomainfrom
feature/redis-pel-reclaimer
Open

feat(redis): add SDK-managed PEL reclaimer via retry_on_idle_ms#189
rocky-jaiswal wants to merge 5 commits intomainfrom
feature/redis-pel-reclaimer

Conversation

@rocky-jaiswal
Copy link
Contributor

Description

Adds SDK-managed PEL reclaiming for Redis Streams via a new retry_on_idle_ms opt-in kwarg on agent.subscribe(). Without this, any message handler that raises an exception under NACK_ON_ERROR leaves the message stuck in the Redis Pending Entries List (PEL) forever — FastStream only reads with XREADGROUP … > (new messages only) and never revisits the PEL.

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement
  • Test coverage improvement
  • CI/CD update
  • Dependency update

Related Issue

Fixes #(issue number)

Changes Made

  • sdk/eggai/transport/pending_reclaimer.py (new): PendingReclaimerManager
    runs a background task per subscription that pages through XPENDING, claims
    idle entries under a distinct -reclaimer consumer, re-publishes them to a
    dedicated .retry stream, and XACKs the original. Injects _retry_count
    and _original_message_id into the FastStream binary payload body for
    handler-level idempotency checks. Restart-safe: Redis client is recreated on
    each start() call.
  • sdk/eggai/transport/redis.py: Wires retry_on_idle_ms and
    retry_reclaim_interval_s kwargs into subscribe(); auto-subscribes the
    same handler to {channel}.retry; starts a second reclaimer on the retry
    stream that re-queues back to itself (no .retry.retry chain). Adds
    _get_stream_key() to normalise channel names to full Redis keys. Forwards
    all subscriber kwargs to the retry subscription.
  • sdk/eggai/agent.py: Validates min_idle_time / retry_on_idle_ms
    mutual exclusion at decoration time (not deferred to start()).
  • sdk/README.md: Documents the new retry_on_idle_ms feature, delivery
    semantics, injected metadata fields, and constraints.
  • sdk/tests/test_redis.py: Five new integration tests covering basic retry
    flow, no .retry.retry stream creation, conflict guard, stream key prefixing,
    and retry metadata injection.

Testing

  • Existing tests pass locally (make test)
  • Added new tests for new functionality
  • Tested manually (describe below if applicable)

Manual Testing Steps

N/A — covered by integration tests against a live Redis instance.

Changelog

  • I have updated sdk/CHANGELOG.md under [Unreleased] section
  • Or: This PR has the skip-changelog label (docs, CI, etc.)

Checklist

  • My code follows the project's code style (make lint passes)
  • My code is properly formatted (make format applied)
  • I have added/updated tests that prove my fix/feature works
  • I have added/updated documentation as needed
  • All tests pass locally
  • I have reviewed my own code
  • My changes generate no new warnings
  • My commit messages follow the Conventional Commits standard

Additional Notes

  • Fully backward-compatible — existing subscriptions without retry_on_idle_ms
    are completely unaffected.
  • Delivery guarantee is at-least-once. XADD and XACK are not atomic;
    handlers must be idempotent. Use _original_message_id for deduplication.
  • uv.lock is an untracked artefact from local test runs with uv; it is not
    part of this PR and should not be committed.

Copy link
Collaborator

@pontino pontino left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM — solid work fixing the four PEL reclaimer bugs. The dual-reclaimer + retry stream architecture is well thought out and the test suite gives good confidence.

A few findings worth discussing (shared separately), but nothing blocking:

  1. Silent parse error swallowing in _inject_retry_metadata — suggest logging at warning level when metadata injection falls back to raw bytes
  2. No max_retries / dead-letter — poison messages retry forever (already flagged, natural follow-up)
  3. ack_policy not forwarded to retry subscription — if user sets ACK, retry stream silently defaults to NACK_ON_ERROR
  4. Redis client orphaned in __init__start() recreates it, init should be None
  5. Private API riskBinaryWriter import from faststream.redis.parser.binary is not part of FastStream's public API (works today, fragile long-term)

Approving as-is — items 2 and 3 are worth addressing before or shortly after merge.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants