From 6b6ec60ffa737e7f55f7be205f8f84d4eaf9719e Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 23:03:14 +0900 Subject: [PATCH 1/5] test: stabilize unix socket test paths --- pkg/pyproc/connection_test.go | 13 ++++-------- pkg/pyproc/pool_external_test.go | 7 +++++- pkg/pyproc/pool_test.go | 27 ++++++++++++++---------- pkg/pyproc/socket_hmac_test.go | 10 +++------ pkg/pyproc/testutil_test.go | 19 +++++++++++++++++ pkg/pyproc/transport_multiplexed_test.go | 8 +++---- pkg/pyproc/transport_uds_test.go | 16 ++++++-------- pkg/pyproc/worker_external_test.go | 7 +++++- 8 files changed, 64 insertions(+), 43 deletions(-) create mode 100644 pkg/pyproc/testutil_test.go diff --git a/pkg/pyproc/connection_test.go b/pkg/pyproc/connection_test.go index 20e52eb..179507f 100644 --- a/pkg/pyproc/connection_test.go +++ b/pkg/pyproc/connection_test.go @@ -3,15 +3,13 @@ package pyproc import ( "context" "net" - "path/filepath" "testing" "time" ) func TestConnectToWorker_Success(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "test.sock") + socketPath := tempSocketPath(t, "connect-ok") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -36,8 +34,7 @@ func TestConnectToWorker_Success(t *testing.T) { func TestConnectToWorker_Timeout(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "nonexistent.sock") + socketPath := tempSocketPath(t, "connect-timeout") start := time.Now() _, err := ConnectToWorker(socketPath, 200*time.Millisecond) @@ -104,8 +101,7 @@ func TestSleepWithCtx_ContextAlreadyCanceled(t *testing.T) { func TestConnectToWorker_TimeoutDuringSleep(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "nonexistent.sock") + socketPath := tempSocketPath(t, "connect-timeout-sleep") // Use very short timeout to ensure timeout occurs during sleep // The timeout must be shorter than defaultSleepDuration (100ms) but long enough @@ -129,8 +125,7 @@ func TestConnectToWorker_TimeoutDuringSleep(t *testing.T) { func TestConnectToWorker_TimeoutAfterSleep(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "nonexistent.sock") + socketPath := tempSocketPath(t, "connect-timeout-after") // Use timeout longer than one sleep cycle (100ms) to ensure at least one // sleep completes successfully. Try multiple times to increase likelihood diff --git a/pkg/pyproc/pool_external_test.go b/pkg/pyproc/pool_external_test.go index cd5c8a1..b55c5c5 100644 --- a/pkg/pyproc/pool_external_test.go +++ b/pkg/pyproc/pool_external_test.go @@ -3,6 +3,7 @@ package pyproc import ( "net" "os" + "path/filepath" "testing" "time" ) @@ -10,7 +11,11 @@ import ( // startTestSocket creates a temporary Unix socket listener for external pool tests. func startTestSocket(t *testing.T) (net.Listener, string) { t.Helper() - f, err := os.CreateTemp("/tmp", "pyproc-ext-pool-*.sock") + baseDir := filepath.Join(os.TempDir(), "pyproc") + if err := os.MkdirAll(baseDir, 0755); err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + f, err := os.CreateTemp(baseDir, "pyproc-ext-pool-*.sock") if err != nil { t.Fatalf("failed to create temp file: %v", err) } diff --git a/pkg/pyproc/pool_test.go b/pkg/pyproc/pool_test.go index 3e4484a..a0710ff 100644 --- a/pkg/pyproc/pool_test.go +++ b/pkg/pyproc/pool_test.go @@ -54,17 +54,23 @@ func newPoolWithWorkers(cfg PoolConfig, workers []workerHandle) *Pool { if cfg.MaxInFlight == 0 { cfg.MaxInFlight = 1 } + if cfg.MaxInFlightPerWorker == 0 { + cfg.MaxInFlightPerWorker = 1 + } p := &Pool{ - opts: PoolOptions{Config: cfg}, - logger: NewLogger(LoggingConfig{Level: "error", Format: "json"}), - workers: make([]*poolWorker, len(workers)), - semaphore: make(chan struct{}, cfg.Workers*cfg.MaxInFlight), - activeRequests: make(map[uint64]*activeRequest), + opts: PoolOptions{Config: cfg}, + logger: NewLogger(LoggingConfig{Level: "error", Format: "json"}), + workers: make([]*poolWorker, len(workers)), + semaphore: make(chan struct{}, cfg.MaxInFlight), + workerAvailable: make(chan struct{}, cfg.Workers*cfg.MaxInFlightPerWorker), + shutdownCh: make(chan struct{}), + activeRequests: make(map[uint64]*activeRequest), } for i, w := range workers { p.workers[i] = &poolWorker{ - worker: w, - connPool: make(chan net.Conn, cfg.MaxInFlight), + worker: w, + connPool: make(chan net.Conn, cfg.MaxInFlightPerWorker), + inflightGate: make(chan struct{}, cfg.MaxInFlightPerWorker), } } return p @@ -189,8 +195,7 @@ func TestNewPoolDefaultsAndInvalidWorkers(t *testing.T) { } func TestPoolStartPrepopulateAndHealth(t *testing.T) { - tmp := t.TempDir() - paths := []string{filepath.Join(tmp, "w0.sock"), filepath.Join(tmp, "w1.sock")} + paths := []string{tempSocketPath(t, "prepop-w0"), tempSocketPath(t, "prepop-w1")} servers := []func(){ startUnixServer(t, paths[0], func(req protocol.Request) *protocol.Response { resp, _ := protocol.NewResponse(req.ID, map[string]string{"ok": "yes"}) @@ -333,7 +338,7 @@ func TestPoolCallConnectError(t *testing.T) { } func TestPoolCallCreatesConnectionAndReturns(t *testing.T) { - path := "/tmp/pool-creates-conn.sock" + path := tempSocketPath(t, "pool-create-conn") stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { resp, _ := protocol.NewResponse(req.ID, map[string]string{"ok": "yes"}) return resp @@ -551,7 +556,7 @@ func TestWorkerIsHealthyStates(t *testing.T) { } func TestPoolConnect(t *testing.T) { - path := filepath.Join(t.TempDir(), "connect.sock") + path := tempSocketPath(t, "connect") stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { resp, _ := protocol.NewResponse(req.ID, map[string]string{"ok": "yes"}) return resp diff --git a/pkg/pyproc/socket_hmac_test.go b/pkg/pyproc/socket_hmac_test.go index 8b93bd3..2c6ac67 100644 --- a/pkg/pyproc/socket_hmac_test.go +++ b/pkg/pyproc/socket_hmac_test.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "net" - "path/filepath" "testing" ) @@ -83,8 +82,7 @@ func TestSecretFromHex_InvalidHex(t *testing.T) { func TestHMACAuthClientServer(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "hmac-test.sock") + socketPath := tempSocketPath(t, "hmac-test") secret := []byte("shared-secret-key-for-testing") serverAuth := NewHMACAuth(secret) @@ -125,8 +123,7 @@ func TestHMACAuthClientServer(t *testing.T) { func TestHMACAuthWrongSecret(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "w.sock") + socketPath := tempSocketPath(t, "hmac-wrong") serverAuth := NewHMACAuth([]byte("server-secret")) clientAuth := NewHMACAuth([]byte("wrong-client-secret")) @@ -166,8 +163,7 @@ func TestHMACAuthWrongSecret(t *testing.T) { func TestNewHMACListener(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "hmac-listener.sock") + socketPath := tempSocketPath(t, "hmac-listener") listener, err := net.Listen("unix", socketPath) if err != nil { diff --git a/pkg/pyproc/testutil_test.go b/pkg/pyproc/testutil_test.go new file mode 100644 index 0000000..ea97f35 --- /dev/null +++ b/pkg/pyproc/testutil_test.go @@ -0,0 +1,19 @@ +package pyproc + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func tempSocketPath(t *testing.T, prefix string) string { + t.Helper() + base := filepath.Join(os.TempDir(), "pyproc") + if err := os.MkdirAll(base, 0755); err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + name := fmt.Sprintf("%s-%d.sock", prefix, time.Now().UnixNano()) + return filepath.Join(base, name) +} diff --git a/pkg/pyproc/transport_multiplexed_test.go b/pkg/pyproc/transport_multiplexed_test.go index 6106e38..a6e5358 100644 --- a/pkg/pyproc/transport_multiplexed_test.go +++ b/pkg/pyproc/transport_multiplexed_test.go @@ -243,7 +243,7 @@ func TestNewMultiplexedTransport_NonExistentSocket(t *testing.T) { func TestMultiplexedTransport_CallAfterClose(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-call-close.sock" + socketPath := tempSocketPath(t, "mux-call-close") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -282,7 +282,7 @@ func TestMultiplexedTransport_CallAfterClose(t *testing.T) { func TestMultiplexedTransport_IsHealthy(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-health.sock" + socketPath := tempSocketPath(t, "mux-health") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -323,7 +323,7 @@ func TestMultiplexedTransport_IsHealthy(t *testing.T) { func TestMultiplexedTransport_ReadError(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-read-error.sock" + socketPath := tempSocketPath(t, "mux-read-error") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -362,7 +362,7 @@ func TestMultiplexedTransport_ReadError(t *testing.T) { func TestMultiplexedTransport_DoubleClose(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-dclose.sock" + socketPath := tempSocketPath(t, "mux-dclose") listener, err := net.Listen("unix", socketPath) if err != nil { diff --git a/pkg/pyproc/transport_uds_test.go b/pkg/pyproc/transport_uds_test.go index cd63f22..9322a72 100644 --- a/pkg/pyproc/transport_uds_test.go +++ b/pkg/pyproc/transport_uds_test.go @@ -3,7 +3,6 @@ package pyproc import ( "context" "net" - "path/filepath" "testing" "time" @@ -54,8 +53,7 @@ func TestNewUDSTransport_NonExistentSocket(t *testing.T) { func TestUDSTransport_CallAfterClose(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "t.sock") + socketPath := tempSocketPath(t, "uds-call-close") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -93,8 +91,7 @@ func TestUDSTransport_CallAfterClose(t *testing.T) { func TestUDSTransport_Health(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "h.sock") + socketPath := tempSocketPath(t, "uds-health") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -131,7 +128,7 @@ func TestUDSTransport_Health(t *testing.T) { func TestUDSTransport_Reconnect(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/reconnect-test.sock" + socketPath := tempSocketPath(t, "uds-reconnect") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -166,7 +163,7 @@ func TestUDSTransport_Reconnect(t *testing.T) { func TestUDSTransport_ReconnectFail(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/reconnect-fail.sock" + socketPath := tempSocketPath(t, "uds-reconnect-fail") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -202,7 +199,7 @@ func TestUDSTransport_ReconnectFail(t *testing.T) { func TestUDSTransport_PingFail(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/ping-fail.sock" + socketPath := tempSocketPath(t, "uds-ping-fail") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -243,8 +240,7 @@ func TestUDSTransport_PingFail(t *testing.T) { func TestUDSTransport_DoubleClose(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "d.sock") + socketPath := tempSocketPath(t, "uds-dclose") listener, err := net.Listen("unix", socketPath) if err != nil { diff --git a/pkg/pyproc/worker_external_test.go b/pkg/pyproc/worker_external_test.go index b8d2932..3117dda 100644 --- a/pkg/pyproc/worker_external_test.go +++ b/pkg/pyproc/worker_external_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "os" + "path/filepath" "testing" "time" ) @@ -12,7 +13,11 @@ import ( // It uses /tmp directly to avoid macOS socket path length limits. func startTestUnixListener(t *testing.T) (net.Listener, string) { t.Helper() - f, err := os.CreateTemp("/tmp", "pyproc-test-*.sock") + baseDir := filepath.Join(os.TempDir(), "pyproc") + if err := os.MkdirAll(baseDir, 0755); err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + f, err := os.CreateTemp(baseDir, "pyproc-test-*.sock") if err != nil { t.Fatalf("failed to create temp file: %v", err) } From ba20c4c7fd7691b822bb66931452062b40ab0dd6 Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 23:03:34 +0900 Subject: [PATCH 2/5] feat: serialize dispatch per worker --- pkg/pyproc/config.go | 12 +-- pkg/pyproc/pool.go | 163 +++++++++++++++++++++++++---------- pkg/pyproc/pool_transport.go | 5 +- 3 files changed, 128 insertions(+), 52 deletions(-) diff --git a/pkg/pyproc/config.go b/pkg/pyproc/config.go index 8fbf61c..07a58b5 100644 --- a/pkg/pyproc/config.go +++ b/pkg/pyproc/config.go @@ -21,11 +21,12 @@ type Config struct { // PoolConfig defines worker pool settings type PoolConfig struct { - Workers int `mapstructure:"workers"` - MaxInFlight int `mapstructure:"max_in_flight"` - StartTimeout time.Duration `mapstructure:"start_timeout"` - HealthInterval time.Duration `mapstructure:"health_interval"` - Restart RestartConfig `mapstructure:"restart"` + Workers int `mapstructure:"workers"` + MaxInFlight int `mapstructure:"max_in_flight"` + MaxInFlightPerWorker int `mapstructure:"max_in_flight_per_worker"` + StartTimeout time.Duration `mapstructure:"start_timeout"` + HealthInterval time.Duration `mapstructure:"health_interval"` + Restart RestartConfig `mapstructure:"restart"` } // RestartConfig defines restart policy @@ -122,6 +123,7 @@ func setDefaults(v *viper.Viper) { // Pool defaults v.SetDefault("pool.workers", 4) v.SetDefault("pool.max_in_flight", 10) + v.SetDefault("pool.max_in_flight_per_worker", 1) v.SetDefault("pool.start_timeout", 30) v.SetDefault("pool.health_interval", 30) v.SetDefault("pool.restart.max_attempts", 5) diff --git a/pkg/pyproc/pool.go b/pkg/pyproc/pool.go index 1fad2fb..ffadd23 100644 --- a/pkg/pyproc/pool.go +++ b/pkg/pyproc/pool.go @@ -42,16 +42,20 @@ type workerHandle interface { // Pool manages multiple Python workers with load balancing type Pool struct { - opts PoolOptions - logger *Logger - workers []*poolWorker - nextIdx atomic.Uint64 - shutdown atomic.Bool - started atomic.Bool - wg sync.WaitGroup + opts PoolOptions + logger *Logger + workers []*poolWorker + nextIdx atomic.Uint64 + shutdown atomic.Bool + started atomic.Bool + activeCallsWG sync.WaitGroup + callsMu sync.Mutex + wg sync.WaitGroup // Backpressure control - semaphore chan struct{} + semaphore chan struct{} + workerAvailable chan struct{} + shutdownCh chan struct{} // Health monitoring healthMu sync.RWMutex @@ -74,10 +78,11 @@ type activeRequest struct { // poolWorker wraps a Worker with connection pooling type poolWorker struct { - worker workerHandle - connPool chan net.Conn - requestID atomic.Uint64 - healthy atomic.Bool + worker workerHandle + connPool chan net.Conn + inflightGate chan struct{} + requestID atomic.Uint64 + healthy atomic.Bool } // HealthStatus represents the health of the pool @@ -101,6 +106,9 @@ func NewPool(opts PoolOptions, logger *Logger) (*Pool, error) { if opts.Config.MaxInFlight <= 0 { opts.Config.MaxInFlight = 10 } + if opts.Config.MaxInFlightPerWorker <= 0 { + opts.Config.MaxInFlightPerWorker = 1 + } if opts.Config.HealthInterval <= 0 { opts.Config.HealthInterval = 30 * time.Second } @@ -110,11 +118,13 @@ func NewPool(opts PoolOptions, logger *Logger) (*Pool, error) { } pool := &Pool{ - opts: opts, - logger: logger, - workers: make([]*poolWorker, opts.Config.Workers), - semaphore: make(chan struct{}, opts.Config.Workers*opts.Config.MaxInFlight), - activeRequests: make(map[uint64]*activeRequest), + opts: opts, + logger: logger, + workers: make([]*poolWorker, opts.Config.Workers), + semaphore: make(chan struct{}, opts.Config.MaxInFlight), + workerAvailable: make(chan struct{}, opts.Config.Workers*opts.Config.MaxInFlightPerWorker), + shutdownCh: make(chan struct{}), + activeRequests: make(map[uint64]*activeRequest), } // Create workers @@ -128,8 +138,9 @@ func NewPool(opts PoolOptions, logger *Logger) (*Pool, error) { worker := NewWorker(workerCfg, logger) pool.workers[i] = &poolWorker{ - worker: worker, - connPool: make(chan net.Conn, opts.Config.MaxInFlight), + worker: worker, + connPool: make(chan net.Conn, opts.Config.MaxInFlightPerWorker), + inflightGate: make(chan struct{}, opts.Config.MaxInFlightPerWorker), } } @@ -148,6 +159,9 @@ func newExternalPool(opts PoolOptions, logger *Logger) (*Pool, error) { if opts.Config.MaxInFlight <= 0 { opts.Config.MaxInFlight = 10 } + if opts.Config.MaxInFlightPerWorker <= 0 { + opts.Config.MaxInFlightPerWorker = 1 + } if opts.Config.HealthInterval <= 0 { opts.Config.HealthInterval = 30 * time.Second } @@ -157,11 +171,13 @@ func newExternalPool(opts PoolOptions, logger *Logger) (*Pool, error) { } pool := &Pool{ - opts: opts, - logger: logger, - workers: make([]*poolWorker, numWorkers), - semaphore: make(chan struct{}, numWorkers*opts.Config.MaxInFlight), - activeRequests: make(map[uint64]*activeRequest), + opts: opts, + logger: logger, + workers: make([]*poolWorker, numWorkers), + semaphore: make(chan struct{}, opts.Config.MaxInFlight), + workerAvailable: make(chan struct{}, numWorkers*opts.Config.MaxInFlightPerWorker), + shutdownCh: make(chan struct{}), + activeRequests: make(map[uint64]*activeRequest), } for i, sockPath := range opts.ExternalSocketPaths { @@ -172,8 +188,9 @@ func newExternalPool(opts PoolOptions, logger *Logger) (*Pool, error) { RetryInterval: opts.ExternalRetryInterval, }) pool.workers[i] = &poolWorker{ - worker: worker, - connPool: make(chan net.Conn, opts.Config.MaxInFlight), + worker: worker, + connPool: make(chan net.Conn, opts.Config.MaxInFlightPerWorker), + inflightGate: make(chan struct{}, opts.Config.MaxInFlightPerWorker), } } @@ -210,7 +227,7 @@ func (p *Pool) Start(ctx context.Context) error { select { case pw.connPool <- conn: default: - // Pool is full (shouldn't happen with MaxInFlight=1), close connection + // Pool is full (shouldn't happen with MaxInFlightPerWorker=1), close connection if err := conn.Close(); err != nil { p.logger.Error("failed to close connection", "error", err) } @@ -236,9 +253,14 @@ func (p *Pool) Start(ctx context.Context) error { // Call invokes a method on one of the workers using round-robin func (p *Pool) Call(ctx context.Context, method string, input interface{}, output interface{}) error { + p.callsMu.Lock() if p.shutdown.Load() { + p.callsMu.Unlock() return errors.New("pool is shut down") } + p.activeCallsWG.Add(1) + p.callsMu.Unlock() + defer p.activeCallsWG.Done() // Acquire semaphore for backpressure select { @@ -246,31 +268,27 @@ func (p *Pool) Call(ctx context.Context, method string, input interface{}, outpu defer func() { <-p.semaphore }() case <-ctx.Done(): return ctx.Err() + case <-p.shutdownCh: + return errors.New("pool is shut down") } - // Select worker using round-robin - idx := p.nextIdx.Add(1) - 1 - workerIdx := int(idx % uint64(len(p.workers))) - pw := p.workers[workerIdx] - - if !pw.healthy.Load() { - // Try to find a healthy worker - for i, w := range p.workers { - if w.healthy.Load() { - pw = w - workerIdx = i - break - } - } - if !pw.healthy.Load() { - return errors.New("no healthy workers available") - } + pw, workerIdx, err := p.acquireWorker(ctx) + if err != nil { + return err } + defer func() { + <-pw.inflightGate + p.signalWorkerAvailable() + }() // Get connection from pool var conn net.Conn select { - case conn = <-pw.connPool: + case pooledConn, ok := <-pw.connPool: + if !ok { + return errors.New("connection pool closed") + } + conn = pooledConn default: // Create new connection if pool is empty var err error @@ -308,12 +326,14 @@ func (p *Pool) Call(ctx context.Context, method string, input interface{}, outpu p.activeRequestsMu.Unlock() // Return connection to pool only if not closed - if !connClosed { + if !connClosed && !p.shutdown.Load() { select { case pw.connPool <- conn: default: _ = conn.Close() } + } else if !connClosed { + _ = conn.Close() } }() @@ -382,6 +402,7 @@ func (p *Pool) Shutdown(_ context.Context) error { if !p.shutdown.CompareAndSwap(false, true) { return nil // Already shutting down } + close(p.shutdownCh) p.logger.Info("shutting down worker pool") @@ -390,6 +411,11 @@ func (p *Pool) Shutdown(_ context.Context) error { p.healthCancel() } + // Wait for in-flight calls to complete before closing pools + p.callsMu.Lock() + p.callsMu.Unlock() + p.activeCallsWG.Wait() + // Close all connection pools for _, pw := range p.workers { close(pw.connPool) @@ -417,6 +443,51 @@ func (p *Pool) Shutdown(_ context.Context) error { return nil } +func (p *Pool) signalWorkerAvailable() { + select { + case p.workerAvailable <- struct{}{}: + default: + } +} + +func (p *Pool) acquireWorker(ctx context.Context) (*poolWorker, int, error) { + workers := p.workers + if len(workers) == 0 { + return nil, -1, errors.New("no workers available") + } + + for { + startIdx := int(p.nextIdx.Add(1) - 1) + healthyFound := false + + for i := 0; i < len(workers); i++ { + idx := (startIdx + i) % len(workers) + pw := workers[idx] + if !pw.healthy.Load() { + continue + } + healthyFound = true + select { + case pw.inflightGate <- struct{}{}: + return pw, idx, nil + default: + } + } + + if !healthyFound { + return nil, -1, errors.New("no healthy workers available") + } + + select { + case <-ctx.Done(): + return nil, -1, ctx.Err() + case <-p.shutdownCh: + return nil, -1, errors.New("pool is shut down") + case <-p.workerAvailable: + } + } +} + // Health returns the current health status of the pool func (p *Pool) Health() HealthStatus { p.healthMu.RLock() diff --git a/pkg/pyproc/pool_transport.go b/pkg/pyproc/pool_transport.go index 57a695a..38536f9 100644 --- a/pkg/pyproc/pool_transport.go +++ b/pkg/pyproc/pool_transport.go @@ -134,6 +134,9 @@ func NewPoolWithTransport(opts PoolOptions, logger *Logger) (*PoolWithTransport, if opts.Config.MaxInFlight <= 0 { opts.Config.MaxInFlight = 10 } + if opts.Config.MaxInFlightPerWorker <= 0 { + opts.Config.MaxInFlightPerWorker = 1 + } if opts.Config.HealthInterval <= 0 { opts.Config.HealthInterval = 30 * time.Second } @@ -146,7 +149,7 @@ func NewPoolWithTransport(opts PoolOptions, logger *Logger) (*PoolWithTransport, opts: opts, logger: logger, workers: make([]*Worker, opts.Config.Workers), - semaphore: make(chan struct{}, opts.Config.Workers*opts.Config.MaxInFlight), + semaphore: make(chan struct{}, opts.Config.MaxInFlight), } // Create workers (they still manage the Python processes) From dff16abf98b40e2b4847c7764c0c6c21524819a5 Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Sat, 7 Feb 2026 00:08:47 +0900 Subject: [PATCH 3/5] fix(pool): move WaitGroup.Wait inside lock to fix SA2001 Move activeCallsWG.Wait() inside the callsMu critical section to satisfy staticcheck SA2001 (empty critical section). The lock still serves as a barrier ensuring all in-progress Call() goroutines have completed their Add(1) before Wait() is called. Co-Authored-By: Claude Opus 4.6 --- pkg/pyproc/pool.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/pyproc/pool.go b/pkg/pyproc/pool.go index ffadd23..59fa9d7 100644 --- a/pkg/pyproc/pool.go +++ b/pkg/pyproc/pool.go @@ -411,10 +411,12 @@ func (p *Pool) Shutdown(_ context.Context) error { p.healthCancel() } - // Wait for in-flight calls to complete before closing pools + // Wait for in-flight calls to complete before closing pools. + // The lock ensures all in-progress Call() goroutines have finished + // their activeCallsWG.Add(1) before we start waiting. p.callsMu.Lock() - p.callsMu.Unlock() p.activeCallsWG.Wait() + p.callsMu.Unlock() // Close all connection pools for _, pw := range p.workers { From c8179aadf7a4b2e1ebbc0a858dd648e1b2ecc725 Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 23:03:46 +0900 Subject: [PATCH 4/5] test: add pool concurrency coverage --- pkg/pyproc/cancellation_test.go | 15 ++- pkg/pyproc/pool_concurrency_test.go | 149 ++++++++++++++++++++++++++++ pkg/pyproc/pool_error_test.go | 22 ++++ 3 files changed, 181 insertions(+), 5 deletions(-) create mode 100644 pkg/pyproc/pool_concurrency_test.go diff --git a/pkg/pyproc/cancellation_test.go b/pkg/pyproc/cancellation_test.go index 3b2d841..a4edef7 100644 --- a/pkg/pyproc/cancellation_test.go +++ b/pkg/pyproc/cancellation_test.go @@ -9,7 +9,7 @@ import ( ) // createTestPool creates a new pool for testing -func createTestPool(t *testing.T, id string) *Pool { +func createTestPoolWithConfig(t *testing.T, id string, workers int, maxInFlight int, perWorker int) *Pool { requireUnixSocket(t) // Use /tmp directly with short names to avoid 104 char Unix socket path limit on macOS // Note: pool.go adds "-0" for worker ID, so keep the base path very short @@ -19,9 +19,10 @@ func createTestPool(t *testing.T, id string) *Pool { poolOpts := PoolOptions{ Config: PoolConfig{ - Workers: 1, - MaxInFlight: 3, // Allow 3 concurrent requests for the concurrent test - HealthInterval: 100 * time.Millisecond, + Workers: workers, + MaxInFlight: maxInFlight, + MaxInFlightPerWorker: perWorker, + HealthInterval: 100 * time.Millisecond, }, WorkerConfig: WorkerConfig{ ID: id, @@ -59,6 +60,10 @@ func createTestPool(t *testing.T, id string) *Pool { return pool } +func createTestPool(t *testing.T, id string) *Pool { + return createTestPoolWithConfig(t, id, 1, 3, 1) +} + // TestContextCancellation tests that context cancellation propagates to Python workers func TestContextCancellation(t *testing.T) { // Skip if running in CI without Python @@ -121,7 +126,7 @@ func TestContextCancellation(t *testing.T) { }) t.Run("MultipleConcurrentCancellations", func(t *testing.T) { - pool := createTestPool(t, "mc") + pool := createTestPoolWithConfig(t, "mc", 3, 3, 1) t.Cleanup(func() { if err := pool.Shutdown(context.Background()); err != nil { t.Errorf("Failed to shutdown pool: %v", err) diff --git a/pkg/pyproc/pool_concurrency_test.go b/pkg/pyproc/pool_concurrency_test.go new file mode 100644 index 0000000..9212533 --- /dev/null +++ b/pkg/pyproc/pool_concurrency_test.go @@ -0,0 +1,149 @@ +package pyproc + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/YuminosukeSato/pyproc/internal/protocol" +) + +func TestPoolCall_SerializesPerWorker(t *testing.T) { + paths := []string{ + tempSocketPath(t, "serial-w0"), + tempSocketPath(t, "serial-w1"), + } + var collisions atomic.Int32 + for _, path := range paths { + path := path + var inflight atomic.Int32 + stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { + if inflight.Add(1) > 1 { + collisions.Add(1) + } + time.Sleep(120 * time.Millisecond) + inflight.Add(-1) + resp, _ := protocol.NewResponse(req.ID, map[string]bool{"ok": true}) + return resp + }) + t.Cleanup(stop) + } + + workers := []workerHandle{ + newStubWorker(paths[0], true), + newStubWorker(paths[1], true), + } + p := newPoolWithWorkers(PoolConfig{ + Workers: 2, + MaxInFlight: 4, + MaxInFlightPerWorker: 1, + HealthInterval: 10 * time.Millisecond, + }, workers) + for _, pw := range p.workers { + pw.healthy.Store(true) + } + + var wg sync.WaitGroup + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := p.Call(ctx, "echo", map[string]string{"msg": "x"}, &map[string]any{}); err != nil { + t.Errorf("call failed: %v", err) + } + }() + } + wg.Wait() + + if collisions.Load() != 0 { + t.Fatalf("expected no concurrent requests per worker, got %d", collisions.Load()) + } +} + +func TestPoolCall_OversubscribeBlocksWithContext(t *testing.T) { + path := tempSocketPath(t, "oversub") + firstStarted := make(chan struct{}) + var reqCount atomic.Int32 + stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { + if reqCount.Add(1) == 1 { + close(firstStarted) + time.Sleep(200 * time.Millisecond) + } + resp, _ := protocol.NewResponse(req.ID, map[string]bool{"ok": true}) + return resp + }) + t.Cleanup(stop) + + workers := []workerHandle{newStubWorker(path, true)} + p := newPoolWithWorkers(PoolConfig{ + Workers: 1, + MaxInFlight: 2, + MaxInFlightPerWorker: 1, + }, workers) + p.workers[0].healthy.Store(true) + + firstErrCh := make(chan error, 1) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + firstErrCh <- p.Call(ctx, "echo", map[string]string{"msg": "first"}, &map[string]any{}) + }() + + <-firstStarted + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err := p.Call(ctx, "echo", map[string]string{"msg": "second"}, &map[string]any{}) + if err == nil { + t.Fatal("expected deadline exceeded for oversubscribed call") + } + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected deadline exceeded, got %v", err) + } + + if firstErr := <-firstErrCh; firstErr != nil { + t.Fatalf("first call failed: %v", firstErr) + } +} + +func TestPoolCall_ShutdownConcurrent(t *testing.T) { + path := tempSocketPath(t, "shutdown") + started := make(chan struct{}) + stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { + close(started) + time.Sleep(150 * time.Millisecond) + resp, _ := protocol.NewResponse(req.ID, map[string]bool{"ok": true}) + return resp + }) + t.Cleanup(stop) + + workers := []workerHandle{newStubWorker(path, true)} + p := newPoolWithWorkers(PoolConfig{ + Workers: 1, + MaxInFlight: 1, + MaxInFlightPerWorker: 1, + }, workers) + p.workers[0].healthy.Store(true) + + callErrCh := make(chan error, 1) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + callErrCh <- p.Call(ctx, "echo", map[string]string{"msg": "x"}, &map[string]any{}) + }() + + <-started + + if err := p.Shutdown(context.Background()); err != nil { + t.Fatalf("shutdown failed: %v", err) + } + + if callErr := <-callErrCh; callErr != nil { + t.Fatalf("call failed during shutdown: %v", callErr) + } +} diff --git a/pkg/pyproc/pool_error_test.go b/pkg/pyproc/pool_error_test.go index e5c65d7..ddecfd1 100644 --- a/pkg/pyproc/pool_error_test.go +++ b/pkg/pyproc/pool_error_test.go @@ -165,6 +165,28 @@ func TestPool_DefaultMaxInFlight(t *testing.T) { } } +func TestPool_DefaultMaxInFlightPerWorker(t *testing.T) { + opts := PoolOptions{ + Config: PoolConfig{ + Workers: 1, + MaxInFlight: 1, + MaxInFlightPerWorker: 0, + }, + WorkerConfig: WorkerConfig{ + SocketPath: "/tmp/test.sock", + }, + } + + pool, err := NewPool(opts, nil) + if err != nil { + t.Fatalf("NewPool failed: %v", err) + } + + if pool.opts.Config.MaxInFlightPerWorker != 1 { + t.Errorf("expected default MaxInFlightPerWorker 1, got %d", pool.opts.Config.MaxInFlightPerWorker) + } +} + func TestPool_DefaultHealthInterval(t *testing.T) { opts := PoolOptions{ Config: PoolConfig{ From 012cf3f121ff773bd25cb7660b21ba4832f3e21d Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 23:03:57 +0900 Subject: [PATCH 5/5] docs: document inflight limits --- CHANGELOG.md | 17 ++++++++++++++--- README.md | 17 ++++++++++------- config.yaml | 7 +++++-- docs/deployment/monitoring.md | 2 +- docs/deployment/troubleshooting.md | 6 ++++-- docs/getting-started/quick-start.md | 8 +++++--- docs/guides/performance-tuning.md | 5 +++-- docs/index.md | 5 +++-- docs/ops.md | 9 ++++++--- docs/reference/architecture.md | 17 ++++++++++------- docs/reference/failure-behavior.md | 24 ++++++++++++++---------- docs/reference/operations.md | 9 ++++++--- worker/python/README.md | 7 ++++--- 13 files changed, 85 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1013a06..2098813 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `PoolConfig.MaxInFlightPerWorker` to cap in-flight requests per worker (default: 1) + +### Changed +- `MaxInFlight` now limits total concurrent requests across the pool +- Dispatch now enforces per-worker serialization to prevent head-of-line blocking + +### Fixed +- Data race when `Pool.Call` and `Pool.Shutdown` run concurrently + ## [0.1.0] - 2025-08-15 ### Added @@ -121,8 +131,9 @@ worker.Start(ctx) ```go pool, _ := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, - MaxInFlight: 10, + Workers: 4, + MaxInFlight: 10, + MaxInFlightPerWorker: 1, }, WorkerConfig: cfg, }, nil) @@ -142,4 +153,4 @@ Thanks to all contributors and early adopters who provided valuable feedback to Special thanks to the Go and Python communities for the excellent libraries and tools that make pyproc possible. [Unreleased]: https://github.com/YuminosukeSato/pyproc/compare/v0.1.0...HEAD -[0.1.0]: https://github.com/YuminosukeSato/pyproc/releases/tag/v0.1.0 \ No newline at end of file +[0.1.0]: https://github.com/YuminosukeSato/pyproc/releases/tag/v0.1.0 diff --git a/README.md b/README.md index 7c1eb18..ebfd392 100644 --- a/README.md +++ b/README.md @@ -213,8 +213,9 @@ func main() { // Create a pool of Python workers pool, err := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, // Run 4 Python processes - MaxInFlight: 10, // Max concurrent requests per worker + Workers: 4, // Run 4 Python processes + MaxInFlight: 10, // Global concurrent requests + MaxInFlightPerWorker: 1, // Per-worker in-flight cap }, WorkerConfig: pyproc.WorkerConfig{ SocketPath: "/tmp/pyproc.sock", @@ -302,9 +303,10 @@ cfg := pyproc.WorkerConfig{ #### Pool Configuration ```go poolCfg := pyproc.PoolConfig{ - Workers: 4, // Number of Python processes - MaxInFlight: 10, // Max concurrent requests per worker - HealthInterval: 30 * time.Second, // Health check frequency + Workers: 4, // Number of Python processes + MaxInFlight: 10, // Global concurrent requests + MaxInFlightPerWorker: 1, // Per-worker in-flight cap + HealthInterval: 30 * time.Second, // Health check frequency } ``` @@ -680,8 +682,9 @@ The benchmarks show near-linear scaling with worker count, demonstrating the eff ```go pool, _ := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, - MaxInFlight: 10, + Workers: 4, + MaxInFlight: 10, + MaxInFlightPerWorker: 1, }, WorkerConfig: pyproc.WorkerConfig{ SocketPath: "/tmp/pyproc.sock", diff --git a/config.yaml b/config.yaml index 3edf587..c690c42 100644 --- a/config.yaml +++ b/config.yaml @@ -3,8 +3,11 @@ pool: # Number of Python worker processes workers: 4 - # Maximum concurrent requests per worker + # Maximum concurrent requests across the pool max_in_flight: 10 + + # Maximum in-flight requests per worker + max_in_flight_per_worker: 1 # Worker startup timeout in seconds start_timeout: 30 @@ -72,4 +75,4 @@ metrics: endpoint: ":9090" # Metrics path - path: "/metrics" \ No newline at end of file + path: "/metrics" diff --git a/docs/deployment/monitoring.md b/docs/deployment/monitoring.md index 3d0feee..5f4ac46 100644 --- a/docs/deployment/monitoring.md +++ b/docs/deployment/monitoring.md @@ -25,7 +25,7 @@ import ( func main() { pool, err := pyproc.NewPoolWithMetrics(pyproc.PoolOptions{ - Config: pyproc.PoolConfig{Workers: 4, MaxInFlight: 10}, + Config: pyproc.PoolConfig{Workers: 4, MaxInFlight: 10, MaxInFlightPerWorker: 1}, WorkerConfig: pyproc.WorkerConfig{ PythonExec: "python3", WorkerScript: "worker.py", diff --git a/docs/deployment/troubleshooting.md b/docs/deployment/troubleshooting.md index 6ae4eec..8679eb1 100644 --- a/docs/deployment/troubleshooting.md +++ b/docs/deployment/troubleshooting.md @@ -129,8 +129,9 @@ Enable metrics to identify bottleneck: ```go pool, _ := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, - MaxInFlight: 10, + Workers: 4, + MaxInFlight: 10, + MaxInFlightPerWorker: 1, }, // ... }, logger) @@ -553,6 +554,7 @@ When filing an issue, include: **Config**: - Workers: 4 - MaxInFlight: 10 +- MaxInFlightPerWorker: 1 - Socket path: /tmp/pyproc.sock **Code**: diff --git a/docs/getting-started/quick-start.md b/docs/getting-started/quick-start.md index 3d98744..dbd4fc1 100644 --- a/docs/getting-started/quick-start.md +++ b/docs/getting-started/quick-start.md @@ -110,8 +110,9 @@ func main() { // Create a pool of Python workers pool, err := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, // Run 4 Python processes - MaxInFlight: 10, // Max concurrent requests per worker + Workers: 4, // Run 4 Python processes + MaxInFlight: 10, // Global concurrent requests + MaxInFlightPerWorker: 1, // Per-worker in-flight cap }, WorkerConfig: pyproc.WorkerConfig{ SocketPath: "/tmp/pyproc.sock", @@ -149,7 +150,8 @@ func main() { | Parameter | Description | |-----------|-------------| | `Workers` | Number of Python processes to spawn (recommended: 2-8 per CPU core) | -| `MaxInFlight` | Max concurrent requests per worker (prevents overload) | +| `MaxInFlight` | Max concurrent requests across the pool | +| `MaxInFlightPerWorker` | Max in-flight requests per worker | | `SocketPath` | Unix Domain Socket file path (must be writable) | | `PythonExec` | Python interpreter path (`python3`, or path to venv) | | `WorkerScript` | Path to your Python worker script | diff --git a/docs/guides/performance-tuning.md b/docs/guides/performance-tuning.md index f181dcd..283698c 100644 --- a/docs/guides/performance-tuning.md +++ b/docs/guides/performance-tuning.md @@ -21,10 +21,11 @@ Workers: runtime.NumCPU() * 2 ### 2. Connection Pooling -Use `MaxInFlight` to control backpressure: +Use `MaxInFlight` (global) and `MaxInFlightPerWorker` (per-worker) to control backpressure: ```go -MaxInFlight: 10 // 10 concurrent requests per worker +MaxInFlight: 10 // global concurrent requests +MaxInFlightPerWorker: 1 // per-worker in-flight cap ``` ### 3. Benchmarking diff --git a/docs/index.md b/docs/index.md index e87b219..0d1acba 100644 --- a/docs/index.md +++ b/docs/index.md @@ -84,8 +84,9 @@ Just your Go binary + Python scripts — no service mesh, no orchestration // Create pool of Python workers pool, _ := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, // 4 Python processes - MaxInFlight: 10, // Concurrent requests + Workers: 4, // 4 Python processes + MaxInFlight: 10, // Global concurrent requests + MaxInFlightPerWorker: 1, // Per-worker in-flight cap }, WorkerConfig: pyproc.WorkerConfig{ SocketPath: "/tmp/pyproc.sock", diff --git a/docs/ops.md b/docs/ops.md index 68b6d29..36350c1 100644 --- a/docs/ops.md +++ b/docs/ops.md @@ -8,7 +8,8 @@ # Standard deployment on a single machine pool: workers: 4 # Number of Python processes - max_in_flight: 10 # Max concurrent requests per worker + max_in_flight: 10 # Max concurrent requests across the pool + max_in_flight_per_worker: 1 # Max in-flight requests per worker health_interval: 30s # Health check frequency python: @@ -100,7 +101,8 @@ cfg := pyproc.WorkerConfig{ ```go poolCfg := pyproc.PoolConfig{ Workers: 4, // Number of workers - MaxInFlight: 10, // Per-worker concurrency + MaxInFlight: 10, // Global concurrency across the pool + MaxInFlightPerWorker: 1, // Per-worker in-flight cap HealthInterval: 30 * time.Second, // Health check frequency Restart: pyproc.RestartConfig{ MaxAttempts: 5, @@ -260,6 +262,8 @@ conn.SetWriteBuffer(1024 * 1024) // 1MB ```go // Match MaxInFlight to expected concurrency MaxInFlight: runtime.NumCPU() * 2 +// Keep per-worker at 1 unless the Python worker can process concurrent requests +MaxInFlightPerWorker: 1 ``` ## Troubleshooting @@ -327,4 +331,3 @@ echo '{"id":1,"method":"health","body":{}}' | \ - [ ] Test failure scenarios - [ ] Document worker dependencies - [ ] Create runbooks for common issues - diff --git a/docs/reference/architecture.md b/docs/reference/architecture.md index 49dd9b6..dc1f2dd 100644 --- a/docs/reference/architecture.md +++ b/docs/reference/architecture.md @@ -121,11 +121,13 @@ worker := p.workers[idx % uint64(len(p.workers))] ### Backpressure Mechanism -Prevents overwhelming workers using a **semaphore-based** approach: +Prevents overwhelming workers using a **global semaphore + per-worker gate** approach: ```go -// Limit total in-flight requests across all workers -semaphore := make(chan struct{}, workers * maxInFlight) +// Limit total in-flight requests across the pool +semaphore := make(chan struct{}, maxInFlight) +// Limit per-worker in-flight requests +inflightGate := make(chan struct{}, maxInFlightPerWorker) // Before each request semaphore <- struct{}{} // Blocks if limit reached @@ -135,9 +137,10 @@ defer func() { <-semaphore }() // Release after completion **Configuration**: ```go Config: pyproc.PoolConfig{ - Workers: 4, // 4 Python processes - MaxInFlight: 10, // Max 10 requests per worker - // Total capacity: 4 * 10 = 40 concurrent requests + Workers: 4, // 4 Python processes + MaxInFlight: 10, // Max concurrent requests across the pool + MaxInFlightPerWorker: 1, // Max in-flight per worker + // Effective capacity: min(10, 4*1) = 4 concurrent requests } ``` @@ -287,7 +290,7 @@ respData, err := framer.ReadMessage() **Benefits**: - No socket open/close overhead per request - Connection reuse reduces latency -- Backpressure via MaxInFlight limit +- Backpressure via MaxInFlight (global) + MaxInFlightPerWorker (per worker) ### Graceful Shutdown diff --git a/docs/reference/failure-behavior.md b/docs/reference/failure-behavior.md index a880d2b..e696277 100644 --- a/docs/reference/failure-behavior.md +++ b/docs/reference/failure-behavior.md @@ -86,17 +86,20 @@ Automatic restart is not yet implemented. The `RestartConfig` struct exists in t ## Backpressure -The Pool uses a semaphore to limit concurrency. +The Pool uses a global semaphore and a per-worker gate to limit concurrency. ### Semaphore Mechanism ```go -semaphore := make(chan struct{}, Workers*MaxInFlight) +semaphore := make(chan struct{}, MaxInFlight) +// per-worker gate +inflightGate := make(chan struct{}, MaxInFlightPerWorker) ``` - `Workers`: Number of worker processes (default: 4) -- `MaxInFlight`: Maximum concurrent requests per worker (default: 10) -- Semaphore size = `Workers * MaxInFlight` (default: 40) +- `MaxInFlight`: Maximum concurrent requests across the pool (default: 10) +- `MaxInFlightPerWorker`: Maximum in-flight requests per worker (default: 1) +- Effective max concurrency = `min(MaxInFlight, Workers * MaxInFlightPerWorker)` ### Behavior When Semaphore Is Full @@ -117,13 +120,14 @@ Callers can control the maximum wait time for backpressure by setting a timeout ### Capacity Planning -| Workers | MaxInFlight | Max Concurrent Requests | -|---------|-------------|------------------------| -| 4 | 10 | 40 | -| 8 | 10 | 80 | -| 4 | 20 | 80 | +| Workers | MaxInFlight | MaxInFlightPerWorker | Max Concurrent Requests | +|---------|-------------|----------------------|------------------------| +| 4 | 10 | 1 | 4 | +| 8 | 10 | 1 | 8 | +| 4 | 40 | 1 | 4 | +| 4 | 40 | 2 | 8 | -Requests exceeding `Workers * MaxInFlight` are queued until capacity becomes available. +Requests exceeding `min(MaxInFlight, Workers * MaxInFlightPerWorker)` are queued until capacity becomes available. ## SLO Definition Template diff --git a/docs/reference/operations.md b/docs/reference/operations.md index 68b6d29..36350c1 100644 --- a/docs/reference/operations.md +++ b/docs/reference/operations.md @@ -8,7 +8,8 @@ # Standard deployment on a single machine pool: workers: 4 # Number of Python processes - max_in_flight: 10 # Max concurrent requests per worker + max_in_flight: 10 # Max concurrent requests across the pool + max_in_flight_per_worker: 1 # Max in-flight requests per worker health_interval: 30s # Health check frequency python: @@ -100,7 +101,8 @@ cfg := pyproc.WorkerConfig{ ```go poolCfg := pyproc.PoolConfig{ Workers: 4, // Number of workers - MaxInFlight: 10, // Per-worker concurrency + MaxInFlight: 10, // Global concurrency across the pool + MaxInFlightPerWorker: 1, // Per-worker in-flight cap HealthInterval: 30 * time.Second, // Health check frequency Restart: pyproc.RestartConfig{ MaxAttempts: 5, @@ -260,6 +262,8 @@ conn.SetWriteBuffer(1024 * 1024) // 1MB ```go // Match MaxInFlight to expected concurrency MaxInFlight: runtime.NumCPU() * 2 +// Keep per-worker at 1 unless the Python worker can process concurrent requests +MaxInFlightPerWorker: 1 ``` ## Troubleshooting @@ -327,4 +331,3 @@ echo '{"id":1,"method":"health","body":{}}' | \ - [ ] Test failure scenarios - [ ] Document worker dependencies - [ ] Create runbooks for common issues - diff --git a/worker/python/README.md b/worker/python/README.md index f20bb0d..0dfbffe 100644 --- a/worker/python/README.md +++ b/worker/python/README.md @@ -36,8 +36,9 @@ Then call it from Go using pyproc: ```go pool, _ := pyproc.NewPool(pyproc.PoolOptions{ Config: pyproc.PoolConfig{ - Workers: 4, - MaxInFlight: 10, + Workers: 4, + MaxInFlight: 10, + MaxInFlightPerWorker: 1, }, WorkerConfig: pyproc.WorkerConfig{ SocketPath: "/tmp/pyproc.sock", @@ -190,4 +191,4 @@ Apache 2.0 - See [LICENSE](https://github.com/YuminosukeSato/pyproc/blob/main/LI - [pyproc Go Library](https://github.com/YuminosukeSato/pyproc) - [Documentation](https://github.com/YuminosukeSato/pyproc#readme) - [Examples](https://github.com/YuminosukeSato/pyproc/tree/main/examples) -- [Issue Tracker](https://github.com/YuminosukeSato/pyproc/issues) \ No newline at end of file +- [Issue Tracker](https://github.com/YuminosukeSato/pyproc/issues)