Skip to content

fix: serialize Dial/Listen queue handoff to prevent router token loss#573

Open
raballew wants to merge 24 commits intojumpstarter-dev:mainfrom
raballew:fix-listen-queue-race
Open

fix: serialize Dial/Listen queue handoff to prevent router token loss#573
raballew wants to merge 24 commits intojumpstarter-dev:mainfrom
raballew:fix-listen-queue-race

Conversation

@raballew
Copy link
Copy Markdown
Member

Summary

  • Fixes the TOCTOU race in listenQueues where a reconnecting Listen() goroutine and a dying one compete for the same channel, causing ~50% token loss rate
  • Wraps the raw channel in a listenQueue struct with a done channel and sync.Once for clean shutdown signaling
  • Introduces a per-lease mutex (leaseLock) that serializes queue swaps in Listen() and token sends in Dial(), eliminating the race window entirely
  • Uses non-blocking sends to prevent mutex starvation when the channel buffer is full
  • Adds ref-counted lease lock cleanup to prevent memory leaks in the leaseLocks sync.Map
  • Enables the race detector in the controller test target

Test plan

  • go test -race -v ./internal/service/ -run TestListenQueue passes
  • make e2e-run passes with label filter core
  • Stress test workflow confirms reduced flake rate

Fixes #572

Generated with Claude Code

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 16, 2026

Warning

Rate limit exceeded

@raballew has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 59 minutes and 57 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 59 minutes and 57 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: be6bc4ee-ca92-44c2-a794-42cc5aabc0b8

📥 Commits

Reviewing files that changed from the base of the PR and between d9715ed and 94d3027.

📒 Files selected for processing (3)
  • controller/Makefile
  • controller/internal/service/controller_service.go
  • controller/internal/service/controller_service_test.go
📝 Walkthrough

Walkthrough

The changes fix a race condition in the Dial/Listen handoff by introducing per-lease coordination with listenQueue wrappers, atomic queue swapping via swapListenQueue, and serialized sends via sendToListener. Additionally, the Makefile enables Go's race detector for non-e2e tests, and comprehensive concurrency tests validate the fix's correctness.

Changes

Cohort / File(s) Summary
Build Configuration
controller/Makefile
Added -race flag to Go test invocation to enable race detector during non-e2e test runs.
Core Service Logic
controller/internal/service/controller_service.go
Introduced per-lease synchronization via listenQueue wrapper (channel + done signal), refcounted leaseLock, and listenQueues map. Added swapListenQueue for atomic queue replacement and sendToListener for serialized token delivery with non-blocking sends. Updated Listen to manage queue lifecycle and cleanup; updated Dial to use new send function with proper error codes (Unavailable, ResourceExhausted).
Service Tests
controller/internal/service/controller_service_test.go
Replaced substring-check helper with strings.Contains. Added test utilities (testRouterToken, drainChannel) and extensive concurrency suite validating listenQueues behavior, queue supersession semantics, token delivery correctness under reconnects/swaps, race conditions, backpressure handling, and lease-lock refcount logic across 1475 new lines of tests.

Sequence Diagram

sequenceDiagram
    participant Client as Client (Dial)
    participant Controller as Controller Service
    participant Exporter as Exporter (Listen)
    participant LeaseMap as listenQueues<br/>(per-lease state)

    Note over Client,LeaseMap: OLD FLOW (Race Condition)
    Client->>LeaseMap: LoadOrStore queue
    Exporter->>LeaseMap: LoadOrStore queue (same chan)
    Client->>LeaseMap: Send token → channel
    Exporter->>Exporter: (old goroutine still reading)
    Exporter->>Exporter: Token lost to stale reader!
    Note over Client,LeaseMap: No cleanup, orphaned queue remains

    Note over Client,LeaseMap: NEW FLOW (Fixed)
    Exporter->>Controller: Listen starts, acquires leaseLock
    Controller->>LeaseMap: swapListenQueue (install new wrapper w/ done)
    Controller->>Controller: Close previous wrapper.done signal
    Client->>Controller: Dial → sendToListener (under lock)
    Controller->>Controller: Check active listener, serialize send
    Controller->>LeaseMap: Non-blocking send to active queue only
    Exporter->>Controller: Receive token from active queue
    Exporter->>Exporter: Drain buffered msgs after done closes
    Controller->>LeaseMap: CompareAndDelete on graceful exit
    Controller->>Controller: Release leaseLock refcount
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Poem

🐰 A race was caught in the listen queue,
Where tokens vanished, old streams grew new.
Now swaps are atomic, locks hold true—
Per-lease coordination shines through,
No more lost hops, the handoff flew! 🎯

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately summarizes the main change: fixing serialization of Dial/Listen queue handoff to prevent router token loss, which is the primary objective.
Description check ✅ Passed The PR description is directly related to the changeset, clearly explaining the TOCTOU race fix, the listenQueue wrapper, per-lease mutex, ref-counted cleanup, and race detector enablement.
Linked Issues check ✅ Passed The PR fully addresses issue #572 by eliminating the Dial/Listen race through serialization with per-lease mutexes, listenQueue wrappers with done channels, non-blocking sends, and ref-counted cleanup.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing the race condition: Makefile race detector enablement, listenQueue struct introduction, per-lease locking, and comprehensive race condition tests.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/e2e-stress.yaml:
- Around line 84-88: The workflow's stress matrix uses strategy.fail-fast: true
which cancels remaining shards when one fails; change strategy.fail-fast to
false so the matrix (strategy.matrix.run) completes all 100 runs and reports
every failing shard instead of stopping on the first failure. Locate the
strategy block containing fail-fast and the matrix.run array and update
fail-fast to false to ensure full-run flake hunting.

In `@controller/internal/service/controller_service_test.go`:
- Around line 1032-1059: Update the two tests to assert the gRPC error code is
codes.Unavailable by invoking svc.sendToListener and inspecting the returned
status: in TestListenQueueDialReturnsUnavailableWhenNoListener call
svc.sendToListener with leaseName "nonexistent-lease" and assert the error is
non-nil and that status.Code(err) == codes.Unavailable; in
TestListenQueueDialReturnsUnavailableWhenDoneClosed keep the existing setup
(swapListenQueue + closeDone) but replace the nil check with an assertion that
status.Code(err) == codes.Unavailable (use google.golang.org/grpc/status and
codes for the check).

In `@controller/internal/service/controller_service.go`:
- Around line 541-546: When wrapper.done is closed the current select can return
without processing already-queued tokens on wrapper.ch; modify the exit path
(the select handling in the loop around wrapper.ch) to drain wrapper.ch before
returning on wrapper.done or ctx.Done: when detecting wrapper.done (or ctx.Done)
is closed, enter a non-blocking loop that repeatedly reads from wrapper.ch and
calls sendToListener for each queued token until wrapper.ch is empty, then
return. Update the logic around the select that references wrapper.done and
wrapper.ch (the loop that invokes sendToListener and the Dial handoff code) to
ensure all pending messages are processed before exiting.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ab09502c-3654-4ed4-ad56-3491d3ddf4c7

📥 Commits

Reviewing files that changed from the base of the PR and between 74794de and 160c6c1.

📒 Files selected for processing (4)
  • .github/workflows/e2e-stress.yaml
  • controller/Makefile
  • controller/internal/service/controller_service.go
  • controller/internal/service/controller_service_test.go

Comment thread .github/workflows/e2e-stress.yaml Outdated
Comment on lines +84 to +88
strategy:
fail-fast: true
max-parallel: 20
matrix:
run: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Disable fail-fast for a flake-hunting matrix.

With fail-fast: true, the first red shard cancels the remaining iterations, so this workflow stops telling you whether the bug reproduces once or 30 times. For a 100-run stress job, you want the full matrix outcome and all failing shards.

Suggested change
     strategy:
-      fail-fast: true
+      fail-fast: false
       max-parallel: 20
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
strategy:
fail-fast: true
max-parallel: 20
matrix:
run: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100]
strategy:
fail-fast: false
max-parallel: 20
matrix:
run: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.github/workflows/e2e-stress.yaml around lines 84 - 88, The workflow's
stress matrix uses strategy.fail-fast: true which cancels remaining shards when
one fails; change strategy.fail-fast to false so the matrix
(strategy.matrix.run) completes all 100 runs and reports every failing shard
instead of stopping on the first failure. Locate the strategy block containing
fail-fast and the matrix.run array and update fail-fast to false to ensure
full-run flake hunting.

Comment thread controller/internal/service/controller_service_test.go
Comment thread controller/internal/service/controller_service.go
raballew added a commit to raballew/jumpstarter that referenced this pull request Apr 16, 2026
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a
   non-blocking loop calling stream.Send() for each queued token before
   returning. This prevents token loss when sendToListener enqueues a
   token just before swapListenQueue closes the done channel.

2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call
   sendToListener and assert status.Code(err) == codes.Unavailable
   instead of just checking the sync.Map directly.

3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert
   status.Code(err) == codes.Unavailable instead of just err != nil.

4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to
   include drain logic matching the production code pattern.

5. Add TestListenQueueDrainsBufferedTokensOnSupersession and
   TestListenQueueListenLoopDrainsOnSupersession to verify the drain
   behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
controller/internal/service/controller_service.go (1)

541-553: ⚠️ Potential issue | 🟠 Major

Drain buffered tokens when ctx.Done() races with supersession.

If ctx.Done() and wrapper.done are both ready here, the select can take Line 542 and return before the buffered handoff tokens are drained. That reopens the token-loss path this PR is trying to close: sendToListener can succeed, but the old listener still drops queued tokens during reconnect.

Possible fix
+	drainBuffered := func() error {
+		for {
+			select {
+			case msg := <-wrapper.ch:
+				if err := stream.Send(msg); err != nil {
+					return err
+				}
+			default:
+				return nil
+			}
+		}
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			select {
+			case <-wrapper.done:
+				return drainBuffered()
+			default:
+				return nil
+			}
 		case <-wrapper.done:
-			for {
-				select {
-				case msg := <-wrapper.ch:
-					if err := stream.Send(msg); err != nil {
-						return err
-					}
-				default:
-					return nil
-				}
-			}
+			return drainBuffered()
 		case msg := <-wrapper.ch:
 			if err := stream.Send(msg); err != nil {
 				return err
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controller/internal/service/controller_service.go` around lines 541 - 553,
The select between ctx.Done() and wrapper.done can pick ctx.Done() while
wrapper.done is also ready, causing buffered tokens in wrapper.ch to be lost;
change the logic so that when ctx.Done() is selected you first check
(non-blocking) whether wrapper.done is closed and, if so, drain wrapper.ch by
repeatedly receiving and calling stream.Send on each message (same behavior as
the wrapper.done branch) before returning; locate the code around the select
that references ctx.Done(), wrapper.done, wrapper.ch and stream.Send and
implement the non-blocking check-and-drain on wrapper.ch when ctx.Done() fires.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@controller/internal/service/controller_service_test.go`:
- Around line 824-903: The test
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded currently treats
a non-nil error from svc.sendToListener as an acceptable "rejected" outcome;
change the test so any non-nil sendErr is treated as a test failure instead
(i.e., call t.Fatalf or t.Fatalf with context) to ensure sendToListener under
the per-lease mutex never returns an error in this race; update the branches
around sendResult <- svc.sendToListener(...) and the subsequent handling of
sendErr to assert sendErr == nil, referencing
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded,
svc.sendToListener, and svc.swapListenQueue.
- Around line 327-348: The first subtest deletes the original listen queue
(wrapper) so the second subtest is running against a missing key; restore the
initial state before the second subtest by creating a fresh wrapper instance and
inserting it back into svc.listenQueues under leaseName (or re-store the
original wrapper) so that CompareAndDelete in the "queue survives when a
reconnecting Listen replaced it" case actually exercises stale-cleanup logic;
reference svc.listenQueues, wrapper, leaseName, swapListenQueue and
CompareAndDelete when making the change.

---

Duplicate comments:
In `@controller/internal/service/controller_service.go`:
- Around line 541-553: The select between ctx.Done() and wrapper.done can pick
ctx.Done() while wrapper.done is also ready, causing buffered tokens in
wrapper.ch to be lost; change the logic so that when ctx.Done() is selected you
first check (non-blocking) whether wrapper.done is closed and, if so, drain
wrapper.ch by repeatedly receiving and calling stream.Send on each message (same
behavior as the wrapper.done branch) before returning; locate the code around
the select that references ctx.Done(), wrapper.done, wrapper.ch and stream.Send
and implement the non-blocking check-and-drain on wrapper.ch when ctx.Done()
fires.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7d8baf64-1f2a-4b0d-97a9-b3e19823f9db

📥 Commits

Reviewing files that changed from the base of the PR and between 160c6c1 and d9715ed.

📒 Files selected for processing (2)
  • controller/internal/service/controller_service.go
  • controller/internal/service/controller_service_test.go

Comment thread controller/internal/service/controller_service_test.go
Comment on lines +824 to +903
func TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded(t *testing.T) {
// Race swapListenQueue against sendToListener using goroutines.
// The per-lease mutex guarantees that the Load+send in sendToListener
// is atomic with respect to the Swap+closeDone in swapListenQueue.
// When sendToListener acquires the lock first, it sends to g1 (which
// is still current -- a valid send). When swapListenQueue acquires
// first, sendToListener sees g2 as the current queue.
//
// The invariant: if sendToListener returns nil, the done channel of the
// queue it sent to was NOT closed at the time of the send (guaranteed by
// the lock preventing concurrent swap+closeDone).
iterations := 500
sentToG1 := 0
sentToG2 := 0
rejected := 0

for i := 0; i < iterations; i++ {
svc := &ControllerService{}
leaseName := "test-lease-concurrent-serial"

g1 := &listenQueue{
ch: make(chan *pb.ListenResponse, 8),
done: make(chan struct{}),
}
svc.swapListenQueue(leaseName, g1)

g2 := &listenQueue{
ch: make(chan *pb.ListenResponse, 8),
done: make(chan struct{}),
}

swapDone := make(chan struct{})
sendResult := make(chan error, 1)

go func() {
defer close(swapDone)
svc.swapListenQueue(leaseName, g2)
}()
go func() {
sendResult <- svc.sendToListener(context.Background(), leaseName, &pb.ListenResponse{
RouterEndpoint: "ep", RouterToken: testRouterToken,
})
}()

<-swapDone
sendErr := <-sendResult

if sendErr != nil {
rejected++
continue
}

onG1 := false
select {
case <-g1.ch:
onG1 = true
sentToG1++
default:
}
onG2 := false
select {
case <-g2.ch:
onG2 = true
sentToG2++
default:
}

if !onG1 && !onG2 {
t.Fatalf("iteration %d: send succeeded but token is lost", i)
}
if onG1 && onG2 {
t.Fatalf("iteration %d: token duplicated across queues", i)
}
}

if sentToG1+sentToG2+rejected != iterations {
t.Fatalf("accounting error: g1=%d g2=%d rejected=%d total=%d",
sentToG1, sentToG2, rejected, sentToG1+sentToG2+rejected)
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Don’t allow errors in this concurrent handoff test.

With the per-lease mutex, this race should end with delivery to exactly one active queue (g1 or g2). Counting sendErr != nil as an accepted rejected outcome lets a regression to Unavailable/ResourceExhausted pass while this test still goes green.

Suggested adjustment
-	rejected := 0
-
 	for i := 0; i < iterations; i++ {
 		svc := &ControllerService{}
@@
 		<-swapDone
 		sendErr := <-sendResult

 		if sendErr != nil {
-			rejected++
-			continue
+			t.Fatalf("iteration %d: concurrent swap should still deliver to one active queue: %v", i, sendErr)
 		}
@@
-	if sentToG1+sentToG2+rejected != iterations {
-		t.Fatalf("accounting error: g1=%d g2=%d rejected=%d total=%d",
-			sentToG1, sentToG2, rejected, sentToG1+sentToG2+rejected)
+	if sentToG1+sentToG2 != iterations {
+		t.Fatalf("accounting error: g1=%d g2=%d total=%d",
+			sentToG1, sentToG2, sentToG1+sentToG2)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controller/internal/service/controller_service_test.go` around lines 824 -
903, The test TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded
currently treats a non-nil error from svc.sendToListener as an acceptable
"rejected" outcome; change the test so any non-nil sendErr is treated as a test
failure instead (i.e., call t.Fatalf or t.Fatalf with context) to ensure
sendToListener under the per-lease mutex never returns an error in this race;
update the branches around sendResult <- svc.sendToListener(...) and the
subsequent handling of sendErr to assert sendErr == nil, referencing
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded,
svc.sendToListener, and svc.swapListenQueue.

raballew and others added 24 commits April 17, 2026 12:41
Replace the missing queue cleanup in Listen() with a single
defer s.listenQueues.CompareAndDelete(leaseName, queue) call.

This fixes issue jumpstarter-dev#414 where a race between Listen() cleanup and Dial()
token delivery causes intermittent "Connection to exporter lost" errors
in E2E tests. CompareAndDelete only removes the queue if it is still the
same channel instance that this invocation created, so a reconnecting
exporter's new queue is never accidentally deleted by an old invocation's
deferred cleanup.

Compared to the timer-based approach in PR jumpstarter-dev#417, this solution:
- Eliminates the known race at timer expiry
- Requires no additional struct fields (listenTimers) or goroutines
- Has no timing-dependent test behavior

Generated-By: Forge/20260415_224144_3227186_20142bee
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a reconnecting Listen() inherits the same channel via LoadOrStore,
the old Listen()'s deferred CompareAndDelete would succeed because both
hold the same channel reference, incorrectly deleting the map entry that
the reconnected Listen() depends on.

By wrapping the channel in a unique listenQueue struct per Listen() call
and using CompareAndSwap on reconnect, the old Listen()'s
CompareAndDelete becomes a no-op since the pointer identity no longer
matches.

Generated-By: Forge/20260415_224144_3227186_20142bee
When two Listen() goroutines execute concurrently for the same lease,
both attempt CompareAndSwap with the same stale reference. The loser's
CAS fails, leaving its wrapper unstored. If the winner exits first,
its CompareAndDelete removes the entry while the loser still reads
from it. Add a retry path that re-loads the current map value and
attempts CAS again, ensuring the surviving goroutine always owns the
map entry.

Generated-By: Forge/20260415_224144_3227186_20142bee
…ader race

Replace the shared-channel approach (LoadOrStore + CompareAndSwap) with
per-invocation channels and done-signaling. Each Listen() call creates a
fresh listenQueue with its own ch and done channels, atomically swaps it
into the sync.Map, and closes the previous entry's done channel to signal
the old goroutine to stop reading. Dial() now uses Load (not LoadOrStore)
to send tokens to the current listener only.

This eliminates the logical race where a stale goroutine with a broken
gRPC stream could consume and discard a dial token meant for the
reconnected goroutine.

Generated-By: Forge/20260415_235731_3329604_06ed4455
…check

Add a deterministic pre-check of q.done before the send select in Dial,
preventing tokens from being silently lost when sent to a queue whose
listener has been superseded. Also add a fallback case <-q.done in the
main select for races that occur between the pre-check and the send.

Generated-By: Forge/20260415_235731_3329604_06ed4455

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done channel was only closed when a listener was superseded by a new
Listen call. On normal exit (context cancellation or stream error), done
was left open, making it an unreliable signal for Dial's done pre-check.
Use sync.Once to close done in a deferred call, ensuring it is always
closed exactly once regardless of exit path.

Generated-By: Forge/20260415_235731_3329604_06ed4455

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The defer statements in Listen() were ordered such that
CompareAndDelete ran before closeDone (LIFO). This created a
TOCTOU window where a concurrent Dial could load a queue reference,
pass the done check (done still open), and send to a dead queue.

Swapping the defer order ensures closeDone() runs first, so any
concurrent Dial that loaded the queue reference will see the closed
done channel and reject the send before the map entry is removed.

Generated-By: Forge/20260415_235731_3329604_06ed4455
Address review findings F001, F002, F005, F006, F007, F008:
- Add tests exercising listenQueue integration with actual struct behavior
- Rename misleading TestListenQueueConcurrentReadersAreNonDeterministic
- Add test for Dial returning Unavailable with no listener
- Add concurrent Dial-during-reconnection test
- Add context cancellation test
- Run tests with -race flag

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done-channel approach has a TOCTOU race: if Dial loads a queue
reference before Listen reconnects, then both <-q.done and q.ch <-
response are ready in the select (buffered channel), and Go may
non-deterministically pick the send, delivering the token to a
superseded queue.

Add a per-lease mutex (leaseLocks sync.Map) that serializes
swapListenQueue (Swap + closeDone) with sendToListener (Load + check
+ send). This guarantees that the queue loaded in Dial cannot be
superseded during the send.

Also replace custom contains/searchSubstring helpers with
strings.Contains and add tests covering the stale-Dial scenario
with pre-swap queue references.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…der backpressure

When the listenQueue channel buffer is full, sendToListener blocks on
the channel send while holding the per-lease mutex. This prevents a
reconnecting Listen from acquiring the mutex to swap the queue, creating
a deadlock chain. Adding the Dial caller's context to the select allows
the blocked send to be cancelled when the Dial client disconnects,
releasing the mutex for the reconnecting Listen to proceed.

Generated-By: Forge/20260416_070332_3699740_7b2bda71

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract channel draining logic from TestListenQueueConcurrentDialDuringReconnection
into a reusable drainChannel function, replacing the goto drained/g2drained pattern
with idiomatic Go.

Generated-By: Forge/20260416_073038_3739633_70e0127f
The leaseLocks sync.Map grew unboundedly because per-lease mutex entries
were never deleted. Now when Listen exits and CompareAndDelete successfully
removes the queue (meaning no new listener took over), the corresponding
leaseLocks entry is also deleted.

Generated-By: Forge/20260416_073038_3739633_70e0127f
Exercises the full deadlock-avoidance chain: buffer full, sendToListener
blocks holding the per-lease mutex, swapListenQueue blocks on the mutex,
context cancellation unblocks sendToListener, and swapListenQueue proceeds.

Generated-By: Forge/20260416_073038_3739633_70e0127f
Add -race flag to the go test invocation in the Makefile test target,
which is used by the controller-tests CI workflow. This ensures data
races are detected in CI.

Generated-By: Forge/20260416_073038_3739633_70e0127f
Extract repeated string literal "tok" into testRouterToken constant
and remove trailing blank line to satisfy golangci-lint checks.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the manual Load+select TOCTOU pattern in
TestListenQueueConcurrentDialDuringReconnection with calls to
svc.sendToListener() and svc.swapListenQueue(), exercising the
actual production code path that serializes Dial with reconnecting
Listen via the per-lease mutex.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
…p ops

Refactor tests to use swapListenQueue() and sendToListener() instead
of directly calling listenQueues.Store(), listenQueues.Swap(), and
close(done). This ensures tests exercise the actual production code
paths including per-lease mutex serialization, rather than testing a
different (pre-fix) code path that bypasses the TOCTOU protection.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
…ks entries

The Listen cleanup path called closeDone() outside the per-lease mutex,
which allowed an in-flight sendToListener to see a partially-torn-down
queue. Worse, leaseLocks.Delete could remove the mutex while a concurrent
Listen or Dial still references it, breaking serialization guarantees.

Fix by acquiring the lease mutex before calling closeDone() in the
cleanup defer, and by never deleting leaseLocks entries (they are tiny
and bounded by the number of distinct lease names seen).

Also fix the stale-reader detection test to check done and ch
deterministically instead of relying on random select ordering.

Generated-By: Forge/20260416_105202_199878_b08a2035
Replace the blocking channel send in sendToListener with a non-blocking
send that returns ResourceExhausted when the listener buffer is full.
Previously, sendToListener held the per-lease mutex while blocking on a
full channel, which prevented swapListenQueue (reconnecting listeners)
and other Dial attempts from proceeding until the RPC context timed out.

Generated-By: Forge/20260416_105202_199878_b08a2035
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e cleanup

The previous getLeaseLock returned raw *sync.Mutex pointers from a sync.Map
that was never cleaned up, leaking memory. Replace with acquireLeaseLock /
releaseLeaseLock using atomic ref counting so the map entry is removed when
the last listener releases. Ensure closeDone in the Listen defer runs under
the lease mutex for proper serialization.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ener test

Remove the racy counter increment from TestLeaseLockRefCountConcurrentAcquireRelease
since goroutines that acquire-release quickly may get different mutex instances.
Add TestLeaseLockRefCountConcurrentOverlappingListeners that uses a barrier to
ensure all goroutines hold a reference before using the mutex, matching the
real Listen lifecycle pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Runs the E2E suite 100 times in parallel (max 20 concurrent) with
fail-fast enabled. Builds images once, then each matrix entry sets up
its own Kind cluster and runs the full test. Triggered manually via
workflow_dispatch only.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a
   non-blocking loop calling stream.Send() for each queued token before
   returning. This prevents token loss when sendToListener enqueues a
   token just before swapListenQueue closes the done channel.

2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call
   sendToListener and assert status.Code(err) == codes.Unavailable
   instead of just checking the sync.Map directly.

3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert
   status.Code(err) == codes.Unavailable instead of just err != nil.

4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to
   include drain logic matching the production code pattern.

5. Add TestListenQueueDrainsBufferedTokensOnSupersession and
   TestListenQueueListenLoopDrainsOnSupersession to verify the drain
   behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@raballew raballew force-pushed the fix-listen-queue-race branch from d9715ed to 94d3027 Compare April 17, 2026 10:46
@raballew raballew enabled auto-merge (squash) April 17, 2026 18:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky E2E: listen queue race causes 'Connection to exporter lost' in Dial/Listen handoff

1 participant