fix: resolve data races (issues #221, #285, #246) #379
Conversation
- #221: Copy successCount/lastErr/size while holding the mutex after wg.Wait() to prevent a data race between goroutine writes and the main goroutine's read
- #285: Use Lock() instead of RLock() in the broadcast case since we delete from the h.clients map; route removals to the unregister channel to avoid a double-close
- #246: Retry on DB unique-constraint conflict when allocating an IP to handle the theoretical race from deterministic IP generation

Fixes: #221, #285, #246, #219
📝 Walkthrough

Adds retry-on-conflict for Elastic IP allocation, routes WebSocket client removals through the unregister channel with a full mutex during broadcasts, and clarifies a post-wait visibility comment in coordinator quorum checking.

Changes
- Elastic IP Allocation Retry
- WebSocket Broadcast and Client Removal
- Coordinator Quorum-Check Clarification
- Tests / Flaky Test Skip

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
🧹 Nitpick comments (1)
internal/storage/coordinator/service.go (1)
172-183: 💤 Low value

Post-`wg.Wait()` mutex lock is redundant.

After `wg.Wait()` returns, the Go Memory Model already guarantees visibility of all goroutine writes: within each goroutine, defers execute LIFO — `mu.Unlock()` (registered 2nd) fires before `wg.Done()` (registered 1st) — establishing the chain writes → `mu.Unlock()` → `wg.Done()` → `wg.Wait()` return. Reading `successCount`, `lastErr`, and `size` directly after `wg.Wait()` is therefore race-free without an additional lock.

♻️ Simplified quorum check
```diff
-	// 3. Quorum check - lock to safely read shared variables written by goroutines
-	mu.Lock()
-	count := successCount
-	lerr := lastErr
-	sz := size
-	mu.Unlock()
-
-	if count < c.writeQuorum {
-		return 0, fmt.Errorf("assemble quorum failed (%d/%d): %w", count, c.writeQuorum, lerr)
-	}
-
-	return sz, nil
+	// 3. Quorum check — safe to read directly; wg.Wait() establishes happens-before
+	if successCount < c.writeQuorum {
+		return 0, fmt.Errorf("assemble quorum failed (%d/%d): %w", successCount, c.writeQuorum, lastErr)
+	}
+
+	return size, nil
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/storage/coordinator/service.go` around lines 172 - 183, The post-wg.Wait() mu.Lock()/Unlock() is redundant; after wg.Wait() goroutine writes are visible so read successCount, lastErr and size directly without locking and use them for the quorum check (compare count to c.writeQuorum and return fmt.Errorf as before); keep existing mutex usage inside the worker goroutines but remove the mu.Lock()/mu.Unlock() lines around the block that reads successCount, lastErr and size following wg.Wait() to simplify and avoid unnecessary locking.
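As a standalone illustration of the happens-before chain described in this comment — not the project's code, the variable names merely echo the snippet — a minimal program that stays clean under `go run -race` might look like this:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

func main() {
	var (
		wg           sync.WaitGroup
		mu           sync.Mutex
		successCount int
		lastErr      error
	)

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()   // registered first, so it runs last (LIFO)
			mu.Lock()
			defer mu.Unlock() // registered second, so it runs before wg.Done()
			if i == 1 {
				lastErr = errors.New("replica failed")
				return
			}
			successCount++
		}(i)
	}

	wg.Wait()
	// Writes → mu.Unlock() → wg.Done() → wg.Wait() return: reading the shared
	// variables here without re-taking the mutex is race-free.
	fmt.Println(successCount, lastErr)
}
```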
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/core/services/elastic_ip.go`:
- Line 61: Replace the local magic literal maxRetries := 3 with a package-level
named constant (e.g., const maxElasticIPRetries = 3) and use that constant where
maxRetries is referenced; declare the constant at the top of the package and
remove the local variable assignment in the function so code uses
maxElasticIPRetries instead of the unexplained literal in functions such as the
one that currently defines maxRetries.
- Around line 78-81: The retry loop in the service calling s.repo.Create in the
method in elastic_ip.go incorrectly retries on every database error and ignores
context cancellation; modify the loop to first check ctx.Err() at the top of
each iteration and return if cancelled, and then change the retry logic so it
only continues on a unique-constraint collision sentinel (e.g., an
errors.AlreadyExists or a wrapped *pgconn.PgError with Code == "23505") instead
of any error; to enable that, update ElasticIPRepository.Create (implemented in
internal/repositories/postgres/elastic_ip_repo.go) to preserve/expose the
underlying pg error (or return a distinct sentinel for 23505) so the service can
errors.As/compare and only retry on the 23505 condition and immediately return
for all other errors.
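A hedged sketch of the loop shape those two comments describe: retry only on a unique-constraint collision, check for cancellation each iteration, and use a named retry constant. The repository interface, the `ErrConflict` sentinel, and the `generateIP` helper below are placeholders standing in for the project's own symbols, not its actual API.

```go
package elasticip

import (
	"context"
	"errors"
	"fmt"
)

// ErrConflict is a placeholder sentinel; the project's own conflict error
// (e.g. a wrapped pgconn 23505) would be used instead.
var ErrConflict = errors.New("unique constraint conflict")

const maxElasticIPRetries = 3

// Repository is a stand-in for the project's ElasticIPRepository interface.
type Repository interface {
	Create(ctx context.Context, ip string) error
}

// generateIP is a hypothetical deterministic candidate generator.
func generateIP(attempt int) string {
	return fmt.Sprintf("203.0.113.%d", attempt+1)
}

// Allocate retries only on unique-constraint collisions and respects ctx cancellation.
func Allocate(ctx context.Context, repo Repository) (string, error) {
	for attempt := 0; attempt < maxElasticIPRetries; attempt++ {
		if err := ctx.Err(); err != nil {
			return "", err // caller cancelled or timed out; stop retrying
		}
		ip := generateIP(attempt)
		err := repo.Create(ctx, ip)
		if err == nil {
			return ip, nil
		}
		if errors.Is(err, ErrConflict) {
			continue // another request won the race for this IP; try the next candidate
		}
		return "", err // any other DB/network error fails fast
	}
	return "", fmt.Errorf("allocate elastic IP: collision persisted after %d attempts", maxElasticIPRetries)
}
```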
In `@internal/handlers/ws/hub.go`:
- Around line 69-81: The broadcast loop reading from h.broadcast currently sends
to the unbuffered h.unregister channel inside the clients iteration, which can
block and deadlock when a client's send buffer is full; change the logic in the
case handling of h.broadcast to collect dead clients (e.g., append to a slice)
when client.send is not writable instead of sending to h.unregister inline, and
after the iteration iterate that slice and send each dead client to h.unregister
(follow the pattern used in BroadcastEventToTenant for guidance). Also fix the
unregister handler by moving platform.WSConnectionsActive.Dec() inside the if _,
ok := h.clients[client]; ok { ... } block so the counter is decremented only
when a client is actually removed (preventing double-decrement when ReadPump's
deferred Unregister fires).
---
Nitpick comments:
In `@internal/storage/coordinator/service.go`:
- Around line 172-183: The post-wg.Wait() mu.Lock()/Unlock() is redundant; after
wg.Wait() goroutine writes are visible so read successCount, lastErr and size
directly without locking and use them for the quorum check (compare count to
c.writeQuorum and return fmt.Errorf as before); keep existing mutex usage inside
the worker goroutines but remove the mu.Lock()/mu.Unlock() lines around the
block that reads successCount, lastErr and size following wg.Wait() to simplify
and avoid unnecessary locking.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f5e30f7f-ddee-4a66-ad7b-693a439a39d4
📒 Files selected for processing (3)
- internal/core/services/elastic_ip.go
- internal/handlers/ws/hub.go
- internal/storage/coordinator/service.go
```diff
 		case message := <-h.broadcast:
-			h.mu.RLock()
+			h.mu.Lock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					close(client.send)
-					delete(h.clients, client)
+					h.mu.Unlock()
+					h.unregister <- client
+					h.mu.Lock()
 				}
 			}
-			h.mu.RUnlock()
+			h.mu.Unlock()
 		}
```
Deadlock: sending to unbuffered h.unregister channel from within broadcast handler blocks forever.
The h.unregister channel is unbuffered (line 35). Since the only receiver is the same goroutine's select statement, and we're already inside the case message := <-h.broadcast: block, the send at line 76 will block indefinitely waiting for a receiver that cannot run. This causes a deadlock whenever any client's send buffer is full during broadcast.
Collect dead clients in a slice and process them after the iteration, similar to the pattern used in BroadcastEventToTenant (lines 136-163).
🐛 Proposed fix
```diff
 		case message := <-h.broadcast:
-			h.mu.Lock()
+			var toRemove []*Client
+			h.mu.RLock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					h.mu.Unlock()
-					h.unregister <- client
-					h.mu.Lock()
+					toRemove = append(toRemove, client)
 				}
 			}
-			h.mu.Unlock()
+			h.mu.RUnlock()
+			for _, client := range toRemove {
+				h.mu.Lock()
+				if _, ok := h.clients[client]; ok {
+					delete(h.clients, client)
+					close(client.send)
+					platform.WSConnectionsActive.Dec()
+				}
+				h.mu.Unlock()
+			}
 		}
```

Additionally, note that line 66 (`platform.WSConnectionsActive.Dec()`) in the unregister handler is outside the `if _, ok := h.clients[client]; ok` check. This means that if a client is removed here and later ReadPump's deferred Unregister fires (per context snippet), the counter will be decremented twice. Move line 66 inside the `if` block:

```diff
 		case client := <-h.unregister:
 			h.mu.Lock()
 			if _, ok := h.clients[client]; ok {
 				delete(h.clients, client)
 				close(client.send)
+				platform.WSConnectionsActive.Dec()
 			}
 			h.mu.Unlock()
-			platform.WSConnectionsActive.Dec()
 			h.logger.Debug("client disconnected", slog.Int("total", len(h.clients)))
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
		case message := <-h.broadcast:
			var toRemove []*Client
			h.mu.RLock()
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					toRemove = append(toRemove, client)
				}
			}
			h.mu.RUnlock()
			for _, client := range toRemove {
				h.mu.Lock()
				if _, ok := h.clients[client]; ok {
					delete(h.clients, client)
					close(client.send)
					platform.WSConnectionsActive.Dec()
				}
				h.mu.Unlock()
			}
		}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/handlers/ws/hub.go` around lines 69 - 81, The broadcast loop reading
from h.broadcast currently sends to the unbuffered h.unregister channel inside
the clients iteration, which can block and deadlock when a client's send buffer
is full; change the logic in the case handling of h.broadcast to collect dead
clients (e.g., append to a slice) when client.send is not writable instead of
sending to h.unregister inline, and after the iteration iterate that slice and
send each dead client to h.unregister (follow the pattern used in
BroadcastEventToTenant for guidance). Also fix the unregister handler by moving
platform.WSConnectionsActive.Dec() inside the if _, ok := h.clients[client]; ok
{ ... } block so the counter is decremented only when a client is actually
removed (preventing double-decrement when ReadPump's deferred Unregister fires).
- elastic_ip.go: retry only on unique constraint violations (IP collision), not on all errors; fail fast on DB errors, network issues, etc.
- coordinator/service.go: rename lerr → finalErr for clarity
- hub_test.go: add TestHubConcurrentBroadcastAndUnregister race test

Fixes #221, #285, #246 review feedback
- hub_test.go: use startCh channel to coordinate concurrent goroutines precisely instead of fixed sleep heuristic
- elastic_ip.go: wrap max retries error with context so failure message clearly indicates IP collision after N attempts

Note: coordinator Assemble race test deferred to issue #292
- elastic_ip.go: replace string matching with errors.Is(err, errors.Conflict) for IP collision detection; add ctx.Err() check in retry loop; extract maxRetries to named constant maxElasticIPRetries
- elastic_ip_repo.go: detect pgconn 23505 and return errors.Conflict
- coordinator/service.go: remove redundant post-wg.Wait() mutex lock (Go Memory Model guarantees visibility of worker goroutine writes)
- hub.go: add comment explaining safe unlock-then-send pattern
Actionable comments posted: 1
♻️ Duplicate comments (1)
internal/handlers/ws/hub.go (1)
59-79: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Routing full-buffer clients through `h.unregister` still deadlocks the hub.

Line 78 sends to an unbuffered `h.unregister` channel whose only receiver is this same `Run()` loop, so the goroutine blocks forever inside the `broadcast` case. The unlock/relock also leaves the active `range h.clients` open to concurrent writes, and this path still inherits the unconditional `platform.WSConnectionsActive.Dec()` at Line 66, which double-decrements when a later unregister finds the client already removed.

🐛 Suggested fix
```diff
 		case client := <-h.unregister:
 			h.mu.Lock()
 			if _, ok := h.clients[client]; ok {
 				delete(h.clients, client)
 				close(client.send)
+				platform.WSConnectionsActive.Dec()
 			}
 			h.mu.Unlock()
-			platform.WSConnectionsActive.Dec()
 			h.logger.Debug("client disconnected", slog.Int("total", len(h.clients)))

 		case message := <-h.broadcast:
-			h.mu.Lock()
+			var toRemove []*Client
+			h.mu.RLock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					// Send to unregister channel outside the lock to avoid deadlock.
-					// The unregister handler processes serially, ensuring safe removal.
-					h.mu.Unlock()
-					h.unregister <- client
-					h.mu.Lock()
+					toRemove = append(toRemove, client)
 				}
 			}
-			h.mu.Unlock()
+			h.mu.RUnlock()
+			for _, client := range toRemove {
+				h.mu.Lock()
+				if _, ok := h.clients[client]; ok {
+					delete(h.clients, client)
+					close(client.send)
+					platform.WSConnectionsActive.Dec()
+				}
+				h.mu.Unlock()
+			}
```

Run this to verify the self-send and unconditional decrement in the current tree:
```bash
#!/bin/bash
set -euo pipefail
sed -n '31,85p' internal/handlers/ws/hub.go
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/handlers/ws/hub.go` around lines 59 - 79, The broadcast path in Run() is deadlocking by sending the same goroutine on the unbuffered h.unregister channel and also can double-decrement platform.WSConnectionsActive; fix by avoiding in-loop self-sends and by ensuring decrement happens exactly once: when iterating h.clients in the broadcast case (the select that hits default for a full client.send), remove the client from h.clients and close(client.send) while holding h.mu (no unlock/relock inside the range), and record the client for post-loop bookkeeping if needed; then after the locked loop finish, perform any non-concurrent actions (like notifying external handlers) outside the lock but do NOT send the client back into h.unregister from the same goroutine — alternatively, spawn a goroutine to send to h.unregister or make h.unregister buffered if you must send from here; ensure platform.WSConnectionsActive.Dec() is called only once when the client is actually removed (e.g., where you delete from h.clients or in the centralized unregister handler), and reference the Run() switch cases, h.broadcast, h.unregister, h.clients, client.send, and platform.WSConnectionsActive.Dec() to locate the changes.
🧹 Nitpick comments (1)
internal/handlers/ws/hub_test.go (1)
69-75: ⚡ Quick win

Please keep a runnable regression test for the full-buffer broadcast path.

The existing tests never drive the `default:` branch in internal/handlers/ws/hub.go Lines 72-79, so skipping this test means CI no longer exercises the exact broadcast/unregister interaction this PR changes. A deterministic version that pre-fills `client.send` and uses channel coordination would give you coverage for the deadlock-prone path without relying on sleeps.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/handlers/ws/hub_test.go` around lines 69 - 75, Restore a deterministic regression test for the full-buffer broadcast path by updating TestHubConcurrentBroadcastAndUnregister to pre-fill a client's send channel buffer and coordinate goroutines with explicit sync channels instead of sleeps: create a client with a bounded send channel, push messages into client.send until it's full, start a goroutine that calls hub.Broadcast (or the Hub method that sends to client.send) which will hit the default: branch in internal/handlers/ws/hub.go, then concurrently call hub.Unregister and close the client's connection using synchronization channels so the broadcast goroutine runs while unregistering; assert the test finishes without deadlock and that the client was removed from Hub—use symbols TestHubConcurrentBroadcastAndUnregister, Hub.Broadcast/Unregister, and the client's send field to locate the code to change.
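One possible shape for that deterministic test is sketched below. The `broadcast`, `register`, and `send` identifiers mirror the snippets quoted in this review; the `NewHub` constructor signature, the `register` channel, and the buffer sizes are assumptions about the real package, so the sketch would need adjusting to the actual hub API.

```go
package ws

import (
	"log/slog"
	"testing"
	"time"
)

func TestHubBroadcastWithFullSendBuffer(t *testing.T) {
	hub := NewHub(slog.Default()) // assumed constructor
	go hub.Run()

	client := &Client{send: make(chan []byte, 1)}
	hub.register <- client
	client.send <- []byte("fill") // buffer now full: the next broadcast must hit the default: branch

	done := make(chan struct{})
	go func() {
		hub.broadcast <- []byte("hello") // drives the full-buffer path
		hub.broadcast <- []byte("again") // only succeeds if Run returned to its select
		close(done)
	}()

	select {
	case <-done:
		// Run kept draining broadcasts, so the full-buffer client did not wedge the loop.
	case <-time.After(2 * time.Second):
		t.Fatal("broadcast blocked: hub deadlocked on a client with a full send buffer")
	}
}
```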
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/repositories/postgres/elastic_ip_repo.go`:
- Line 39: Replace the magic SQLSTATE literal "23505" with a named constant: add
a package-level constant like const uniqueViolationSQLState = "23505" (or
UniqueViolationSQLState) in the same package, then update the check that uses
pgErr.Code (the line with stdlib_errors.As(err, &pgErr) && pgErr.Code ==
"23505") to compare against that constant (pgErr.Code ==
uniqueViolationSQLState); keep the pgErr variable and existing error handling
intact so only the literal is removed and replaced by the new constant.
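A sketch of the requested constant plus the surrounding error mapping. `pgconn.PgError` and its `Code` field come from github.com/jackc/pgx/v5/pgconn; the `apperrors.Conflict` sentinel and its import path are placeholders for the project's own errors package (called `errors.Conflict` in this review), not confirmed names.

```go
package postgres

import (
	stdlib_errors "errors"

	"github.com/jackc/pgx/v5/pgconn"

	apperrors "example.com/project/internal/core/errors" // hypothetical import path
)

const uniqueViolationSQLState = "23505"

// mapCreateError converts a Postgres unique violation into the domain conflict
// sentinel so the service layer can retry only on that condition.
func mapCreateError(err error) error {
	var pgErr *pgconn.PgError
	if stdlib_errors.As(err, &pgErr) && pgErr.Code == uniqueViolationSQLState {
		return apperrors.Conflict
	}
	return err
}
```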
---
Duplicate comments:
In `@internal/handlers/ws/hub.go`:
- Around line 59-79: The broadcast path in Run() is deadlocking by sending the
same goroutine on the unbuffered h.unregister channel and also can
double-decrement platform.WSConnectionsActive; fix by avoiding in-loop
self-sends and by ensuring decrement happens exactly once: when iterating
h.clients in the broadcast case (the select that hits default for a full
client.send), remove the client from h.clients and close(client.send) while
holding h.mu (no unlock/relock inside the range), and record the client for
post-loop bookkeeping if needed; then after the locked loop finish, perform any
non-concurrent actions (like notifying external handlers) outside the lock but
do NOT send the client back into h.unregister from the same goroutine —
alternatively, spawn a goroutine to send to h.unregister or make h.unregister
buffered if you must send from here; ensure platform.WSConnectionsActive.Dec()
is called only once when the client is actually removed (e.g., where you delete
from h.clients or in the centralized unregister handler), and reference the
Run() switch cases, h.broadcast, h.unregister, h.clients, client.send, and
platform.WSConnectionsActive.Dec() to locate the changes.
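For completeness, the "spawn a goroutine to send to h.unregister" alternative named in that prompt could look roughly like the toy below. This is only a sketch of that alternative, not the fix the PR ultimately adopted (direct removal under the lock), and it drops the mutex and metrics for brevity; the ok-check in the unregister case is what guards against a double close.

```go
package main

import "fmt"

type Client struct{ send chan []byte }

type Hub struct {
	clients    map[*Client]bool
	broadcast  chan []byte
	unregister chan *Client
}

func (h *Hub) Run() {
	for {
		select {
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
		case message := <-h.broadcast:
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					// Hand off from a new goroutine so Run() is never its own
					// blocked sender on the unbuffered unregister channel.
					c := client
					go func() { h.unregister <- c }()
				}
			}
		}
	}
}

func main() {
	h := &Hub{clients: map[*Client]bool{}, broadcast: make(chan []byte), unregister: make(chan *Client)}
	c := &Client{send: make(chan []byte, 1)}
	h.clients[c] = true
	c.send <- []byte("fill") // buffer full: the next broadcast takes the default branch
	go h.Run()
	h.broadcast <- []byte("hello") // does not deadlock: removal is handed off asynchronously
	fmt.Println("broadcast returned")
}
```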
---
Nitpick comments:
In `@internal/handlers/ws/hub_test.go`:
- Around line 69-75: Restore a deterministic regression test for the full-buffer
broadcast path by updating TestHubConcurrentBroadcastAndUnregister to pre-fill a
client's send channel buffer and coordinate goroutines with explicit sync
channels instead of sleeps: create a client with a bounded send channel, push
messages into client.send until it's full, start a goroutine that calls
hub.Broadcast (or the Hub method that sends to client.send) which will hit the
default: branch in internal/handlers/ws/hub.go, then concurrently call
hub.Unregister and close the client's connection using synchronization channels
so the broadcast goroutine runs while unregistering; assert the test finishes
without deadlock and that the client was removed from Hub—use symbols
TestHubConcurrentBroadcastAndUnregister, Hub.Broadcast/Unregister, and the
client's send field to locate the code to change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7e7d7a19-3315-405a-a0b8-6ca1eb656be5
📒 Files selected for processing (5)
- internal/core/services/elastic_ip.go
- internal/handlers/ws/hub.go
- internal/handlers/ws/hub_test.go
- internal/repositories/postgres/elastic_ip_repo.go
- internal/storage/coordinator/service.go
✅ Files skipped from review due to trivial changes (1)
- internal/storage/coordinator/service.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/core/services/elastic_ip.go
- Replace t.Skip() with a deterministic test that pre-fills a client's send buffer and coordinates goroutines via explicit sync channels instead of time.Sleep heuristics.
- Also fix hub.go broadcast case: remove send-to-unregister while holding lock (deadlock risk) and remove WSConnectionsActive.Dec() (double-decrement risk). Both are already addressed via BroadcastEventToTenant's direct removal path.
- elastic_ip_repo.go already has uniqueViolationSQLState constant.
…ceful shutdown

- auth.go (#280): add hard size limits (maxFailedAttemptsMap=1000, maxLockoutsMap=10000) for deterministic eviction when maps grow beyond threshold, supplementing the existing probabilistic purge
- instance.go (#297): rollback vCPU and memory quota when enqueue fails after IncrementUsage (previously leaked quota on enqueue error)
- gateway_e2e_test.go (#307): use defer to close response body for all code paths, not just on HTTP 200
- csi-driver/main.go (#283): wait for d.Stop() completion with 5s timeout before exiting main, preventing signal handler goroutine leak
… CSI graceful shutdown" This reverts commit d11bfbc.
Summary
- successCount read without mutex in coordinator/service.go — fixed by copying shared variables while holding the mutex after wg.Wait()
- Hub.broadcast used RLock() but called delete() on the clients map — fixed by using Lock() and routing dead clients through the unregister channel to prevent double-close
- rotationCache read was already correctly protected; no code change needed

Test plan
- go test -race ./internal/storage/coordinator/... — ok
- go test -race ./internal/handlers/ws/... — ok
- go test -race ./internal/core/services/... — ok

Fixes #221, #285, #246, #219
Summary by CodeRabbit
Bug Fixes
Tests