Skip to content

feat: serialize dispatch per worker with inflight limits#98

Merged
YuminosukeSato merged 5 commits intomainfrom
feat/dispatch-serialization
Feb 6, 2026
Merged

feat: serialize dispatch per worker with inflight limits#98
YuminosukeSato merged 5 commits intomainfrom
feat/dispatch-serialization

Conversation

@YuminosukeSato
Copy link
Owner

Summary

Combines PRs #90, #91, #92, #93 into a single PR targeting main.

Test plan

  • go test -race ./pkg/pyproc/ passes
  • golangci-lint clean (SA2001 fix included)
  • All ExternalWorker and Pool tests pass

🤖 Generated with Claude Code

YuminosukeSato and others added 5 commits February 7, 2026 00:21
Move activeCallsWG.Wait() inside the callsMu critical section to
satisfy staticcheck SA2001 (empty critical section). The lock still
serves as a barrier ensuring all in-progress Call() goroutines have
completed their Add(1) before Wait() is called.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings February 6, 2026 15:22
@github-actions github-actions bot added documentation Improvements or additions to documentation enhancement New feature or request lang/docs Documentation changes lang/go Go code changes area/pool Worker pool management area/worker Python worker process labels Feb 6, 2026
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces per-worker serialization with configurable in-flight limits to prevent head-of-line blocking and improve concurrency control. The changes refactor the pool's backpressure mechanism to use both a global semaphore and per-worker gates, add comprehensive concurrency tests, and update documentation to reflect the new configuration option.

Changes:

  • Introduced MaxInFlightPerWorker configuration parameter to control per-worker in-flight requests
  • Refactored pool backpressure to use global semaphore + per-worker gates for better concurrency control
  • Added comprehensive pool concurrency tests covering serialization, oversubscription, and shutdown scenarios
  • Standardized Unix socket test paths across all test files using a shared helper function

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
pkg/pyproc/config.go Added MaxInFlightPerWorker field to PoolConfig with default value of 1
pkg/pyproc/pool.go Implemented per-worker serialization with inflightGate channels and worker acquisition logic
pkg/pyproc/pool_transport.go Updated NewPoolWithTransport to initialize MaxInFlightPerWorker defaults and adjust semaphore size
pkg/pyproc/pool_test.go Updated test helper to support new MaxInFlightPerWorker configuration
pkg/pyproc/pool_error_test.go Added test coverage for MaxInFlightPerWorker default value validation
pkg/pyproc/pool_concurrency_test.go Added comprehensive concurrency tests for worker serialization and backpressure
pkg/pyproc/testutil_test.go Created shared helper function for generating unique Unix socket paths
pkg/pyproc/worker_external_test.go Updated to use shared socket path helper function
pkg/pyproc/pool_external_test.go Updated to use shared socket path helper function
pkg/pyproc/transport_uds_test.go Replaced inline socket path generation with shared helper
pkg/pyproc/transport_multiplexed_test.go Replaced hardcoded paths with shared helper function calls
pkg/pyproc/socket_hmac_test.go Replaced inline socket path generation with shared helper
pkg/pyproc/connection_test.go Replaced inline socket path generation with shared helper
pkg/pyproc/cancellation_test.go Refactored createTestPool to support configurable MaxInFlightPerWorker
worker/python/README.md Added MaxInFlightPerWorker to example configuration
docs/reference/operations.md Updated documentation to clarify MaxInFlight and MaxInFlightPerWorker semantics
docs/reference/failure-behavior.md Updated backpressure documentation with new concurrency model
docs/reference/architecture.md Updated architecture documentation to describe dual-level backpressure mechanism
docs/ops.md Updated operational guide with new configuration parameter
docs/index.md Updated main documentation with MaxInFlightPerWorker
docs/guides/performance-tuning.md Added MaxInFlightPerWorker to performance tuning guide
docs/getting-started/quick-start.md Updated quick start guide with new configuration parameter
docs/deployment/troubleshooting.md Added MaxInFlightPerWorker to troubleshooting examples
docs/deployment/monitoring.md Updated monitoring example with MaxInFlightPerWorker
config.yaml Updated default configuration with MaxInFlightPerWorker
README.md Updated README with new configuration parameter
CHANGELOG.md Documented breaking changes and new features

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

MaxInFlightPerWorker: 0,
},
WorkerConfig: WorkerConfig{
SocketPath: "/tmp/test.sock",
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses a hardcoded /tmp/test.sock path instead of the shared tempSocketPath() helper. Consider using the helper for consistency with other tests, even though this test doesn't actually create the socket.

Copilot uses AI. Check for mistakes.
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 012cf3f121

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +417 to +419
p.callsMu.Lock()
p.activeCallsWG.Wait()
p.callsMu.Unlock()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid unbounded wait before stopping workers

Shutdown now waits on activeCallsWG before closing connections or stopping workers, so any in-flight call that never returns on its own (for example, a handler stuck in Python while the caller used context.Background()) will block shutdown indefinitely. Because worker teardown happens only after this wait, shutdown has no mechanism to force those calls to fail and drain, which can hang process termination during deploys/incidents.

Useful? React with 👍 / 👎.

Comment on lines 268 to +271
defer func() { <-p.semaphore }()
case <-ctx.Done():
return ctx.Err()
case <-p.shutdownCh:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent semaphore waiters from continuing after shutdown

This select can admit queued calls after shutdown starts: once shutdownCh is closed, if semaphore capacity also becomes available, both branches are ready and Go chooses one pseudo-randomly, so some blocked callers proceed instead of returning pool is shut down. That makes shutdown behavior nondeterministic and can execute additional RPCs after shutdown has been initiated.

Useful? React with 👍 / 👎.

@YuminosukeSato YuminosukeSato merged commit e9f14c6 into main Feb 6, 2026
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/pool Worker pool management area/worker Python worker process documentation Improvements or additions to documentation enhancement New feature or request lang/docs Documentation changes lang/go Go code changes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants