Skip to content

feat: Redis Cluster support via XACK+XADD reconciliation scan (#60)#122

Open
bburli-craft wants to merge 11 commits into
mainfrom
60-support-for-redis-cluster
Open

feat: Redis Cluster support via XACK+XADD reconciliation scan (#60)#122
bburli-craft wants to merge 11 commits into
mainfrom
60-support-for-redis-cluster

Conversation

@bburli-craft

@bburli-craft bburli-craft commented May 29, 2026

Copy link
Copy Markdown
Collaborator

Closes #60. Implements the core cluster-support work and sub-tasks #106#117.

What changed

Replaces XCLAIM/XAUTOCLAIM recovery with a cluster-safe periodic reconciliation scan and adds OSS Redis Cluster support.

Tests & docs

Breaking changes

  • metrics.Recorder and types.RedisStreamClient gained methods.
  • Recovery requires Redis 6.2+ (XPENDING ... IDLE).

See docs/MIGRATION.md for upgrade steps.

Replaces XCLAIM/XAUTOCLAIM recovery with a cluster-safe periodic
reconciliation scan and adds OSS Redis Cluster support.

Core changes:
- Reconciliation scan (XPENDING + MinIdleTime) that verifies the owner's
  lock key is gone before recovering, fixing duplicate processing of
  slow-but-alive consumers (#111). Recovery re-queues via XACK + XADD,
  XACK-first so concurrent recoverers dedup (#112). Jittered timer (#110).
- RecoveryConfig + ClusterMode options (#107); ClusterModeOSS subscribes to
  keyspace notifications on every master and adds ResetTopology() (#108, #109).
- _retry_count field with DLQ routing past MaxRetries (#106).
- Claim() rewritten to re-queue instead of XCLAIM (signature unchanged).
- New metrics for reconciliation/re-queue/DLQ/mutex-skip/topology (#113).

Tests/docs:
- Unit tests for retry parsing, jitter, config validation.
- Integration tests for #111/#112/#114/#106 and infra-gated OSS (#115);
  existing tests updated to the new recovery semantics.
- README cluster section, CHANGELOG, migration guide, METRICS update
  (#116, #117).

BREAKING CHANGE: metrics.Recorder and types.RedisStreamClient interfaces
gained methods; recovery requires Redis 6.2+. See docs/MIGRATION.md.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@bburli-craft bburli-craft linked an issue May 29, 2026 that may be closed by this pull request
Badarinarayan Burli and others added 4 commits May 30, 2026 00:50
- handle XAck errors instead of assigning to blank (errcheck check-blank)
- rename routeToDLQ result var to avoid shadowing err (govet shadow)
- wrap readLBSEntry signature under 120 cols (lll)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The recovering consumer is subscribed to keyspace events and may emit a
StreamExpired (its EXISTS check can trigger Redis lazy expiry of a dead
consumer's lock) alongside the StreamAdded for the re-queued stream. Update
the recovery tests to consume notifications by type instead of asserting the
first notification is StreamAdded:

- TestLBSRecovery / TestLBSRecoveryOfDiscontinuousStreamMessages: act only on
  StreamAdded; drain remaining notifications until the channel closes.
- TestMainFlow: Claim now re-queues (XACK+XADD) instead of XCLAIM, so drop the
  XCLAIM-specific ActiveTime/SeenTime assertions and tolerate ErrAlreadyClaimed;
  drain the re-queued StreamAdded before checking closure.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Recovery via XACK+XADD is at-least-once: under CI timing the re-queued stream
can be redelivered more than once before the consumer's lock is observed,
inflating the lock-acquisition counter. Assert recovery/redistribution-
sensitive counters as lower bounds while keeping deterministic, map-based and
behavioral assertions strict. Also fix a US-locale misspell (signalled).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Bring all docs in line with the new recovery model and cluster support:

- README: rewrite Solution/config sections (RecoveryConfig, ClusterMode,
  deprecate WithLBSIdleTime/WithLBSRecoveryCount); fix Claim semantics
  (re-queue, process on the subsequent StreamAdded).
- ARCHITECTURE: add recovery-model and cluster-modes sections; reconciliation
  scanner goroutine (4+N); Redis keys (DLQ, _retry_count); keep both paths.
- USAGE: options table, notification table, Claim semantics, ResetTopology,
  Redis 6.2+ note.
- OPERATIONS: MinIdleTime tuning, scan load, DLQ monitoring, OSS notes.
- CODEBASE_OVERVIEW: interface methods, file map (reconcile/reQueue),
  fix notifs/broker.go path, threading 4+N.
- EXAMPLE / examples READMEs / load-balancing main.go: Claim no longer grants
  ownership directly (remove process-after-Claim and dead handleClaimedStream);
  fix broken import and Done(ctx) calls.
- CONTRIBUTING: correct the Claim godoc example signature/semantics.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@bburli-craft bburli-craft requested review from a3y3, hthuwal and subhramit and removed request for subhramit May 29, 2026 20:07
subhramit
subhramit previously approved these changes May 31, 2026

@subhramit subhramit left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my POV I couldn't notice anything off.
needs a look by @hthuwal @a3y3

Comment thread configs/consts.go
Comment thread configs/consts.go
Comment thread impl/helpers.go Outdated
Comment thread impl/helpers.go Outdated
Comment thread impl/helpers.go Outdated
Comment thread impl/init.go
Comment thread impl/opts.go Outdated
Comment thread impl/opts.go Outdated
Comment thread impl/relredis.go Outdated
Comment thread metrics/recorder.go
@subhramit subhramit dismissed their stale review May 31, 2026 19:43

open comments to be addressed

Badarinarayan Burli and others added 2 commits June 1, 2026 01:35
- configs: split DLQ field names and reconciliation defaults into
  dedicated const blocks; add DLQSuffix and DLQReasonUnparseable
- impl: extract lbsEntry struct (lbsentry.go) with retryCount/
  dataStreamName/cloneValues methods; move re-queue logic to
  requeue.go with a distinct reQueueError outcome and an extracted
  ownerAlive helper
- impl: isolate OSS keyspace logic into oss.go behind an
  ossSubscriptions struct; tolerate per-master partial failure and
  record it via the new RecordMasterKeyspaceSetup metric
- impl: add RecoveryConfigBuilder (recovery_config.go) and default the
  DLQ stream to <service>-input-dlq when unset
- impl: route unparseable poison entries to the DLQ (reason
  "unparseable") instead of silently dropping them
- metrics/test/examples: implement RecordMasterKeyspaceSetup across all
  recorders

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@bburli-craft

Copy link
Copy Markdown
Collaborator Author

@hthuwal @a3y3 Can you guys review this too? @subhramit Can you review again?

@bburli-craft bburli-craft requested a review from subhramit June 4, 2026 12:55
Comment thread impl/requeue.go
Badarinarayan Burli and others added 2 commits June 4, 2026 18:45
Nothing acks or trims the DLQ stream, so it grew without bound. Add a
RecoveryConfig.DLQMaxLen (default 10000, 0 disables) applied as an
approximate MAXLEN trim on every DLQ XADD, plus a builder setter,
validation, and updated docs.

Addresses review comment on impl/requeue.go.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread impl/relredis.go Outdated
Badarinarayan Burli and others added 2 commits June 4, 2026 18:53
Unlocking before XACK left a deterministic window where the lock was gone but
the LBS entry was still in the PEL; a concurrent reconcileLBS could read "no
lock => owner dead" and re-queue a stream that had just finished, causing
duplicate processing. Reorder so XACK (which removes the entry from the PEL)
and XDEL happen first, then release the lock. Until XACK succeeds the lock is
still held, so the scan treats the owner as alive and skips it.

Also fixes a latent bug where the XACK error path wrapped a stale nil err
instead of res.Err().

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Unlocking before XACK left a window where the lock (the liveness signal
the reconciliation scan uses) was gone while the entry was still in the
PEL, so a concurrent reconcileLBS could see "no lock => owner dead" and
re-queue a stream that just finished. Acknowledge and delete the entry
first, then release the lock; the scan treats the owner as alive until
the entry leaves the PEL.

Addresses review comment on impl/relredis.go.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@subhramit subhramit left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to go from my POV but others should take a final look

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support for redis cluster

2 participants