feat: Redis Cluster support via XACK+XADD reconciliation scan (#60) #122
Open
bburli-craft wants to merge 11 commits into
Replaces XCLAIM/XAUTOCLAIM recovery with a cluster-safe periodic reconciliation scan and adds OSS Redis Cluster support.

Core changes:
- Reconciliation scan (XPENDING + MinIdleTime) that verifies the owner's lock key is gone before recovering, fixing duplicate processing of slow-but-alive consumers (#111). Recovery re-queues via XACK + XADD, XACK-first so concurrent recoverers dedup (#112). Jittered timer (#110).
- RecoveryConfig + ClusterMode options (#107); ClusterModeOSS subscribes to keyspace notifications on every master and adds ResetTopology() (#108, #109).
- _retry_count field with DLQ routing past MaxRetries (#106).
- Claim() rewritten to re-queue instead of XCLAIM (signature unchanged).
- New metrics for reconciliation/re-queue/DLQ/mutex-skip/topology (#113).

Tests/docs:
- Unit tests for retry parsing, jitter, config validation.
- Integration tests for #111/#112/#114/#106 and infra-gated OSS (#115); existing tests updated to the new recovery semantics.
- README cluster section, CHANGELOG, migration guide, METRICS update (#116, #117).

BREAKING CHANGE: metrics.Recorder and types.RedisStreamClient interfaces gained methods; recovery requires Redis 6.2+. See docs/MIGRATION.md.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- handle XAck errors instead of assigning to blank (errcheck check-blank)
- rename routeToDLQ result var to avoid shadowing err (govet shadow)
- wrap readLBSEntry signature under 120 cols (lll)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The recovering consumer is subscribed to keyspace events and may emit a StreamExpired (its EXISTS check can trigger Redis lazy expiry of a dead consumer's lock) alongside the StreamAdded for the re-queued stream. Update the recovery tests to consume notifications by type instead of asserting the first notification is StreamAdded:
- TestLBSRecovery / TestLBSRecoveryOfDiscontinuousStreamMessages: act only on StreamAdded; drain remaining notifications until the channel closes.
- TestMainFlow: Claim now re-queues (XACK+XADD) instead of XCLAIM, so drop the XCLAIM-specific ActiveTime/SeenTime assertions and tolerate ErrAlreadyClaimed; drain the re-queued StreamAdded before checking closure.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
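Consuming notifications by type rather than by position might look like the sketch below. The `notification` struct, the `notifType` enum, and `collectAdded` are hypothetical stand-ins for the library's real types; only the names StreamAdded and StreamExpired come from the commit message.

```go
package main

import "fmt"

// notifType and notification are hypothetical stand-ins for the library's types.
type notifType int

const (
	StreamAdded notifType = iota
	StreamExpired
)

type notification struct {
	Type   notifType
	Stream string
}

// collectAdded drains the channel until it is closed and returns the stream
// names carried by StreamAdded notifications, ignoring every other type.
func collectAdded(ch <-chan notification) []string {
	var added []string
	for n := range ch {
		if n.Type == StreamAdded {
			added = append(added, n.Stream)
		}
	}
	return added
}

func main() {
	ch := make(chan notification, 3)
	// A StreamExpired may arrive before the StreamAdded for the re-queued stream.
	ch <- notification{Type: StreamExpired, Stream: "orders"}
	ch <- notification{Type: StreamAdded, Stream: "orders"}
	close(ch)
	fmt.Println(collectAdded(ch))
}
```

Because the loop filters on type, the test no longer depends on which notification happens to arrive first.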
Recovery via XACK+XADD is at-least-once: under CI timing the re-queued stream can be redelivered more than once before the consumer's lock is observed, inflating the lock-acquisition counter. Assert recovery/redistribution-sensitive counters as lower bounds while keeping deterministic, map-based and behavioral assertions strict. Also fix a US-locale misspell (signalled).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Bring all docs in line with the new recovery model and cluster support:
- README: rewrite Solution/config sections (RecoveryConfig, ClusterMode, deprecate WithLBSIdleTime/WithLBSRecoveryCount); fix Claim semantics (re-queue, process on the subsequent StreamAdded).
- ARCHITECTURE: add recovery-model and cluster-modes sections; reconciliation scanner goroutine (4+N); Redis keys (DLQ, _retry_count); keep both paths.
- USAGE: options table, notification table, Claim semantics, ResetTopology, Redis 6.2+ note.
- OPERATIONS: MinIdleTime tuning, scan load, DLQ monitoring, OSS notes.
- CODEBASE_OVERVIEW: interface methods, file map (reconcile/reQueue), fix notifs/broker.go path, threading 4+N.
- EXAMPLE / examples READMEs / load-balancing main.go: Claim no longer grants ownership directly (remove process-after-Claim and dead handleClaimedStream); fix broken import and Done(ctx) calls.
- CONTRIBUTING: correct the Claim godoc example signature/semantics.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
bburli-craft
commented
May 31, 2026
- configs: split DLQ field names and reconciliation defaults into dedicated const blocks; add DLQSuffix and DLQReasonUnparseable
- impl: extract lbsEntry struct (lbsentry.go) with retryCount/dataStreamName/cloneValues methods; move re-queue logic to requeue.go with a distinct reQueueError outcome and an extracted ownerAlive helper
- impl: isolate OSS keyspace logic into oss.go behind an ossSubscriptions struct; tolerate per-master partial failure and record it via the new RecordMasterKeyspaceSetup metric
- impl: add RecoveryConfigBuilder (recovery_config.go) and default the DLQ stream to <service>-input-dlq when unset
- impl: route unparseable poison entries to the DLQ (reason "unparseable") instead of silently dropping them
- metrics/test/examples: implement RecordMasterKeyspaceSetup across all recorders

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
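As a rough illustration of the DLQ defaulting and routing described in this commit, the sketch below decides where a recovered entry goes. Everything here is an assumption except the `_retry_count` field name, the `<service>-input-dlq` default, and the "unparseable" reason: the functions `dlqName` and `route`, the `decision` type, the "max_retries" reason string, the increment-on-requeue behavior, and the choice to treat a malformed `_retry_count` as the unparseable case are all hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

const retryField = "_retry_count" // field name taken from the PR text

// decision is a hypothetical result type: where the entry goes and why.
type decision struct {
	Stream string
	Reason string // empty when the entry is re-queued normally
	Retry  int    // retry count to store on the re-queued copy
}

// dlqName returns the configured DLQ stream, or <service>-input-dlq when unset.
func dlqName(service, configured string) string {
	if configured != "" {
		return configured
	}
	return service + "-input-dlq"
}

// route sends the entry to the DLQ when its retry count cannot be parsed or
// already exceeds maxRetries; otherwise it re-queues with the count incremented
// (the increment is an assumption of this sketch).
func route(values map[string]string, input, dlq string, maxRetries int) decision {
	count := 0
	if raw, ok := values[retryField]; ok {
		n, err := strconv.Atoi(raw)
		if err != nil || n < 0 {
			return decision{Stream: dlq, Reason: "unparseable"}
		}
		count = n
	}
	if count > maxRetries {
		return decision{Stream: dlq, Reason: "max_retries", Retry: count}
	}
	return decision{Stream: input, Retry: count + 1}
}

func main() {
	dlq := dlqName("billing", "")
	fmt.Println(route(map[string]string{}, "billing-input", dlq, 3))
	fmt.Println(route(map[string]string{retryField: "4"}, "billing-input", dlq, 3))
	fmt.Println(route(map[string]string{retryField: "oops"}, "billing-input", dlq, 3))
}
```

Keeping the decision separate from the Redis calls makes the retry parsing easy to unit test without a server.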
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@hthuwal @a3y3 Can you guys review this too? @subhramit Can you review again?
subhramit
reviewed
Jun 4, 2026
Nothing acks or trims the DLQ stream, so it grew without bound. Add a RecoveryConfig.DLQMaxLen (default 10000, 0 disables) applied as an approximate MAXLEN trim on every DLQ XADD, plus a builder setter, validation, and updated docs. Addresses review comment on impl/requeue.go. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
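An approximate MAXLEN trim on XADD uses the `MAXLEN ~ <threshold>` clause of the Redis command. The sketch below only builds the raw argument list so the shape is visible without a client library; `dlqXAddArgs` is a hypothetical helper, and treating 0 as "no trim" mirrors the DLQMaxLen semantics described in the commit message.

```go
package main

import (
	"fmt"
	"strconv"
)

// dlqXAddArgs builds the raw arguments for an XADD to the DLQ stream.
// When maxLen > 0 it adds an approximate trim ("MAXLEN ~ n"); 0 disables it.
// fieldValues is a flat field, value, field, value... list.
func dlqXAddArgs(stream string, maxLen int64, fieldValues ...string) []string {
	args := []string{"XADD", stream}
	if maxLen > 0 {
		args = append(args, "MAXLEN", "~", strconv.FormatInt(maxLen, 10))
	}
	args = append(args, "*") // let Redis assign the entry ID
	return append(args, fieldValues...)
}

func main() {
	fmt.Println(dlqXAddArgs("billing-input-dlq", 10000, "reason", "unparseable"))
	fmt.Println(dlqXAddArgs("billing-input-dlq", 0, "reason", "unparseable"))
}
```

The `~` form lets Redis trim lazily at macro-node boundaries, so the stream length can sit slightly above the threshold.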
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
subhramit
reviewed
Jun 4, 2026
Unlocking before XACK left a deterministic window where the lock was gone but the LBS entry was still in the PEL; a concurrent reconcileLBS could read "no lock => owner dead" and re-queue a stream that had just finished, causing duplicate processing. Reorder so XACK (which removes the entry from the PEL) and XDEL happen first, then release the lock. Until XACK succeeds the lock is still held, so the scan treats the owner as alive and skips it. Also fixes a latent bug where the XACK error path wrapped a stale nil err instead of res.Err(). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Unlocking before XACK left a window where the lock (the liveness signal the reconciliation scan uses) was gone while the entry was still in the PEL, so a concurrent reconcileLBS could see "no lock => owner dead" and re-queue a stream that just finished. Acknowledge and delete the entry first, then release the lock; the scan treats the owner as alive until the entry leaves the PEL. Addresses review comment on impl/relredis.go. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
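The ordering described in the two commits above (acknowledge and delete first, release the lock last) can be sketched as follows. The `finisher` interface, `finish`, and the `recorder` fake are hypothetical; returning early on an XDEL error without unlocking is an assumption of this sketch, not something the commit messages state.

```go
package main

import (
	"errors"
	"fmt"
)

// finisher is a hypothetical abstraction over the three calls made when a
// consumer finishes a stream.
type finisher interface {
	Ack(id string) error        // XACK: removes the entry from the PEL
	Del(id string) error        // XDEL: deletes the LBS entry
	Unlock(stream string) error // releases the lock the scan uses as a liveness signal
}

// finish acknowledges and deletes the entry before releasing the lock, so a
// concurrent scan never sees "entry pending, lock gone" for a finished stream.
func finish(f finisher, stream, id string) error {
	if err := f.Ack(id); err != nil {
		return fmt.Errorf("xack %s: %w", id, err) // lock stays held
	}
	if err := f.Del(id); err != nil {
		return fmt.Errorf("xdel %s: %w", id, err) // assumption: keep the lock here too
	}
	return f.Unlock(stream)
}

// recorder is a fake that records call order for this illustration.
type recorder struct {
	calls   []string
	failAck bool
}

func (r *recorder) Ack(id string) error {
	r.calls = append(r.calls, "XACK")
	if r.failAck {
		return errors.New("connection reset")
	}
	return nil
}

func (r *recorder) Del(id string) error {
	r.calls = append(r.calls, "XDEL")
	return nil
}

func (r *recorder) Unlock(stream string) error {
	r.calls = append(r.calls, "UNLOCK")
	return nil
}

func main() {
	ok := &recorder{}
	fmt.Println(finish(ok, "orders", "1-0"), ok.calls)

	bad := &recorder{failAck: true}
	fmt.Println(finish(bad, "orders", "1-0"), bad.calls)
}
```

When XACK fails, the lock is never released by this path, so the scan keeps treating the owner as alive.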
subhramit
approved these changes
Jun 4, 2026
Good to go from my POV but others should take a final look
Closes #60. Implements the core cluster-support work and sub-tasks #106–#117.
What changed
Replaces XCLAIM/XAUTOCLAIM recovery with a cluster-safe periodic reconciliation scan and adds OSS Redis Cluster support.
- Reconciliation scan (XPENDING + MinIdleTime, jittered timer) verifies the owning consumer's lock key is gone before recovering, fixing duplicate processing of slow-but-alive consumers (#111). Recovery re-queues via XACK + XADD, XACK-first for cross-recoverer dedup (#112). Jitter: #110.
- RecoveryConfig + ClusterMode options (#107). ClusterModeOSS enables keyspace notifications and subscribes on every master (#108); ResetTopology() rebuilds subscriptions after failover/resharding (#109).
- _retry_count + DLQ routing past MaxRetries (#106). The DLQ stream defaults to <service>-input-dlq when unset, so poison messages are preserved rather than dropped. Unparseable/poison entries are also routed to the DLQ (reason unparseable) instead of being silently dropped.
- Claim() rewritten to re-queue instead of XCLAIM (signature unchanged).
Tests & docs
- Unit tests for retry parsing, jitter, and config validation.
- Integration tests for #111, #112, #114, #106, and infra-gated OSS (#115); existing tests updated to the new recovery semantics.
- README cluster section, CHANGELOG.md, docs/MIGRATION.md, docs/METRICS.md (#116, #117).
Breaking changes
- metrics.Recorder and types.RedisStreamClient gained methods.
- Recovery requires Redis 6.2+ (XPENDING ... IDLE).
See docs/MIGRATION.md for upgrade steps.