feat: serialize dispatch per worker with inflight limits #98
YuminosukeSato merged 5 commits into main
Move activeCallsWG.Wait() inside the callsMu critical section to satisfy staticcheck SA2001 (empty critical section). The lock still serves as a barrier ensuring all in-progress Call() goroutines have completed their Add(1) before Wait() is called. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR introduces per-worker serialization with configurable in-flight limits to prevent head-of-line blocking and improve concurrency control. The changes refactor the pool's backpressure mechanism to use both a global semaphore and per-worker gates, add comprehensive concurrency tests, and update documentation to reflect the new configuration option.
Changes:
- Introduced `MaxInFlightPerWorker` configuration parameter to control per-worker in-flight requests
- Refactored pool backpressure to use global semaphore + per-worker gates for better concurrency control
- Added comprehensive pool concurrency tests covering serialization, oversubscription, and shutdown scenarios
- Standardized Unix socket test paths across all test files using a shared helper function
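
To make the two-level backpressure described above concrete, here is a minimal sketch, not the code from `pkg/pyproc/pool.go`. The `pool` type, `newPool`, `acquire`, `call`, and `peakPerWorker` are hypothetical names, and the round-robin worker choice and the global capacity of `workers*perWorker` are assumptions made for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// errShutdown is a hypothetical sentinel; the real pool's error may differ.
var errShutdown = errors.New("pool is shut down")

// pool is an illustrative stand-in, not the pyproc Pool type.
type pool struct {
	global     chan struct{}   // global in-flight limit
	gates      []chan struct{} // one gate per worker, capacity = per-worker limit
	shutdownCh chan struct{}
	next       atomic.Uint64 // round-robin cursor (assumed selection policy)
}

func newPool(workers, perWorker int) *pool {
	p := &pool{
		global:     make(chan struct{}, workers*perWorker), // assumed sizing
		shutdownCh: make(chan struct{}),
	}
	for i := 0; i < workers; i++ {
		p.gates = append(p.gates, make(chan struct{}, perWorker))
	}
	return p
}

// acquire takes one slot, or gives up on cancellation or shutdown.
func acquire(ctx context.Context, slot chan struct{}, shutdownCh <-chan struct{}) error {
	select {
	case slot <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-shutdownCh:
		return errShutdown
	}
}

// call holds a global slot first, then the chosen worker's gate, and runs fn.
func (p *pool) call(ctx context.Context, fn func(worker int)) error {
	if err := acquire(ctx, p.global, p.shutdownCh); err != nil {
		return err
	}
	defer func() { <-p.global }()
	w := int(p.next.Add(1) % uint64(len(p.gates)))
	if err := acquire(ctx, p.gates[w], p.shutdownCh); err != nil {
		return err
	}
	defer func() { <-p.gates[w] }()
	fn(w)
	return nil
}

// peakPerWorker runs calls concurrently and returns the highest number of
// calls observed executing on any single worker at the same time.
func peakPerWorker(workers, perWorker, calls int) int {
	p := newPool(workers, perWorker)
	active := make([]atomic.Int64, workers)
	var peak atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.call(context.Background(), func(w int) {
				n := active[w].Add(1)
				for { // record a new maximum if we exceeded the old one
					old := peak.Load()
					if n <= old || peak.CompareAndSwap(old, n) {
						break
					}
				}
				time.Sleep(time.Millisecond)
				active[w].Add(-1)
			})
		}()
	}
	wg.Wait()
	return int(peak.Load())
}

func main() {
	fmt.Println("peak per-worker concurrency:", peakPerWorker(2, 1, 20))
}
```

With a per-worker limit of 1, each worker's gate admits one call at a time, so calls to the same worker are serialized regardless of how many callers are queued on the global semaphore.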
Reviewed changes
Copilot reviewed 27 out of 27 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pkg/pyproc/config.go | Added MaxInFlightPerWorker field to PoolConfig with default value of 1 |
| pkg/pyproc/pool.go | Implemented per-worker serialization with inflightGate channels and worker acquisition logic |
| pkg/pyproc/pool_transport.go | Updated NewPoolWithTransport to initialize MaxInFlightPerWorker defaults and adjust semaphore size |
| pkg/pyproc/pool_test.go | Updated test helper to support new MaxInFlightPerWorker configuration |
| pkg/pyproc/pool_error_test.go | Added test coverage for MaxInFlightPerWorker default value validation |
| pkg/pyproc/pool_concurrency_test.go | Added comprehensive concurrency tests for worker serialization and backpressure |
| pkg/pyproc/testutil_test.go | Created shared helper function for generating unique Unix socket paths |
| pkg/pyproc/worker_external_test.go | Updated to use shared socket path helper function |
| pkg/pyproc/pool_external_test.go | Updated to use shared socket path helper function |
| pkg/pyproc/transport_uds_test.go | Replaced inline socket path generation with shared helper |
| pkg/pyproc/transport_multiplexed_test.go | Replaced hardcoded paths with shared helper function calls |
| pkg/pyproc/socket_hmac_test.go | Replaced inline socket path generation with shared helper |
| pkg/pyproc/connection_test.go | Replaced inline socket path generation with shared helper |
| pkg/pyproc/cancellation_test.go | Refactored createTestPool to support configurable MaxInFlightPerWorker |
| worker/python/README.md | Added MaxInFlightPerWorker to example configuration |
| docs/reference/operations.md | Updated documentation to clarify MaxInFlight and MaxInFlightPerWorker semantics |
| docs/reference/failure-behavior.md | Updated backpressure documentation with new concurrency model |
| docs/reference/architecture.md | Updated architecture documentation to describe dual-level backpressure mechanism |
| docs/ops.md | Updated operational guide with new configuration parameter |
| docs/index.md | Updated main documentation with MaxInFlightPerWorker |
| docs/guides/performance-tuning.md | Added MaxInFlightPerWorker to performance tuning guide |
| docs/getting-started/quick-start.md | Updated quick start guide with new configuration parameter |
| docs/deployment/troubleshooting.md | Added MaxInFlightPerWorker to troubleshooting examples |
| docs/deployment/monitoring.md | Updated monitoring example with MaxInFlightPerWorker |
| config.yaml | Updated default configuration with MaxInFlightPerWorker |
| README.md | Updated README with new configuration parameter |
| CHANGELOG.md | Documented breaking changes and new features |
        MaxInFlightPerWorker: 0,
    },
    WorkerConfig: WorkerConfig{
        SocketPath: "/tmp/test.sock",
This test uses a hardcoded /tmp/test.sock path instead of the shared tempSocketPath() helper. Consider using the helper for consistency with other tests, even though this test doesn't actually create the socket.
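
For readers without the diff at hand, a helper of this kind might look roughly like the sketch below. The real `tempSocketPath()` in `pkg/pyproc/testutil_test.go` is not shown in this conversation, so the signature, the `pyproc` directory prefix, and the `w.sock` file name here are all assumptions.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// tempSocketPath is a hypothetical stand-in for the shared test helper.
// It creates a fresh temporary directory and returns a socket path inside
// it, plus a cleanup function that removes the directory.
func tempSocketPath() (string, func(), error) {
	// os.MkdirTemp appends a random suffix, so each call gets a unique dir.
	dir, err := os.MkdirTemp("", "pyproc")
	if err != nil {
		return "", nil, err
	}
	cleanup := func() { _ = os.RemoveAll(dir) }
	// Keep the file name short: Unix socket paths have a small length limit.
	return filepath.Join(dir, "w.sock"), cleanup, nil
}

func main() {
	path, cleanup, err := tempSocketPath()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer cleanup()
	fmt.Println(path)
}
```

A per-call unique directory avoids collisions between tests running in parallel, which a fixed path such as `/tmp/test.sock` cannot guarantee.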
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 012cf3f121
    p.callsMu.Lock()
    p.activeCallsWG.Wait()
    p.callsMu.Unlock()
Avoid unbounded wait before stopping workers
Shutdown now waits on activeCallsWG before closing connections or stopping workers, so any in-flight call that never returns on its own (for example, a handler stuck in Python while the caller used context.Background()) will block shutdown indefinitely. Because worker teardown happens only after this wait, shutdown has no mechanism to force those calls to fail and drain, which can hang process termination during deploys/incidents.
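
One possible way to bound the wait is sketched below. It is not the fix adopted in this PR; `waitTimeout`, `shutdown`, the `forceClose` hook, and `simulateStuckCall` are hypothetical names, and the sketch assumes that closing a worker's connection causes a blocked call to return.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout reports whether wg drained within d. The helper goroutine
// lingers until wg eventually reaches zero.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// shutdown gives active calls a grace period. If they do not drain, it
// invokes forceClose (a hypothetical hook that closes worker connections)
// and then waits for the now-failing calls to return. It reports whether
// forcing was needed. The final Wait still blocks if forceClose does not
// actually unblock the calls.
func shutdown(active *sync.WaitGroup, grace time.Duration, forceClose func()) bool {
	if waitTimeout(active, grace) {
		return false
	}
	forceClose()
	active.Wait()
	return true
}

// simulateStuckCall models a call that never returns on its own, such as a
// handler stuck in Python while the caller used context.Background().
func simulateStuckCall() bool {
	var active sync.WaitGroup
	conn := make(chan struct{}) // stands in for a worker connection
	active.Add(1)
	go func() {
		defer active.Done()
		<-conn // unblocks only when the "connection" is closed
	}()
	return shutdown(&active, 10*time.Millisecond, func() { close(conn) })
}

func main() {
	fmt.Println("forced:", simulateStuckCall())
}
```

The design choice here is to prefer a graceful drain but fall back to tearing down connections once the grace period expires, so process termination is not held hostage by a single stuck call.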
        defer func() { <-p.semaphore }()
    case <-ctx.Done():
        return ctx.Err()
    case <-p.shutdownCh:
Prevent semaphore waiters from continuing after shutdown
This select can admit queued calls after shutdown starts: once shutdownCh is closed, if semaphore capacity also becomes available, both branches are ready and Go chooses one pseudo-randomly, so some blocked callers proceed instead of returning pool is shut down. That makes shutdown behavior nondeterministic and can execute additional RPCs after shutdown has been initiated.
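
A common way to make this deterministic is to give shutdown priority: check `shutdownCh` before blocking, and re-check it after a slot is acquired. The sketch below illustrates that pattern under stated assumptions; `acquire`, `errShutdown`, and `admittedAfterShutdown` are hypothetical names, not identifiers from this PR.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errShutdown is a hypothetical sentinel error.
var errShutdown = errors.New("pool is shut down")

// acquire takes a semaphore slot unless shutdown has started.
func acquire(ctx context.Context, sem chan struct{}, shutdownCh <-chan struct{}) error {
	// Fast path: refuse immediately if shutdown already began.
	select {
	case <-shutdownCh:
		return errShutdown
	default:
	}
	select {
	case sem <- struct{}{}:
		// Both cases may have been ready and select picked this one at
		// random, so re-check shutdown and hand the slot back if needed.
		select {
		case <-shutdownCh:
			<-sem
			return errShutdown
		default:
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-shutdownCh:
		return errShutdown
	}
}

// admittedAfterShutdown queues waiters behind a full semaphore, starts
// shutdown, then frees capacity, and counts how many waiters got through.
func admittedAfterShutdown(waiters int) int {
	sem := make(chan struct{}, 1)
	sem <- struct{}{} // the only slot is busy
	shutdownCh := make(chan struct{})
	var admitted atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < waiters; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if acquire(context.Background(), sem, shutdownCh) == nil {
				admitted.Add(1)
				<-sem
			}
		}()
	}
	close(shutdownCh) // shutdown begins
	<-sem             // capacity becomes available afterwards
	wg.Wait()
	return int(admitted.Load())
}

func main() {
	fmt.Println("admitted after shutdown:", admittedAfterShutdown(50))
}
```

Because the slot is freed only after `shutdownCh` is closed, any waiter that wins the semaphore case observes the closed channel in the re-check and backs out, so no queued caller proceeds once shutdown has been initiated.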
Summary
Combines PRs #90, #91, #92, #93 into a single PR targeting main.
Test plan
`go test -race ./pkg/pyproc/` passes

🤖 Generated with Claude Code