Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f12fb64
fix: use CompareAndDelete for race-free listenQueue cleanup
raballew Apr 15, 2026
1783ae0
fix: wrap listenQueue channel in struct to prevent stale cleanup race
raballew Apr 15, 2026
e310ebe
fix: retry CompareAndSwap on failure in concurrent Listen() reconnect
raballew Apr 15, 2026
53d992e
fix: give each Listen() goroutine its own channel to prevent stale re…
raballew Apr 15, 2026
191cbf2
fix: reject Dial send to superseded listenQueue via done channel pre-…
raballew Apr 15, 2026
523e7e6
fix: close done channel on all Listen exit paths using sync.Once
raballew Apr 15, 2026
ffa54d5
fix: swap defer order to close done channel before map deletion
raballew Apr 15, 2026
ad00133
Improve test quality for listen queue race fix
raballew Apr 15, 2026
b84ded9
fix: serialize Dial/Listen queue handoff with per-lease mutex
raballew Apr 16, 2026
578a1fd
fix: pass Dial context to sendToListener to prevent mutex deadlock un…
raballew Apr 16, 2026
e10b3bd
refactor: replace goto labels with drainChannel helper in test
raballew Apr 16, 2026
f60c36b
fix: clean up leaseLocks entries when Listen exits without replacement
raballew Apr 16, 2026
6c76f12
test: add end-to-end deadlock chain test
raballew Apr 16, 2026
d186eec
ci: enable race detector in controller test target
raballew Apr 16, 2026
ba706d8
fix: resolve golangci-lint goconst and gofmt violations
raballew Apr 16, 2026
44c2e5f
fix: use sendToListener in concurrent dial reconnection test
raballew Apr 16, 2026
eb7e0a5
fix: use service methods in listen queue tests instead of raw sync.Ma…
raballew Apr 16, 2026
df5f015
fix: serialize closeDone under lease mutex and stop deleting leaseLoc…
raballew Apr 16, 2026
739486d
fix: use non-blocking send in sendToListener to prevent mutex starvation
raballew Apr 16, 2026
2b4e57f
fix: replace raw lease mutex with ref-counted leaseLock to enable saf…
raballew Apr 16, 2026
6d15cdb
test: fix race in concurrent lease lock test and add overlapping list…
raballew Apr 16, 2026
63c9654
ci: add E2E stress test workflow for flakiness detection
raballew Apr 16, 2026
6786ccf
Revert "ci: add E2E stress test workflow for flakiness detection"
raballew Apr 16, 2026
94d3027
fix: drain buffered tokens on Listen supersession and assert gRPC sta…
raballew Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controller/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ vet: ## Run go vet against code.

.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out

# Utilize Kind or modify the e2e tests to load the image locally, enabling compatibility with other vendors.
.PHONY: test-e2e # Run the e2e tests against a Kind k8s instance that is spun up.
Expand Down
119 changes: 112 additions & 7 deletions controller/internal/service/controller_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/exp/maps"
Expand Down Expand Up @@ -80,6 +81,90 @@ type ControllerService struct {
ServerOptions []grpc.ServerOption
Router config.Router
listenQueues sync.Map
leaseLocks sync.Map
}

// listenQueue bundles the delivery channel used by one Listen call with a
// signal that marks the queue as no longer active.
type listenQueue struct {
// ch carries responses from sendToListener to the Listen loop that owns this queue.
ch chan *pb.ListenResponse
// done is closed by closeDone; sendToListener refuses to send once it is closed.
done chan struct{}
// closeOnce guards the close of done so closeDone can be called more than once.
closeOnce sync.Once
}

// closeDone marks the queue as finished by closing its done channel. The
// close is routed through closeOnce, so repeated calls are harmless and only
// the first one has any effect.
func (q *listenQueue) closeDone() {
	signal := func() {
		close(q.done)
	}
	q.closeOnce.Do(signal)
}

// leaseLock is the per-lease entry stored in ControllerService.leaseLocks: a
// mutex plus a count of the callers currently holding a reference to it.
type leaseLock struct {
// mu is the mutex handed out by acquireLeaseLock.
mu sync.Mutex
// refs is updated atomically: acquireLeaseLock increments it, releaseLeaseLock
// decrements it and removes the entry from the map when it reaches zero.
refs int32
}

// acquireLeaseLock returns the mutex associated with leaseName and takes one
// reference on its ref-counted entry in s.leaseLocks, creating the entry if it
// does not exist yet. The returned mutex is NOT locked. Every call must be
// paired with exactly one releaseLeaseLock(leaseName).
//
// A reference is only ever taken while the entry's count is positive. A count
// of zero means releaseLeaseLock has retired the entry and is about to remove
// it from the map; such an entry must never be handed out again, otherwise two
// callers could end up serializing on different mutexes for the same lease.
// The increment is therefore a compare-and-swap from a positive value rather
// than an unconditional add, which could let a second concurrent caller
// observe a transiently non-zero count on a retired entry.
func (s *ControllerService) acquireLeaseLock(leaseName string) *sync.Mutex {
	for {
		v, loaded := s.leaseLocks.LoadOrStore(leaseName, &leaseLock{refs: 1})
		ll := v.(*leaseLock)
		if !loaded {
			// We published a fresh entry that already carries our reference.
			return &ll.mu
		}
		for {
			refs := atomic.LoadInt32(&ll.refs)
			if refs <= 0 {
				// Entry is retired; look it up again until it has been
				// removed and a live entry can be loaded or stored.
				break
			}
			if atomic.CompareAndSwapInt32(&ll.refs, refs, refs+1) {
				return &ll.mu
			}
			// Lost a race with another acquire/release; re-read the count.
		}
	}
}

// releaseLeaseLock drops one reference on the lock entry registered for
// leaseName. When the count reaches zero the entry is removed from
// s.leaseLocks, but only if the map still holds that same entry. A lease with
// no registered entry is ignored.
func (s *ControllerService) releaseLeaseLock(leaseName string) {
	entry, found := s.leaseLocks.Load(leaseName)
	if !found {
		return
	}
	lock := entry.(*leaseLock)
	remaining := atomic.AddInt32(&lock.refs, -1)
	if remaining != 0 {
		return
	}
	// Last reference gone: retire the entry, unless it was already replaced.
	s.leaseLocks.CompareAndDelete(leaseName, lock)
}

// swapListenQueue atomically replaces the listen queue for a lease and signals
// the previous queue, if any, to stop by closing its done channel. The
// per-lease lock serializes this with sendToListener so that Dial never sends
// a token to a superseded queue.
//
// The unlock and the reference release are deferred so that the lock cannot
// stay held, and the reference cannot leak, if the stored value is not a
// *listenQueue and the type assertion panics. The deferred calls run in the
// same order as before: unlock first, then release the reference.
func (s *ControllerService) swapListenQueue(leaseName string, newQueue *listenQueue) {
	mu := s.acquireLeaseLock(leaseName)
	defer s.releaseLeaseLock(leaseName)
	mu.Lock()
	defer mu.Unlock()

	if old, loaded := s.listenQueues.Swap(leaseName, newQueue); loaded {
		old.(*listenQueue).closeDone()
	}
}

// sendToListener hands response to whichever listener currently owns the
// queue for leaseName. The whole lookup-and-send runs under the per-lease
// lock, so the queue found here cannot be swapped out between the lookup and
// the send.
//
// The send never blocks. It returns codes.Unavailable when no queue is
// registered or the registered queue's done channel is already closed, and
// codes.ResourceExhausted when the queue's channel cannot accept the response
// immediately. The context argument is currently unused.
func (s *ControllerService) sendToListener(_ context.Context, leaseName string, response *pb.ListenResponse) error {
	lock := s.acquireLeaseLock(leaseName)
	defer s.releaseLeaseLock(leaseName)
	lock.Lock()
	defer lock.Unlock()

	notListening := func() error {
		return status.Errorf(codes.Unavailable, "exporter is not listening on lease %s", leaseName)
	}

	entry, found := s.listenQueues.Load(leaseName)
	if !found {
		return notListening()
	}
	queue := entry.(*listenQueue)

	// Refuse a queue that has already been signalled to stop.
	select {
	case <-queue.done:
		return notListening()
	default:
	}

	// Non-blocking hand-off so the per-lease lock is never held while waiting.
	select {
	case queue.ch <- response:
		return nil
	default:
		return status.Errorf(codes.ResourceExhausted, "listener buffer full on lease %s", leaseName)
	}
}

type wrappedStream struct {
Expand Down Expand Up @@ -439,12 +524,35 @@ func (s *ControllerService) Listen(req *pb.ListenRequest, stream pb.ControllerSe
return err
}

queue, _ := s.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
wrapper := &listenQueue{
ch: make(chan *pb.ListenResponse, 8),
done: make(chan struct{}),
}
listenMu := s.acquireLeaseLock(leaseName)
s.swapListenQueue(leaseName, wrapper)
defer func() {
listenMu.Lock()
wrapper.closeDone()
listenMu.Unlock()
s.listenQueues.CompareAndDelete(leaseName, wrapper)
s.releaseLeaseLock(leaseName)
}()
for {
select {
case <-ctx.Done():
return nil
case msg := <-queue.(chan *pb.ListenResponse):
case <-wrapper.done:
for {
select {
case msg := <-wrapper.ch:
if err := stream.Send(msg); err != nil {
return err
}
default:
return nil
}
}
case msg := <-wrapper.ch:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if err := stream.Send(msg); err != nil {
return err
}
Expand Down Expand Up @@ -732,11 +840,8 @@ func (s *ControllerService) Dial(ctx context.Context, req *pb.DialRequest) (*pb.
RouterToken: token,
}

queue, _ := s.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
select {
case <-ctx.Done():
return nil, ctx.Err()
case queue.(chan *pb.ListenResponse) <- response:
if err := s.sendToListener(ctx, leaseName, response); err != nil {
return nil, err
}

logger.Info("Client dial assigned stream", "stream", stream)
Expand Down
Loading
Loading