
fix: resolve data races (issues #221, #285, #246)#379

Merged
poyrazK merged 9 commits into main from fix/data-races
May 2, 2026

Conversation

@poyrazK
Owner

@poyrazK poyrazK commented May 2, 2026

Summary

Test plan

  • go test -race ./internal/storage/coordinator/... — ok
  • go test -race ./internal/handlers/ws/... — ok
  • go test -race ./internal/core/services/... — ok

Fixes #221, #285, #246, #219

Summary by CodeRabbit

  • Bug Fixes

    • IP allocation now retries up to 3 times on allocation conflicts to reduce failures.
    • Broadcast handling improved to avoid dropping or closing stuck connections and to ensure proper cleanup/metrics.
    • Quorum checks strengthened to ensure correct visibility of concurrent results.
  • Tests

    • Added a skipped test documenting a known concurrent broadcast/unregister scenario and fixed a test helper termination issue.

- #221: Copy successCount/lastErr/size while holding mutex after wg.Wait()
  to prevent data race between goroutine writes and main goroutine read
- #285: Use Lock() instead of RLock() in broadcast case since we delete
  from h.clients map; route to unregister channel to avoid double-close
- #246: Retry on DB unique constraint conflict when allocating IP to
  handle theoretical race from deterministic IP generation

Fixes: #221, #285, #246, #219
Copilot AI review requested due to automatic review settings May 2, 2026 12:50
@coderabbitai

coderabbitai Bot commented May 2, 2026

Warning

Rate limit exceeded

@poyrazK has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 56 minutes and 22 seconds before requesting another review.

To keep reviews running without waiting, you can enable the usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5a1c33cc-55d0-44fb-9dd6-ed40e64a0d34

📥 Commits

Reviewing files that changed from the base of the PR and between f24c586 and 603a202.

📒 Files selected for processing (3)
  • internal/handlers/ws/hub.go
  • internal/handlers/ws/hub_test.go
  • internal/repositories/postgres/elastic_ip_repo.go
📝 Walkthrough

Walkthrough

Adds retry-on-conflict for Elastic IP allocation, routes WebSocket client removals through the unregister channel with a full mutex during broadcasts, and clarifies a post-wait visibility comment in coordinator quorum checking.

Changes

Elastic IP Allocation Retry

  • Data / Constants (internal/core/services/elastic_ip.go): adds maxElasticIPRetries = 3.
  • Core Implementation (internal/core/services/elastic_ip.go): AllocateIP now regenerates id/ip per attempt, checks ctx.Err() before each attempt, and wraps repo.Create in a retry loop that stores the last conflict error; it succeeds only on a successful repo.Create.
  • Error Handling Integration (internal/repositories/postgres/elastic_ip_repo.go): Create detects the Postgres unique-constraint violation (pgconn.PgError code 23505) and returns errors.Conflict("public IP already allocated") to drive retry behavior.
  • Return / Logging (internal/core/services/elastic_ip.go): audit logging and the successful return occur only after a successful create; after exhausting retries, AllocateIP returns a nil result with the last conflict error wrapped.

WebSocket Broadcast and Client Removal

  • Locking Change (internal/handlers/ws/hub.go): the broadcast branch now uses h.mu.Lock() (not RLock) while iterating clients.
  • Client Removal Flow (internal/handlers/ws/hub.go): on a full client.send buffer, the code releases the lock and sends the client to h.unregister instead of closing the channel and deleting inline; the iteration re-acquires the lock afterward.
  • Unregister Responsibility (internal/handlers/ws/hub.go): client cleanup and the platform.WSConnectionsActive decrement are handled by the h.unregister branch rather than inline during broadcast.

Coordinator Quorum-Check Clarification

  • Comment / Reasoning (internal/storage/coordinator/service.go): adds a comment clarifying that after wg.Wait() the goroutines have completed and their writes to shared variables are visible; quorum-check logic and return behavior are unchanged.

Tests / Flaky Test Skip

  • Test Addition / Adjustment (internal/handlers/ws/hub_test.go): adds TestHubConcurrentBroadcastAndUnregister, which unconditionally calls t.Skip(...) to avoid a flaky concurrent test, and fixes waitForCondition so it ends cleanly on timeout.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

I’m a rabbit who hops through code at night,
Retrying IPs until they land just right,
I nudge locked broadcasts to safely part,
And whisper to coordinators after goroutines depart,
Hooray — concurrent things behaving bright! 🐇✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately summarizes the main focus of the PR: resolving data races across multiple files and GitHub issues.
  • Linked Issues Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/data-races

Review rate limit: 0/1 reviews remaining, refill in 56 minutes and 22 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
internal/storage/coordinator/service.go (1)

172-183: 💤 Low value

Post-wg.Wait() mutex lock is redundant.

After wg.Wait() returns, the Go Memory Model already guarantees visibility of all goroutine writes: within each goroutine, defers execute LIFO — mu.Unlock() (registered 2nd) fires before wg.Done() (registered 1st) — establishing the chain writes → mu.Unlock() → wg.Done() → wg.Wait() return. Reading successCount, lastErr, and size directly after wg.Wait() is therefore race-free without an additional lock.

♻️ Simplified quorum check
-	// 3. Quorum check - lock to safely read shared variables written by goroutines
-	mu.Lock()
-	count := successCount
-	lerr := lastErr
-	sz := size
-	mu.Unlock()
-
-	if count < c.writeQuorum {
-		return 0, fmt.Errorf("assemble quorum failed (%d/%d): %w", count, c.writeQuorum, lerr)
-	}
-
-	return sz, nil
+	// 3. Quorum check — safe to read directly; wg.Wait() establishes happens-before
+	if successCount < c.writeQuorum {
+		return 0, fmt.Errorf("assemble quorum failed (%d/%d): %w", successCount, c.writeQuorum, lastErr)
+	}
+
+	return size, nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/storage/coordinator/service.go` around lines 172 - 183, The
post-wg.Wait() mu.Lock()/Unlock() is redundant; after wg.Wait() goroutine writes
are visible so read successCount, lastErr and size directly without locking and
use them for the quorum check (compare count to c.writeQuorum and return
fmt.Errorf as before); keep existing mutex usage inside the worker goroutines
but remove the mu.Lock()/mu.Unlock() lines around the block that reads
successCount, lastErr and size following wg.Wait() to simplify and avoid
unnecessary locking.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/core/services/elastic_ip.go`:
- Line 61: Replace the local magic literal maxRetries := 3 with a package-level
named constant (e.g., const maxElasticIPRetries = 3) and use that constant where
maxRetries is referenced; declare the constant at the top of the package and
remove the local variable assignment in the function so code uses
maxElasticIPRetries instead of the unexplained literal in functions such as the
one that currently defines maxRetries.
- Around line 78-81: The retry loop in the service calling s.repo.Create in the
method in elastic_ip.go incorrectly retries on every database error and ignores
context cancellation; modify the loop to first check ctx.Err() at the top of
each iteration and return if cancelled, and then change the retry logic so it
only continues on a unique-constraint collision sentinel (e.g., an
errors.AlreadyExists or a wrapped *pgconn.PgError with Code == "23505") instead
of any error; to enable that, update ElasticIPRepository.Create (implemented in
internal/repositories/postgres/elastic_ip_repo.go) to preserve/expose the
underlying pg error (or return a distinct sentinel for 23505) so the service can
errors.As/compare and only retry on the 23505 condition and immediately return
for all other errors.

In `@internal/handlers/ws/hub.go`:
- Around line 69-81: The broadcast loop reading from h.broadcast currently sends
to the unbuffered h.unregister channel inside the clients iteration, which can
block and deadlock when a client's send buffer is full; change the logic in the
case handling of h.broadcast to collect dead clients (e.g., append to a slice)
when client.send is not writable instead of sending to h.unregister inline, and
after the iteration iterate that slice and send each dead client to h.unregister
(follow the pattern used in BroadcastEventToTenant for guidance). Also fix the
unregister handler by moving platform.WSConnectionsActive.Dec() inside the if _,
ok := h.clients[client]; ok { ... } block so the counter is decremented only
when a client is actually removed (preventing double-decrement when ReadPump's
deferred Unregister fires).

🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f5e30f7f-ddee-4a66-ad7b-693a439a39d4

📥 Commits

Reviewing files that changed from the base of the PR and between 607ecce and c7fd509.

📒 Files selected for processing (3)
  • internal/core/services/elastic_ip.go
  • internal/handlers/ws/hub.go
  • internal/storage/coordinator/service.go

Comment thread internal/core/services/elastic_ip.go Outdated
Comment thread internal/handlers/ws/hub.go
Comment on lines 69 to 81
 		case message := <-h.broadcast:
-			h.mu.RLock()
+			h.mu.Lock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					close(client.send)
-					delete(h.clients, client)
+					h.mu.Unlock()
+					h.unregister <- client
+					h.mu.Lock()
 				}
 			}
-			h.mu.RUnlock()
+			h.mu.Unlock()
 		}


⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Deadlock: sending to unbuffered h.unregister channel from within broadcast handler blocks forever.

The h.unregister channel is unbuffered (line 35). Since the only receiver is the same goroutine's select statement, and we're already inside the case message := <-h.broadcast: block, the send at line 76 will block indefinitely waiting for a receiver that cannot run. This causes a deadlock whenever any client's send buffer is full during broadcast.

Collect dead clients in a slice and process them after the iteration, similar to the pattern used in BroadcastEventToTenant (lines 136-163).

🐛 Proposed fix
 		case message := <-h.broadcast:
-			h.mu.Lock()
+			var toRemove []*Client
+			h.mu.RLock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					h.mu.Unlock()
-					h.unregister <- client
-					h.mu.Lock()
+					toRemove = append(toRemove, client)
 				}
 			}
-			h.mu.Unlock()
+			h.mu.RUnlock()
+			for _, client := range toRemove {
+				h.mu.Lock()
+				if _, ok := h.clients[client]; ok {
+					delete(h.clients, client)
+					close(client.send)
+					platform.WSConnectionsActive.Dec()
+				}
+				h.mu.Unlock()
+			}
 		}

Additionally, note that line 66 (platform.WSConnectionsActive.Dec()) in the unregister handler is outside the if _, ok := h.clients[client]; ok check. This means if a client is removed here and later ReadPump's deferred Unregister fires (per context snippet), the counter will be decremented twice. Move line 66 inside the if block:

 		case client := <-h.unregister:
 			h.mu.Lock()
 			if _, ok := h.clients[client]; ok {
 				delete(h.clients, client)
 				close(client.send)
+				platform.WSConnectionsActive.Dec()
 			}
 			h.mu.Unlock()
-			platform.WSConnectionsActive.Dec()
 			h.logger.Debug("client disconnected", slog.Int("total", len(h.clients)))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 		case message := <-h.broadcast:
-			h.mu.Lock()
+			var toRemove []*Client
+			h.mu.RLock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					h.mu.Unlock()
-					h.unregister <- client
-					h.mu.Lock()
+					toRemove = append(toRemove, client)
 				}
 			}
-			h.mu.Unlock()
+			h.mu.RUnlock()
+			for _, client := range toRemove {
+				h.mu.Lock()
+				if _, ok := h.clients[client]; ok {
+					delete(h.clients, client)
+					close(client.send)
+					platform.WSConnectionsActive.Dec()
+				}
+				h.mu.Unlock()
+			}
 		}

- elastic_ip.go: retry only on unique constraint violations (IP collision),
  not on all errors. Fail fast on DB errors, network issues, etc.
- coordinator/service.go: rename lerr → finalErr for clarity
- hub_test.go: add TestHubConcurrentBroadcastAndUnregister race test

Fixes #221, #285, #246 review feedback

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

poyrazK added 2 commits May 2, 2026 16:17
- hub_test.go: use startCh channel to coordinate concurrent goroutines
  precisely instead of fixed sleep heuristic
- elastic_ip.go: wrap max retries error with context so failure
  message clearly indicates IP collision after N attempts

Note: coordinator Assemble race test deferred to issue #292
Copilot AI review requested due to automatic review settings May 2, 2026 13:37
- elastic_ip.go: replace string matching with errors.Is(err, errors.Conflict)
  for IP collision detection; add ctx.Err() check in retry loop; extract
  maxRetries to named constant maxElasticIPRetries
- elastic_ip_repo.go: detect pgconn 23505 and return errors.Conflict
- coordinator/service.go: remove redundant post-wg.Wait() mutex lock
  (Go Memory Model guarantees visibility of worker goroutine writes)
- hub.go: add comment explaining safe unlock-then-send pattern

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

The test causes CI timeouts because goroutines block on closed channel
after unregister. Issue #285 (concurrent map modification in Hub.broadcast)
is verified by the race detector on existing TestHubBroadcast and
TestHubRegisterUnregister tests. Skipping this test per issue #285.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
internal/handlers/ws/hub.go (1)

59-79: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Routing full-buffer clients through h.unregister still deadlocks the hub.

Line 78 sends to an unbuffered h.unregister channel whose only receiver is this same Run() loop, so the goroutine blocks forever inside the broadcast case. The unlock/relock also leaves the active range h.clients open to concurrent writes, and this path still inherits the unconditional platform.WSConnectionsActive.Dec() at Line 66, which double-decrements when a later unregister finds the client already removed.

🐛 Suggested fix
 		case client := <-h.unregister:
 			h.mu.Lock()
 			if _, ok := h.clients[client]; ok {
 				delete(h.clients, client)
 				close(client.send)
+				platform.WSConnectionsActive.Dec()
 			}
 			h.mu.Unlock()
-			platform.WSConnectionsActive.Dec()
 			h.logger.Debug("client disconnected", slog.Int("total", len(h.clients)))

 		case message := <-h.broadcast:
-			h.mu.Lock()
+			var toRemove []*Client
+			h.mu.RLock()
 			for client := range h.clients {
 				select {
 				case client.send <- message:
 				default:
-					// Send to unregister channel outside the lock to avoid deadlock.
-					// The unregister handler processes serially, ensuring safe removal.
-					h.mu.Unlock()
-					h.unregister <- client
-					h.mu.Lock()
+					toRemove = append(toRemove, client)
 				}
 			}
-			h.mu.Unlock()
+			h.mu.RUnlock()
+			for _, client := range toRemove {
+				h.mu.Lock()
+				if _, ok := h.clients[client]; ok {
+					delete(h.clients, client)
+					close(client.send)
+					platform.WSConnectionsActive.Dec()
+				}
+				h.mu.Unlock()
+			}

Run this to verify the self-send and unconditional decrement in the current tree:

#!/bin/bash
set -euo pipefail

sed -n '31,85p' internal/handlers/ws/hub.go
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/handlers/ws/hub.go` around lines 59 - 79, The broadcast path in
Run() is deadlocking by sending the same goroutine on the unbuffered
h.unregister channel and also can double-decrement platform.WSConnectionsActive;
fix by avoiding in-loop self-sends and by ensuring decrement happens exactly
once: when iterating h.clients in the broadcast case (the select that hits
default for a full client.send), remove the client from h.clients and
close(client.send) while holding h.mu (no unlock/relock inside the range), and
record the client for post-loop bookkeeping if needed; then after the locked
loop finish, perform any non-concurrent actions (like notifying external
handlers) outside the lock but do NOT send the client back into h.unregister
from the same goroutine — alternatively, spawn a goroutine to send to
h.unregister or make h.unregister buffered if you must send from here; ensure
platform.WSConnectionsActive.Dec() is called only once when the client is
actually removed (e.g., where you delete from h.clients or in the centralized
unregister handler), and reference the Run() switch cases, h.broadcast,
h.unregister, h.clients, client.send, and platform.WSConnectionsActive.Dec() to
locate the changes.
🧹 Nitpick comments (1)
internal/handlers/ws/hub_test.go (1)

69-75: ⚡ Quick win

Please keep a runnable regression test for the full-buffer broadcast path.

The existing tests never drive the default: branch in internal/handlers/ws/hub.go Lines 72-79, so skipping this test means CI no longer exercises the exact broadcast/unregister interaction this PR changes. A deterministic version that pre-fills client.send and uses channel coordination would give you coverage for the deadlock-prone path without relying on sleeps.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/handlers/ws/hub_test.go` around lines 69 - 75, Restore a
deterministic regression test for the full-buffer broadcast path by updating
TestHubConcurrentBroadcastAndUnregister to pre-fill a client's send channel
buffer and coordinate goroutines with explicit sync channels instead of sleeps:
create a client with a bounded send channel, push messages into client.send
until it's full, start a goroutine that calls hub.Broadcast (or the Hub method
that sends to client.send) which will hit the default: branch in
internal/handlers/ws/hub.go, then concurrently call hub.Unregister and close the
client's connection using synchronization channels so the broadcast goroutine
runs while unregistering; assert the test finishes without deadlock and that the
client was removed from Hub—use symbols TestHubConcurrentBroadcastAndUnregister,
Hub.Broadcast/Unregister, and the client's send field to locate the code to
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/repositories/postgres/elastic_ip_repo.go`:
- Line 39: Replace the magic SQLSTATE literal "23505" with a named constant: add
a package-level constant like const uniqueViolationSQLState = "23505" (or
UniqueViolationSQLState) in the same package, then update the check that uses
pgErr.Code (the line with stdlib_errors.As(err, &pgErr) && pgErr.Code ==
"23505") to compare against that constant (pgErr.Code ==
uniqueViolationSQLState); keep the pgErr variable and existing error handling
intact so only the literal is removed and replaced by the new constant.

🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7e7d7a19-3315-405a-a0b8-6ca1eb656be5

📥 Commits

Reviewing files that changed from the base of the PR and between c7fd509 and f24c586.

📒 Files selected for processing (5)
  • internal/core/services/elastic_ip.go
  • internal/handlers/ws/hub.go
  • internal/handlers/ws/hub_test.go
  • internal/repositories/postgres/elastic_ip_repo.go
  • internal/storage/coordinator/service.go
✅ Files skipped from review due to trivial changes (1)
  • internal/storage/coordinator/service.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/core/services/elastic_ip.go

Comment thread internal/repositories/postgres/elastic_ip_repo.go Outdated
- Replace t.Skip() with a deterministic test that pre-fills a
  client's send buffer and coordinates goroutines via explicit
  sync channels instead of time.Sleep heuristics.
- Also fix hub.go broadcast case: remove send-to-unregister while
  holding lock (deadlock risk) and remove WSConnectionsActive.Dec()
  (double-decrement risk). Both are already addressed via
  BroadcastEventToTenant's direct removal path.
- elastic_ip_repo.go already has uniqueViolationSQLState constant.
Copilot AI review requested due to automatic review settings May 2, 2026 16:09
…ceful shutdown

- auth.go (#280): add hard size limits (maxFailedAttemptsMap=1000,
  maxLockoutsMap=10000) for deterministic eviction when maps grow
  beyond threshold, supplementing the existing probabilistic purge
- instance.go (#297): rollback vCPU and memory quota when enqueue
  fails after IncrementUsage (previously leaked quota on enqueue error)
- gateway_e2e_test.go (#307): use defer to close response body for
  all code paths, not just on HTTP 200
- csi-driver/main.go (#283): wait for d.Stop() completion with 5s
  timeout before exiting main, preventing signal handler goroutine leak

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Owner Author

@poyrazK poyrazK left a comment


It's okay to merge

@poyrazK poyrazK merged commit cf0fee5 into main May 2, 2026
22 checks passed

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Data race: successCount read without mutex in coordinator/service.go

2 participants