fix: serialize Dial/Listen queue handoff to prevent router token loss #573
raballew wants to merge 24 commits into jumpstarter-dev:main from
Conversation
📝 Walkthrough

The changes fix a race condition in the Dial/Listen handoff by introducing per-lease coordination.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client (Dial)
    participant Controller as Controller Service
    participant Exporter as Exporter (Listen)
    participant LeaseMap as listenQueues<br/>(per-lease state)

    Note over Client,LeaseMap: OLD FLOW (Race Condition)
    Client->>LeaseMap: LoadOrStore queue
    Exporter->>LeaseMap: LoadOrStore queue (same chan)
    Client->>LeaseMap: Send token → channel
    Exporter->>Exporter: (old goroutine still reading)
    Exporter->>Exporter: Token lost to stale reader!
    Note over Client,LeaseMap: No cleanup, orphaned queue remains

    Note over Client,LeaseMap: NEW FLOW (Fixed)
    Exporter->>Controller: Listen starts, acquires leaseLock
    Controller->>LeaseMap: swapListenQueue (install new wrapper w/ done)
    Controller->>Controller: Close previous wrapper.done signal
    Client->>Controller: Dial → sendToListener (under lock)
    Controller->>Controller: Check active listener, serialize send
    Controller->>LeaseMap: Non-blocking send to active queue only
    Exporter->>Controller: Receive token from active queue
    Exporter->>Exporter: Drain buffered msgs after done closes
    Controller->>LeaseMap: CompareAndDelete on graceful exit
    Controller->>Controller: Release leaseLock refcount
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 4 passed
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@.github/workflows/e2e-stress.yaml`:
- Around line 84-88: The workflow's stress matrix uses strategy.fail-fast: true
which cancels remaining shards when one fails; change strategy.fail-fast to
false so the matrix (strategy.matrix.run) completes all 100 runs and reports
every failing shard instead of stopping on the first failure. Locate the
strategy block containing fail-fast and the matrix.run array and update
fail-fast to false to ensure full-run flake hunting.
In `@controller/internal/service/controller_service_test.go`:
- Around line 1032-1059: Update the two tests to assert the gRPC error code is
codes.Unavailable by invoking svc.sendToListener and inspecting the returned
status: in TestListenQueueDialReturnsUnavailableWhenNoListener call
svc.sendToListener with leaseName "nonexistent-lease" and assert the error is
non-nil and that status.Code(err) == codes.Unavailable; in
TestListenQueueDialReturnsUnavailableWhenDoneClosed keep the existing setup
(swapListenQueue + closeDone) but replace the nil check with an assertion that
status.Code(err) == codes.Unavailable (use google.golang.org/grpc/status and
codes for the check).
In `@controller/internal/service/controller_service.go`:
- Around line 541-546: When wrapper.done is closed the current select can return
without processing already-queued tokens on wrapper.ch; modify the exit path
(the select handling in the loop around wrapper.ch) to drain wrapper.ch before
returning on wrapper.done or ctx.Done: when detecting wrapper.done (or ctx.Done)
is closed, enter a non-blocking loop that repeatedly reads from wrapper.ch and
calls sendToListener for each queued token until wrapper.ch is empty, then
return. Update the logic around the select that references wrapper.done and
wrapper.ch (the loop that invokes sendToListener and the Dial handoff code) to
ensure all pending messages are processed before exiting.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ab09502c-3654-4ed4-ad56-3491d3ddf4c7
📒 Files selected for processing (4)
- .github/workflows/e2e-stress.yaml
- controller/Makefile
- controller/internal/service/controller_service.go
- controller/internal/service/controller_service_test.go
```yaml
strategy:
  fail-fast: true
  max-parallel: 20
  matrix:
    run: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100]
```
Disable fail-fast for a flake-hunting matrix.
With fail-fast: true, the first red shard cancels the remaining iterations, so this workflow stops telling you whether the bug reproduces once or 30 times. For a 100-run stress job, you want the full matrix outcome and all failing shards.
Suggested change

```diff
 strategy:
-  fail-fast: true
+  fail-fast: false
   max-parallel: 20
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```yaml
strategy:
  fail-fast: false
  max-parallel: 20
  matrix:
    run: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.github/workflows/e2e-stress.yaml around lines 84 - 88, The workflow's
stress matrix uses strategy.fail-fast: true which cancels remaining shards when
one fails; change strategy.fail-fast to false so the matrix
(strategy.matrix.run) completes all 100 runs and reports every failing shard
instead of stopping on the first failure. Locate the strategy block containing
fail-fast and the matrix.run array and update fail-fast to false to ensure
full-run flake hunting.
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a non-blocking loop calling stream.Send() for each queued token before returning. This prevents token loss when sendToListener enqueues a token just before swapListenQueue closes the done channel.
2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call sendToListener and assert status.Code(err) == codes.Unavailable instead of just checking the sync.Map directly.
3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert status.Code(err) == codes.Unavailable instead of just err != nil.
4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to include drain logic matching the production code pattern.
5. Add TestListenQueueDrainsBufferedTokensOnSupersession and TestListenQueueListenLoopDrainsOnSupersession to verify the drain behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 2
♻️ Duplicate comments (1)
controller/internal/service/controller_service.go (1)

541-553: ⚠️ Potential issue | 🟠 Major

Drain buffered tokens when `ctx.Done()` races with supersession.

If `ctx.Done()` and `wrapper.done` are both ready here, the select can take line 542 and return before the buffered handoff tokens are drained. That reopens the token-loss path this PR is trying to close: `sendToListener` can succeed, but the old listener still drops queued tokens during reconnect.

Possible fix

```diff
+	drainBuffered := func() error {
+		for {
+			select {
+			case msg := <-wrapper.ch:
+				if err := stream.Send(msg); err != nil {
+					return err
+				}
+			default:
+				return nil
+			}
+		}
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			select {
+			case <-wrapper.done:
+				return drainBuffered()
+			default:
+				return nil
+			}
 		case <-wrapper.done:
-			for {
-				select {
-				case msg := <-wrapper.ch:
-					if err := stream.Send(msg); err != nil {
-						return err
-					}
-				default:
-					return nil
-				}
-			}
+			return drainBuffered()
 		case msg := <-wrapper.ch:
 			if err := stream.Send(msg); err != nil {
 				return err
 			}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@controller/internal/service/controller_service.go` around lines 541 - 553, The select between ctx.Done() and wrapper.done can pick ctx.Done() while wrapper.done is also ready, causing buffered tokens in wrapper.ch to be lost; change the logic so that when ctx.Done() is selected you first check (non-blocking) whether wrapper.done is closed and, if so, drain wrapper.ch by repeatedly receiving and calling stream.Send on each message (same behavior as the wrapper.done branch) before returning; locate the code around the select that references ctx.Done(), wrapper.done, wrapper.ch and stream.Send and implement the non-blocking check-and-drain on wrapper.ch when ctx.Done() fires.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@controller/internal/service/controller_service_test.go`:
- Around line 824-903: The test
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded currently treats
a non-nil error from svc.sendToListener as an acceptable "rejected" outcome;
change the test so any non-nil sendErr is treated as a test failure instead
(i.e., call t.Fatalf or t.Fatalf with context) to ensure sendToListener under
the per-lease mutex never returns an error in this race; update the branches
around sendResult <- svc.sendToListener(...) and the subsequent handling of
sendErr to assert sendErr == nil, referencing
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded,
svc.sendToListener, and svc.swapListenQueue.
- Around line 327-348: The first subtest deletes the original listen queue
(wrapper) so the second subtest is running against a missing key; restore the
initial state before the second subtest by creating a fresh wrapper instance and
inserting it back into svc.listenQueues under leaseName (or re-store the
original wrapper) so that CompareAndDelete in the "queue survives when a
reconnecting Listen replaced it" case actually exercises stale-cleanup logic;
reference svc.listenQueues, wrapper, leaseName, swapListenQueue and
CompareAndDelete when making the change.
---
Duplicate comments:
In `@controller/internal/service/controller_service.go`:
- Around line 541-553: The select between ctx.Done() and wrapper.done can pick
ctx.Done() while wrapper.done is also ready, causing buffered tokens in
wrapper.ch to be lost; change the logic so that when ctx.Done() is selected you
first check (non-blocking) whether wrapper.done is closed and, if so, drain
wrapper.ch by repeatedly receiving and calling stream.Send on each message (same
behavior as the wrapper.done branch) before returning; locate the code around
the select that references ctx.Done(), wrapper.done, wrapper.ch and stream.Send
and implement the non-blocking check-and-drain on wrapper.ch when ctx.Done()
fires.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7d8baf64-1f2a-4b0d-97a9-b3e19823f9db
📒 Files selected for processing (2)
- controller/internal/service/controller_service.go
- controller/internal/service/controller_service_test.go
```go
func TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded(t *testing.T) {
	// Race swapListenQueue against sendToListener using goroutines.
	// The per-lease mutex guarantees that the Load+send in sendToListener
	// is atomic with respect to the Swap+closeDone in swapListenQueue.
	// When sendToListener acquires the lock first, it sends to g1 (which
	// is still current -- a valid send). When swapListenQueue acquires
	// first, sendToListener sees g2 as the current queue.
	//
	// The invariant: if sendToListener returns nil, the done channel of the
	// queue it sent to was NOT closed at the time of the send (guaranteed by
	// the lock preventing concurrent swap+closeDone).
	iterations := 500
	sentToG1 := 0
	sentToG2 := 0
	rejected := 0

	for i := 0; i < iterations; i++ {
		svc := &ControllerService{}
		leaseName := "test-lease-concurrent-serial"

		g1 := &listenQueue{
			ch:   make(chan *pb.ListenResponse, 8),
			done: make(chan struct{}),
		}
		svc.swapListenQueue(leaseName, g1)

		g2 := &listenQueue{
			ch:   make(chan *pb.ListenResponse, 8),
			done: make(chan struct{}),
		}

		swapDone := make(chan struct{})
		sendResult := make(chan error, 1)

		go func() {
			defer close(swapDone)
			svc.swapListenQueue(leaseName, g2)
		}()
		go func() {
			sendResult <- svc.sendToListener(context.Background(), leaseName, &pb.ListenResponse{
				RouterEndpoint: "ep", RouterToken: testRouterToken,
			})
		}()

		<-swapDone
		sendErr := <-sendResult

		if sendErr != nil {
			rejected++
			continue
		}

		onG1 := false
		select {
		case <-g1.ch:
			onG1 = true
			sentToG1++
		default:
		}
		onG2 := false
		select {
		case <-g2.ch:
			onG2 = true
			sentToG2++
		default:
		}

		if !onG1 && !onG2 {
			t.Fatalf("iteration %d: send succeeded but token is lost", i)
		}
		if onG1 && onG2 {
			t.Fatalf("iteration %d: token duplicated across queues", i)
		}
	}

	if sentToG1+sentToG2+rejected != iterations {
		t.Fatalf("accounting error: g1=%d g2=%d rejected=%d total=%d",
			sentToG1, sentToG2, rejected, sentToG1+sentToG2+rejected)
	}
}
```
Don’t allow errors in this concurrent handoff test.
With the per-lease mutex, this race should end with delivery to exactly one active queue (g1 or g2). Counting sendErr != nil as an accepted rejected outcome lets a regression to Unavailable/ResourceExhausted pass while this test still goes green.
Suggested adjustment

```diff
-	rejected := 0
-
 	for i := 0; i < iterations; i++ {
 		svc := &ControllerService{}
@@
 		<-swapDone
 		sendErr := <-sendResult

 		if sendErr != nil {
-			rejected++
-			continue
+			t.Fatalf("iteration %d: concurrent swap should still deliver to one active queue: %v", i, sendErr)
 		}
@@
-	if sentToG1+sentToG2+rejected != iterations {
-		t.Fatalf("accounting error: g1=%d g2=%d rejected=%d total=%d",
-			sentToG1, sentToG2, rejected, sentToG1+sentToG2+rejected)
+	if sentToG1+sentToG2 != iterations {
+		t.Fatalf("accounting error: g1=%d g2=%d total=%d",
+			sentToG1, sentToG2, sentToG1+sentToG2)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@controller/internal/service/controller_service_test.go` around lines 824 -
903, The test TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded
currently treats a non-nil error from svc.sendToListener as an acceptable
"rejected" outcome; change the test so any non-nil sendErr is treated as a test
failure instead (i.e., call t.Fatalf or t.Fatalf with context) to ensure
sendToListener under the per-lease mutex never returns an error in this race;
update the branches around sendResult <- svc.sendToListener(...) and the
subsequent handling of sendErr to assert sendErr == nil, referencing
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded,
svc.sendToListener, and svc.swapListenQueue.
Replace the missing queue cleanup in Listen() with a single defer s.listenQueues.CompareAndDelete(leaseName, queue) call. This fixes issue jumpstarter-dev#414 where a race between Listen() cleanup and Dial() token delivery causes intermittent "Connection to exporter lost" errors in E2E tests. CompareAndDelete only removes the queue if it is still the same channel instance that this invocation created, so a reconnecting exporter's new queue is never accidentally deleted by an old invocation's deferred cleanup. Compared to the timer-based approach in PR jumpstarter-dev#417, this solution: - Eliminates the known race at timer expiry - Requires no additional struct fields (listenTimers) or goroutines - Has no timing-dependent test behavior Generated-By: Forge/20260415_224144_3227186_20142bee Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a reconnecting Listen() inherits the same channel via LoadOrStore, the old Listen()'s deferred CompareAndDelete would succeed because both hold the same channel reference, incorrectly deleting the map entry that the reconnected Listen() depends on. By wrapping the channel in a unique listenQueue struct per Listen() call and using CompareAndSwap on reconnect, the old Listen()'s CompareAndDelete becomes a no-op since the pointer identity no longer matches. Generated-By: Forge/20260415_224144_3227186_20142bee
When two Listen() goroutines execute concurrently for the same lease, both attempt CompareAndSwap with the same stale reference. The loser's CAS fails, leaving its wrapper unstored. If the winner exits first, its CompareAndDelete removes the entry while the loser still reads from it. Add a retry path that re-loads the current map value and attempts CAS again, ensuring the surviving goroutine always owns the map entry. Generated-By: Forge/20260415_224144_3227186_20142bee
…ader race

Replace the shared-channel approach (LoadOrStore + CompareAndSwap) with per-invocation channels and done-signaling. Each Listen() call creates a fresh listenQueue with its own ch and done channels, atomically swaps it into the sync.Map, and closes the previous entry's done channel to signal the old goroutine to stop reading. Dial() now uses Load (not LoadOrStore) to send tokens to the current listener only. This eliminates the logical race where a stale goroutine with a broken gRPC stream could consume and discard a dial token meant for the reconnected goroutine.

Generated-By: Forge/20260415_235731_3329604_06ed4455
…check

Add a deterministic pre-check of q.done before the send select in Dial, preventing tokens from being silently lost when sent to a queue whose listener has been superseded. Also add a fallback case <-q.done in the main select for races that occur between the pre-check and the send.

Generated-By: Forge/20260415_235731_3329604_06ed4455
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done channel was only closed when a listener was superseded by a new Listen call. On normal exit (context cancellation or stream error), done was left open, making it an unreliable signal for Dial's done pre-check. Use sync.Once to close done in a deferred call, ensuring it is always closed exactly once regardless of exit path. Generated-By: Forge/20260415_235731_3329604_06ed4455 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The defer statements in Listen() were ordered such that CompareAndDelete ran before closeDone (LIFO). This created a TOCTOU window where a concurrent Dial could load a queue reference, pass the done check (done still open), and send to a dead queue. Swapping the defer order ensures closeDone() runs first, so any concurrent Dial that loaded the queue reference will see the closed done channel and reject the send before the map entry is removed. Generated-By: Forge/20260415_235731_3329604_06ed4455
Address review findings F001, F002, F005, F006, F007, F008:
- Add tests exercising listenQueue integration with actual struct behavior
- Rename misleading TestListenQueueConcurrentReadersAreNonDeterministic
- Add test for Dial returning Unavailable with no listener
- Add concurrent Dial-during-reconnection test
- Add context cancellation test
- Run tests with -race flag

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done-channel approach has a TOCTOU race: if Dial loads a queue reference before Listen reconnects, then both <-q.done and q.ch <- response are ready in the select (buffered channel), and Go may non-deterministically pick the send, delivering the token to a superseded queue. Add a per-lease mutex (leaseLocks sync.Map) that serializes swapListenQueue (Swap + closeDone) with sendToListener (Load + check + send). This guarantees that the queue loaded in Dial cannot be superseded during the send. Also replace custom contains/searchSubstring helpers with strings.Contains and add tests covering the stale-Dial scenario with pre-swap queue references. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…der backpressure

When the listenQueue channel buffer is full, sendToListener blocks on the channel send while holding the per-lease mutex. This prevents a reconnecting Listen from acquiring the mutex to swap the queue, creating a deadlock chain. Adding the Dial caller's context to the select allows the blocked send to be cancelled when the Dial client disconnects, releasing the mutex for the reconnecting Listen to proceed.

Generated-By: Forge/20260416_070332_3699740_7b2bda71
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract channel draining logic from TestListenQueueConcurrentDialDuringReconnection into a reusable drainChannel function, replacing the goto drained/g2drained pattern with idiomatic Go. Generated-By: Forge/20260416_073038_3739633_70e0127f
The leaseLocks sync.Map grew unboundedly because per-lease mutex entries were never deleted. Now when Listen exits and CompareAndDelete successfully removes the queue (meaning no new listener took over), the corresponding leaseLocks entry is also deleted. Generated-By: Forge/20260416_073038_3739633_70e0127f
Exercises the full deadlock-avoidance chain: buffer full, sendToListener blocks holding the per-lease mutex, swapListenQueue blocks on the mutex, context cancellation unblocks sendToListener, and swapListenQueue proceeds. Generated-By: Forge/20260416_073038_3739633_70e0127f
Add -race flag to the go test invocation in the Makefile test target, which is used by the controller-tests CI workflow. This ensures data races are detected in CI. Generated-By: Forge/20260416_073038_3739633_70e0127f
Extract repeated string literal "tok" into testRouterToken constant and remove trailing blank line to satisfy golangci-lint checks. Generated-By: Forge/20260416_075157_11117_6c67a5ce Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the manual Load+select TOCTOU pattern in TestListenQueueConcurrentDialDuringReconnection with calls to svc.sendToListener() and svc.swapListenQueue(), exercising the actual production code path that serializes Dial with reconnecting Listen via the per-lease mutex. Generated-By: Forge/20260416_075157_11117_6c67a5ce
…p ops

Refactor tests to use swapListenQueue() and sendToListener() instead of directly calling listenQueues.Store(), listenQueues.Swap(), and close(done). This ensures tests exercise the actual production code paths including per-lease mutex serialization, rather than testing a different (pre-fix) code path that bypasses the TOCTOU protection.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
…ks entries

The Listen cleanup path called closeDone() outside the per-lease mutex, which allowed an in-flight sendToListener to see a partially-torn-down queue. Worse, leaseLocks.Delete could remove the mutex while a concurrent Listen or Dial still references it, breaking serialization guarantees. Fix by acquiring the lease mutex before calling closeDone() in the cleanup defer, and by never deleting leaseLocks entries (they are tiny and bounded by the number of distinct lease names seen). Also fix the stale-reader detection test to check done and ch deterministically instead of relying on random select ordering.

Generated-By: Forge/20260416_105202_199878_b08a2035
Replace the blocking channel send in sendToListener with a non-blocking send that returns ResourceExhausted when the listener buffer is full. Previously, sendToListener held the per-lease mutex while blocking on a full channel, which prevented swapListenQueue (reconnecting listeners) and other Dial attempts from proceeding until the RPC context timed out. Generated-By: Forge/20260416_105202_199878_b08a2035 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e cleanup

The previous getLeaseLock returned raw *sync.Mutex pointers from a sync.Map that was never cleaned up, leaking memory. Replace with acquireLeaseLock / releaseLeaseLock using atomic ref counting so the map entry is removed when the last listener releases. Ensure closeDone in the Listen defer runs under the lease mutex for proper serialization.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ener test

Remove the racy counter increment from TestLeaseLockRefCountConcurrentAcquireRelease since goroutines that acquire-release quickly may get different mutex instances. Add TestLeaseLockRefCountConcurrentOverlappingListeners that uses a barrier to ensure all goroutines hold a reference before using the mutex, matching the real Listen lifecycle pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Runs the E2E suite 100 times in parallel (max 20 concurrent) with fail-fast enabled. Builds images once, then each matrix entry sets up its own Kind cluster and runs the full test. Triggered manually via workflow_dispatch only. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This reverts commit 160c6c1.
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a non-blocking loop calling stream.Send() for each queued token before returning. This prevents token loss when sendToListener enqueues a token just before swapListenQueue closes the done channel.
2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call sendToListener and assert status.Code(err) == codes.Unavailable instead of just checking the sync.Map directly.
3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert status.Code(err) == codes.Unavailable instead of just err != nil.
4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to include drain logic matching the production code pattern.
5. Add TestListenQueueDrainsBufferedTokensOnSupersession and TestListenQueueListenLoopDrainsOnSupersession to verify the drain behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from d9715ed to 94d3027.
Summary
- Race in `listenQueues` where a reconnecting `Listen()` goroutine and a dying one compete for the same channel, causing a ~50% token loss rate
- New `listenQueue` struct with a `done` channel and `sync.Once` for clean shutdown signaling
- Per-lease mutex (`leaseLock`) that serializes queue swaps in `Listen()` and token sends in `Dial()`, eliminating the race window entirely
- Ref-counted cleanup of the `leaseLocks` `sync.Map`

Test plan
- `go test -race -v ./internal/service/ -run TestListenQueue` passes
- `make e2e-run` passes with label filter `core`

Fixes #572
Generated with Claude Code