fix(remoting): use atomic.Bool for ExchangeClient.init to fix data race#3438
fix(remoting): use atomic.Bool for ExchangeClient.init to fix data race#3438Aias00 wants to merge 5 commits into
Conversation
The init field was a plain bool with a FIXME comment acknowledging the race. It is written in doInit/Close and read in Request/AsyncRequest/Send without synchronization. This causes a data race when multiple goroutines concurrently trigger lazy initialization. Replace init bool with uatomic.Bool (already imported in this file) and: - Use CAS in doInit to ensure only one goroutine performs Connect - Spin-wait for in-flight initialization to complete - Reset init flag on Connect failure so retries can succeed - Use atomic Store in Close This follows the same pattern used by BaseInvoker (uatomic.Bool), BaseClusterInvoker (atomic.Bool with CAS), and Directory (atomic.Bool with CAS + mutex) elsewhere in the codebase. Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR aims to make ExchangeClient initialization safe under concurrent access by switching the init flag to an atomic type and adding tests for concurrent initialization and retry behavior.
Changes:
- Replace
ExchangeClient.initfrombooltouatomic.Booland use atomic operations indoInit/Close. - Add a concurrency test to ensure only one
Connectoccurs under concurrentdoInitcalls. - Add a failure/retry test to ensure a failed
doInitresets state and can be retried.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| remoting/exchange_client.go | Converts init flag to atomic and adds CAS-based coordination for doInit, plus reset on failure/close. |
| remoting/exchange_client_test.go | Updates existing init assertion and adds concurrent-init + failure-retry tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (cl *ExchangeClient) doInit(url *common.URL) error { | ||
| if cl.init { | ||
| if cl.init.Load() { | ||
| return nil | ||
| } | ||
| // Use CompareAndSwap to ensure only one goroutine performs initialization. | ||
| // If CAS fails, another goroutine already initialized; spin-wait until init completes. | ||
| if !cl.init.CAS(false, true) { | ||
| // Another goroutine is initializing; wait for it to complete. | ||
| for !cl.init.Load() { | ||
| time.Sleep(10 * time.Millisecond) | ||
| } | ||
| return nil | ||
| } | ||
| // This goroutine won the CAS — perform the actual initialization. | ||
| if cl.client.Connect(url) != nil { | ||
| // retry for a while | ||
| time.Sleep(100 * time.Millisecond) | ||
| if cl.client.Connect(url) != nil { | ||
| logger.Errorf("[Remoting] failed to connect server, url=%v", url.Location) | ||
| cl.init.Store(false) // reset on failure so future calls can retry | ||
| return errors.New("Failed to connect server " + url.Location) | ||
| } | ||
| } | ||
| // FIXME atomic operation | ||
| cl.init = true | ||
| return nil | ||
| } |
| for !cl.init.Load() { | ||
| time.Sleep(10 * time.Millisecond) | ||
| } |
| for i := 0; i < goroutines; i++ { | ||
| go func() { | ||
| defer wg.Done() | ||
| // All goroutines trigger lazy init concurrently via Request-like paths | ||
| if err := ec.doInit(testURL()); err != nil { | ||
| t.Errorf("doInit failed: %v", err) | ||
| } | ||
| }() | ||
| } |
Address Copilot review feedback: - Replace uatomic.Bool with uatomic.Int32 for 3-state init tracking (0=uninitialized, 1=initializing, 2=initialized) to prevent callers from observing a partially-initialized state - Replace spin-wait with sync.Cond for proper coordination: waiters block until signaled instead of polling - All concurrent waiters now receive the same error result when init fails, rather than silently returning nil - Close() properly resets state and broadcasts to wake any waiters - Fix test: use buffered channel for concurrent error collection instead of calling t.Errorf from multiple goroutines Co-Authored-By: Claude <noreply@anthropic.com>
2f7d6cb to
8bb4ab5
Compare
Addressed Copilot Review FeedbackAll three review comments have been addressed in the latest commit (8bb4ab5): 1. Three-state atomic instead of two-state boolBefore: After: Replaced with
CAS now transitions 2. sync.Cond instead of spin-waitBefore: After: 3. Test: buffered channel instead of t.Errorf from goroutinesBefore: After: Errors are collected into a buffered channel and asserted from the main test goroutine after Additionally, all concurrent waiters now receive the same error result when initialization fails (previously they silently returned nil). |
testifylint requires require.Error/require.NoError instead of assert.Error/assert.NoError for error assertions. Co-Authored-By: Claude <noreply@anthropic.com>
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## develop #3438 +/- ##
===========================================
+ Coverage 46.76% 53.60% +6.83%
===========================================
Files 295 493 +198
Lines 17172 38411 +21239
===========================================
+ Hits 8031 20589 +12558
- Misses 8287 16171 +7884
- Partials 854 1651 +797 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|



Problem
ExchangeClient.initis a plainboolfield with a// FIXME atomic operationcomment acknowledging the race. It is written indoInit/Closeand read (indirectly viadoInit) inRequest/AsyncRequest/Sendwithout any synchronization.Race scenarios:
Request/AsyncRequest/Send→ all enterdoInit→ all seeinit == false→Connectis called multiple timesClose()writesinit = falseconcurrently withdoInitreading/writinginit→ memory visibility issueFix
Replace
init boolwithuatomic.Bool(already imported in this file asgo.uber.org/atomic), consistent with the existingactiveNum uatomic.Uint32field in the same struct:init bool→init uatomic.BoolactiveNum, already importeddoInitcheck-then-set →CAS(false, true)ConnectStore(false)Close→Store(false)This follows the same pattern used by
BaseInvoker(uatomic.Bool),BaseClusterInvoker(atomic.Boolwith CAS), andDirectory(atomic.Boolwith CAS + mutex) elsewhere in the codebase.Tests
TestExchangeClientConcurrentDoInit: 20 goroutines concurrently calldoInit— verifiesConnectis called exactly onceTestExchangeClientDoInitFailureResets: verifies faileddoInitresets the init flag and retry succeeds-raceflag enabled🤖 Generated with Claude Code