fix(cluster): fix goroutine leak in forking cluster invoker #3436
Aias00 wants to merge 3 commits into
The forking cluster invoker spawns multiple goroutines to invoke providers concurrently, but only consumes the first result via Poll(1, timeout). The remaining goroutines are never cancelled:

1. No context cancellation: the original ctx is passed to all forked goroutines, so even after the first result is returned, the other Invoke calls continue running indefinitely.
2. Queue never disposed: resultQ is never closed/disposed, so goroutines that complete after the first result still successfully Put into a queue that nobody reads from, and the queue itself is never cleaned up.
3. Excessive error logging: when the queue is eventually GC'd or when remaining goroutines try to Put after disposal, the error-level log creates noise in normal timeout scenarios.

Fix:

- Use context.WithCancel to create a cancellable forkCtx; defer cancel() so all forked goroutines are signaled to stop when Invoke returns.
- Pass forkCtx instead of ctx to forked goroutines so their Invoke calls respect the cancellation.
- Call resultQ.Dispose() immediately after Poll to release queue resources and make remaining goroutines' Put calls return ErrDisposed promptly.
- Downgrade Put error log from Errorf to Debugf since ErrDisposed is an expected condition after the queue is disposed.

Co-Authored-By: Claude <noreply@anthropic.com>
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds cancellation and queue disposal to the forking cluster invoker to reduce noisy logs and help forked goroutines exit sooner after an invocation completes.
Changes:
- Introduces a derived cancellable context (`forkCtx`) for forked invocations.
- Disposes the result queue after polling to force remaining `Put` calls to fail fast.
- Downgrades `Put` failure logging to debug with a rationale comment.
    forkCtx, cancel := context.WithCancel(ctx)
    defer cancel() // cancel forkCtx when Invoke returns, signaling all forked goroutines to stop

    // ErrDisposed is expected when another goroutine's result was already
    // consumed and the queue disposed; log at debug level to avoid noise.
    logger.Debugf("[Cluster][Forking] resultQ put failed, the queue is probably disposed: %v", err)
    @@ -72,16 +72,25 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
    	}

    	resultQ := queue.New(1)
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files

    @@             Coverage Diff              @@
    ##           develop    #3436       +/-   ##
    ===========================================
    + Coverage    46.76%   53.58%    +6.81%
    ===========================================
      Files          295      493      +198
      Lines        17172    38393    +21221
    ===========================================
    + Hits          8031    20571    +12540
    - Misses        8287    16171     +7884
    - Partials       854     1651      +797
Problem
The forking cluster invoker (`cluster/cluster/forking/cluster_invoker.go`) spawns multiple goroutines to invoke providers concurrently, but only consumes the first result via `Poll(1, timeout)`. The remaining goroutines are never cancelled, causing a goroutine leak:

1. No context cancellation: the original `ctx` is passed to all forked goroutines, so even after the first result is returned, the other `Invoke` calls continue running indefinitely.
2. Queue never disposed: `resultQ` is never closed/disposed, so goroutines that complete after the first result still successfully `Put` into a queue that nobody reads from, and the queue itself is never cleaned up.
3. Excessive error logging: when the queue is eventually GC'd or when remaining goroutines try to `Put` after disposal, the error-level log creates noise in normal timeout scenarios.

Fix

- Use `context.WithCancel` to create a cancellable `forkCtx`; `defer cancel()` so all forked goroutines are signaled to stop when `Invoke` returns.
- Pass `forkCtx` instead of `ctx` to forked goroutines so their `Invoke` calls respect the cancellation.
- Call `resultQ.Dispose()` immediately after `Poll` to release queue resources and make remaining goroutines' `Put` calls return `ErrDisposed` promptly.
- Downgrade the `Put` error log from `Errorf` to `Debugf` since `ErrDisposed` is an expected condition after the queue is disposed.

Testing
All existing tests pass with no regression:
Co-Authored-By: Claude <noreply@anthropic.com>