From 43f4fdf143d8d12dba73a57d1a76e31b272a3685 Mon Sep 17 00:00:00 2001 From: SAY-5 Date: Thu, 30 Apr 2026 11:43:06 -0700 Subject: [PATCH] fix(internal): serialise SendRequest Add(1) with close-time Wait() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit failLeftRequestsWhenClose previously called incomingRequestsWG.Wait() without first stopping new SendRequest / SendRequestNoWait callers. Both callers did Add(1) before checking the connection state, so a caller that started concurrently with the close path could Add(1) at the moment Wait() was draining to zero, tripping Go's panic: sync: WaitGroup is reused before previous Wait has returned (reported in #1483 and seen in production close paths). Add a sync.RWMutex (incomingRequestsLock) that: * SendRequest / SendRequestNoWait take RLock, check state, Add(1), RUnlock — so multiple senders still proceed in parallel; and * failLeftRequestsWhenClose takes Lock, sets state=closed, Unlocks, then Wait()s. The exclusive section guarantees no in-flight Add(1) is straddling the Wait() and that any new caller arriving after the close sees state=closed and returns ErrConnectionClosed without touching the WaitGroup. Closes #1483. Signed-off-by: SAY-5 --- pulsar/internal/connection.go | 58 +++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index cbb21b6aed..2418a74dd7 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -159,11 +159,17 @@ type connection struct { log log.Logger - incomingRequestsWG sync.WaitGroup - incomingRequestsCh chan *request - closeCh chan struct{} - readyCh chan struct{} - writeRequestsCh chan *dataRequest + // incomingRequestsWG tracks in-flight SendRequest/SendRequestNoWait + // callers. 
incomingRequestsLock orders Add(1) against the switch to + // closed that failLeftRequestsWhenClose makes before its Wait(), so a late + // SendRequest cannot Add to a draining WaitGroup (which would trip + // "sync: WaitGroup is reused before previous Wait has returned"). + incomingRequestsLock sync.RWMutex + incomingRequestsWG sync.WaitGroup + incomingRequestsCh chan *request + closeCh chan struct{} + readyCh chan struct{} + writeRequestsCh chan *dataRequest pendingLock sync.Mutex pendingReqs map[uint64]*request @@ -373,6 +379,14 @@ func (c *connection) waitUntilReady() error { } func (c *connection) failLeftRequestsWhenClose() { + // Stop new SendRequest/SendRequestNoWait callers from adding to the + // WaitGroup before draining the in-flight ones. Without this barrier, + // a concurrent Add(1) racing with Wait() reaching zero panics with + // "sync: WaitGroup is reused before previous Wait has returned". + c.incomingRequestsLock.Lock() + c.setStateClosed() + c.incomingRequestsLock.Unlock() + // wait for outstanding incoming requests to complete before draining // and closing the channel c.incomingRequestsWG.Wait() @@ -656,33 +670,37 @@ func (c *connection) checkServerError(err *pb.ServerError) { func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) { + c.incomingRequestsLock.RLock() + if c.getState() == connectionClosed { + c.incomingRequestsLock.RUnlock() + callback(req, ErrConnectionClosed) + return + } c.incomingRequestsWG.Add(1) + c.incomingRequestsLock.RUnlock() defer c.incomingRequestsWG.Done() - if c.getState() == connectionClosed { + select { + case <-c.closeCh: callback(req, ErrConnectionClosed) - } else { - select { - case <-c.closeCh: - callback(req, ErrConnectionClosed) - - case c.incomingRequestsCh <- &request{ - id: &requestID, - cmd: req, - callback: callback, - }: - } + case c.incomingRequestsCh <- &request{ + id: &requestID, + cmd: req, + callback: callback, + }: } } func (c *connection) 
SendRequestNoWait(req *pb.BaseCommand) error { - c.incomingRequestsWG.Add(1) - defer c.incomingRequestsWG.Done() - + c.incomingRequestsLock.RLock() if c.getState() == connectionClosed { + c.incomingRequestsLock.RUnlock() return ErrConnectionClosed } + c.incomingRequestsWG.Add(1) + c.incomingRequestsLock.RUnlock() + defer c.incomingRequestsWG.Done() select { case <-c.closeCh: