Conversation
Signed-off-by: Yakir Oren <yakiroren@gmail.com>
📝 Walkthrough
This pull request refactors the MapMutex synchronization mechanism, replacing backoff-based timeout handling with condition variable–based synchronization. The internal lock bookkeeping changes from ref-counted locks to per-key state tracking.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 3 passed
Force-pushed from a131958 to 47bce0f
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/utils/mutex.go`:
- Around line 11-15: The current keyState allows new readers to starve queued
writers because RLock only checks s.writer; modify the locking logic so writers
signal their intent and readers defer if writers are waiting: in Lock (and any
write-acquiring code path) increment s.waiters before blocking and decrement it
after acquiring (and before setting s.writer=true), and change RLock to block
when either s.writer is true OR s.waiters>0 (i.e., treat queued writers as
active), ensuring waiters is properly updated in Unlock/try paths and that all
Cond broadcasts/signals remain correct; update references in the keyState
struct, Lock, Unlock, RLock, and RUnlock methods to use waiters to prevent
reader admission while a writer is queued.
- Around line 71-85: The waiter can acquire the lock after its context was
canceled because we only check ctx.Err() while waiting; fix by re-checking the
context immediately after canProceed() becomes true and before decrementing
s.waiters or calling onAcquire: if ctx.Err() != nil then decrement s.waiters,
call m.maybeEvict(key, s), unlock m.mu, close(stop) and return ctx.Err(); apply
the same additional pre-acquisition ctx.Err() guard to the other fast-paths in
the same file that call onAcquire (the blocks around where onAcquire() is
invoked) to ensure canceled contexts never proceed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 991c5165-87d9-4e77-a1e7-45ca49ef7139
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
go.mod, pkg/utils/mutex.go, pkg/utils/mutex_test.go
Pull request overview
Refactors pkg/utils.MapMutex to use sync.Cond-based waiting instead of TryLock + backoff polling, aiming to improve cancellation responsiveness and contention behavior, and updates/extends related tests and module metadata.
Changes:
- Replaced per-key sync.RWMutex + backoff retry with a sync.Cond-driven wait/notify implementation and per-key state tracking.
- Updated/expanded mutex tests to cover eviction behavior, cancellation/timeout scenarios, and high-contention stress cases.
- Ran dependency tidy-related updates in go.mod/go.sum (e.g., backoff/v5 is no longer a direct dependency).
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pkg/utils/mutex.go | Reimplements keyed mutex using sync.Cond and state counters; removes backoff polling. |
| pkg/utils/mutex_test.go | Updates tests to align with new internals and adds high-contention/cancellation coverage. |
| go.mod | Adjusts direct vs indirect requirements after refactor/tidy. |
| go.sum | Updates sums to match module graph after tidy. |
```diff
+	// Spawn a goroutine to broadcast on context cancellation so that
+	// Cond.Wait wakes up and can observe ctx.Err(). The goroutine exits
+	// immediately in all cases (no leak).
+	stop := make(chan struct{})
+	go func() {
		select {
-		case <-done:
-			m.release(key)
-			return ctx.Err()
-		default:
+		case <-ctx.Done():
+			m.mu.Lock()
+			s.cond.Broadcast()
+			m.mu.Unlock()
```
lockSlow spawns a new goroutine per contended Lock/RLock to broadcast on ctx.Done(). Under high contention this can create significant goroutine churn. Since this repo targets Go 1.25, consider using context.AfterFunc(ctx, ...) (and stopping it on exit) to avoid per-wait goroutine creation while still waking Cond.Wait() on cancellation.
```diff
	assert.Error(t, err, "should fail due to context timeout")

-	// Release the original lock
+	// Release the original lock — the cleanup goroutine will acquire then release.
```
The comment mentions a “cleanup goroutine” acquiring then releasing after Unlock, but the current MapMutex implementation performs eviction synchronously in Unlock/RUnlock (no cleanup goroutine). Please update/remove this comment to match the implementation so future readers aren’t misled.
```diff
-	// Release the original lock — the cleanup goroutine will acquire then release.
+	// Release the original lock so any state associated with the timed-out waiter can be cleaned up.
```
Signed-off-by: Yakir Oren <yakiroren@gmail.com>
Pull request overview
Copilot reviewed 3 out of 4 changed files in this pull request and generated 6 comments.
```go
// Give the writer time to enter the wait loop.
time.Sleep(10 * time.Millisecond)

// A new reader must NOT slip past the pending writer.
shortCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
err = m.RLock(shortCtx, "key")
assert.Error(t, err, "new reader should block while a writer is pending")
```
The writer-priority test relies on fixed sleeps (10ms/50ms) for ordering, which can be flaky on slow/loaded CI runners. Consider synchronizing deterministically (e.g., wait until pendingWriters/waiters is observed >0 under m.mu, or use an explicit channel/barrier) before asserting the new reader blocks.
```diff
+	stop := make(chan struct{})
+	go func() {
		select {
-		case <-done:
-			m.release(key)
-			return ctx.Err()
-		default:
+		case <-ctx.Done():
+			m.mu.Lock()
+			s.cond.Broadcast()
+			m.mu.Unlock()
+		case <-stop:
		}
-		if l.mu.TryLock() {
-			return nil
-		}
+	}()

	for !canProceed() {
		if ctx.Err() != nil {
			s.waiters--
			if onCancel != nil {
				onCancel()
			}
			m.maybeEvict(key, s)
			m.mu.Unlock()
			close(stop)
			return ctx.Err()
```
In lockSlow, the cancellation wake-up relies on a helper goroutine doing select { case <-ctx.Done(): Broadcast; case <-stop: }. When lockSlow exits via the ctx.Err() path it closes stop after unlocking, and if the helper goroutine hasn't started yet it may observe both ctx.Done() and stop as ready and randomly choose the stop branch, skipping the Broadcast. That can leave other waiters asleep even though state changed (e.g., pendingWriters decremented), causing a potential indefinite block. Ensure cancellation always triggers a Broadcast (e.g., Broadcast explicitly in the ctx.Err() branch after updating state, or restructure using context.AfterFunc so the callback is guaranteed to run on cancellation).
```diff
+	// Spawn a goroutine to broadcast on context cancellation so that
+	// Cond.Wait wakes up and can observe ctx.Err(). The goroutine exits
+	// immediately in all cases (no leak).
+	stop := make(chan struct{})
+	go func() {
		select {
-		case <-done:
-			m.release(key)
-			return ctx.Err()
-		default:
+		case <-ctx.Done():
+			m.mu.Lock()
+			s.cond.Broadcast()
+			m.mu.Unlock()
+		case <-stop:
		}
-		if l.mu.TryLock() {
-			return nil
-		}
+	}()
```
lockSlow spawns a dedicated goroutine for every contended Lock/RLock attempt to watch ctx.Done and broadcast. Under high contention with long timeouts, this can create large numbers of parked goroutines and increase scheduling/memory overhead. Consider replacing this with context.AfterFunc (stopped on successful acquire) or another mechanism that avoids one goroutine per waiter.
```go
m.mu.Lock()
s := m.keys[key]
s.writer = false
s.cond.Broadcast()
m.maybeEvict(key, s)
```
Unlock assumes m.keys[key] is present and will panic with a nil-pointer dereference if Unlock is called for an unknown/evicted key. Even if you want to panic on misuse (like sync.Mutex), it would be better to check for missing state and panic with a clear message rather than a nil deref.
```go
m.mu.Lock()
s := m.keys[key]
s.readers--
if s.readers == 0 {
```
RUnlock assumes m.keys[key] is present and will panic with a nil-pointer dereference if RUnlock is called for an unknown/evicted key. Consider an explicit existence check and a clear panic/error to make misuse easier to diagnose.
```go
err := m.Lock(ctx, "key")
require.NoError(t, err)
close(writerDone)
```
Using require.NoError inside this goroutine is risky: if it fails, the goroutine will exit before close(writerDone), and the main test goroutine will block forever on <-writerDone. Prefer sending the error back to the main goroutine (or use t.Error/assert and ensure writerDone is always closed via defer) so the test can fail without hanging.
```diff
-	err := m.Lock(ctx, "key")
-	require.NoError(t, err)
-	close(writerDone)
+	defer close(writerDone)
+	err := m.Lock(ctx, "key")
+	assert.NoError(t, err)
+	if err != nil {
+		return
+	}
```