Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- `PoolConfig.MaxInFlightPerWorker` to cap in-flight requests per worker (default: 1)

### Changed
- `MaxInFlight` now limits total concurrent requests across the pool
- Dispatch now enforces per-worker serialization to prevent head-of-line blocking

### Fixed
- Data race when `Pool.Call` and `Pool.Shutdown` run concurrently

## [0.1.0] - 2025-08-15

### Added
Expand Down Expand Up @@ -121,8 +131,9 @@ worker.Start(ctx)
```go
pool, _ := pyproc.NewPool(pyproc.PoolOptions{
Config: pyproc.PoolConfig{
Workers: 4,
MaxInFlight: 10,
Workers: 4,
MaxInFlight: 10,
MaxInFlightPerWorker: 1,
},
WorkerConfig: cfg,
}, nil)
Expand All @@ -142,4 +153,4 @@ Thanks to all contributors and early adopters who provided valuable feedback to
Special thanks to the Go and Python communities for the excellent libraries and tools that make pyproc possible.

[Unreleased]: https://github.com/YuminosukeSato/pyproc/compare/v0.1.0...HEAD
[0.1.0]: https://github.com/YuminosukeSato/pyproc/releases/tag/v0.1.0
[0.1.0]: https://github.com/YuminosukeSato/pyproc/releases/tag/v0.1.0
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ func main() {
// Create a pool of Python workers
pool, err := pyproc.NewPool(pyproc.PoolOptions{
Config: pyproc.PoolConfig{
Workers: 4, // Run 4 Python processes
MaxInFlight: 10, // Max concurrent requests per worker
Workers: 4, // Run 4 Python processes
MaxInFlight: 10, // Global concurrent requests
MaxInFlightPerWorker: 1, // Per-worker in-flight cap
},
WorkerConfig: pyproc.WorkerConfig{
SocketPath: "/tmp/pyproc.sock",
Expand Down Expand Up @@ -302,9 +303,10 @@ cfg := pyproc.WorkerConfig{
#### Pool Configuration
```go
poolCfg := pyproc.PoolConfig{
Workers: 4, // Number of Python processes
MaxInFlight: 10, // Max concurrent requests per worker
HealthInterval: 30 * time.Second, // Health check frequency
Workers: 4, // Number of Python processes
MaxInFlight: 10, // Global concurrent requests
MaxInFlightPerWorker: 1, // Per-worker in-flight cap
HealthInterval: 30 * time.Second, // Health check frequency
}
```

Expand Down Expand Up @@ -680,8 +682,9 @@ The benchmarks show near-linear scaling with worker count, demonstrating the eff
```go
pool, _ := pyproc.NewPool(pyproc.PoolOptions{
Config: pyproc.PoolConfig{
Workers: 4,
MaxInFlight: 10,
Workers: 4,
MaxInFlight: 10,
MaxInFlightPerWorker: 1,
},
WorkerConfig: pyproc.WorkerConfig{
SocketPath: "/tmp/pyproc.sock",
Expand Down
7 changes: 5 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ pool:
# Number of Python worker processes
workers: 4

# Maximum concurrent requests per worker
# Maximum concurrent requests across the pool
max_in_flight: 10

# Maximum in-flight requests per worker
max_in_flight_per_worker: 1

# Worker startup timeout in seconds
start_timeout: 30
Expand Down Expand Up @@ -72,4 +75,4 @@ metrics:
endpoint: ":9090"

# Metrics path
path: "/metrics"
path: "/metrics"
2 changes: 1 addition & 1 deletion docs/deployment/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func main() {
pool, err := pyproc.NewPoolWithMetrics(pyproc.PoolOptions{
Config: pyproc.PoolConfig{Workers: 4, MaxInFlight: 10},
Config: pyproc.PoolConfig{Workers: 4, MaxInFlight: 10, MaxInFlightPerWorker: 1},
WorkerConfig: pyproc.WorkerConfig{
PythonExec: "python3",
WorkerScript: "worker.py",
Expand Down
6 changes: 4 additions & 2 deletions docs/deployment/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ Enable metrics to identify bottleneck:
```go
pool, _ := pyproc.NewPool(pyproc.PoolOptions{
Config: pyproc.PoolConfig{
Workers: 4,
MaxInFlight: 10,
Workers: 4,
MaxInFlight: 10,
MaxInFlightPerWorker: 1,
},
// ...
}, logger)
Expand Down Expand Up @@ -553,6 +554,7 @@ When filing an issue, include:
**Config**:
- Workers: 4
- MaxInFlight: 10
- MaxInFlightPerWorker: 1
- Socket path: /tmp/pyproc.sock

**Code**:
Expand Down
8 changes: 5 additions & 3 deletions docs/getting-started/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ func main() {
// Create a pool of Python workers
pool, err := pyproc.NewPool(pyproc.PoolOptions{
Config: pyproc.PoolConfig{
Workers: 4, // Run 4 Python processes
MaxInFlight: 10, // Max concurrent requests per worker
Workers: 4, // Run 4 Python processes
MaxInFlight: 10, // Global concurrent requests
MaxInFlightPerWorker: 1, // Per-worker in-flight cap
},
WorkerConfig: pyproc.WorkerConfig{
SocketPath: "/tmp/pyproc.sock",
Expand Down Expand Up @@ -149,7 +150,8 @@ func main() {
| Parameter | Description |
|-----------|-------------|
| `Workers` | Number of Python processes to spawn (recommended: 2-8 per CPU core) |
| `MaxInFlight` | Max concurrent requests per worker (prevents overload) |
| `MaxInFlight` | Max concurrent requests across the pool |
| `MaxInFlightPerWorker` | Max in-flight requests per worker (default: 1; raise only if your Python worker handles requests concurrently) |
| `SocketPath` | Unix Domain Socket file path (must be writable) |
| `PythonExec` | Python interpreter path (`python3`, or path to venv) |
| `WorkerScript` | Path to your Python worker script |
Expand Down
5 changes: 3 additions & 2 deletions docs/guides/performance-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ Workers: runtime.NumCPU() * 2

### 2. Connection Pooling

Use `MaxInFlight` to control backpressure:
Use `MaxInFlight` (global) and `MaxInFlightPerWorker` (per-worker) to control backpressure:

```go
MaxInFlight: 10 // 10 concurrent requests per worker
MaxInFlight: 10 // global concurrent requests
MaxInFlightPerWorker: 1 // per-worker in-flight cap
```

### 3. Benchmarking
Expand Down
5 changes: 3 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ Just your Go binary + Python scripts — no service mesh, no orchestration
// Create pool of Python workers
pool, _ := pyproc.NewPool(pyproc.PoolOptions{
Config: pyproc.PoolConfig{
Workers: 4, // 4 Python processes
MaxInFlight: 10, // Concurrent requests
Workers: 4, // 4 Python processes
MaxInFlight: 10, // Global concurrent requests
MaxInFlightPerWorker: 1, // Per-worker in-flight cap
},
WorkerConfig: pyproc.WorkerConfig{
SocketPath: "/tmp/pyproc.sock",
Expand Down
9 changes: 6 additions & 3 deletions docs/ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
# Standard deployment on a single machine
pool:
workers: 4 # Number of Python processes
max_in_flight: 10 # Max concurrent requests per worker
max_in_flight: 10 # Max concurrent requests across the pool
max_in_flight_per_worker: 1 # Max in-flight requests per worker
health_interval: 30s # Health check frequency

python:
Expand Down Expand Up @@ -100,7 +101,8 @@ cfg := pyproc.WorkerConfig{
```go
poolCfg := pyproc.PoolConfig{
Workers: 4, // Number of workers
MaxInFlight: 10, // Per-worker concurrency
MaxInFlight: 10, // Global concurrency across the pool
MaxInFlightPerWorker: 1, // Per-worker in-flight cap
HealthInterval: 30 * time.Second, // Health check frequency
Restart: pyproc.RestartConfig{
MaxAttempts: 5,
Expand Down Expand Up @@ -260,6 +262,8 @@ conn.SetWriteBuffer(1024 * 1024) // 1MB
```go
// Match MaxInFlight to expected concurrency
MaxInFlight: runtime.NumCPU() * 2
// Keep per-worker at 1 unless the Python worker can process concurrent requests
MaxInFlightPerWorker: 1
```

## Troubleshooting
Expand Down Expand Up @@ -327,4 +331,3 @@ echo '{"id":1,"method":"health","body":{}}' | \
- [ ] Test failure scenarios
- [ ] Document worker dependencies
- [ ] Create runbooks for common issues

17 changes: 10 additions & 7 deletions docs/reference/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@ worker := p.workers[idx % uint64(len(p.workers))]

### Backpressure Mechanism

Prevents overwhelming workers using a **semaphore-based** approach:
Prevents overwhelming workers using a **global semaphore + per-worker gate** approach:

```go
// Limit total in-flight requests across all workers
semaphore := make(chan struct{}, workers * maxInFlight)
// Limit total in-flight requests across the pool
semaphore := make(chan struct{}, maxInFlight)
// Limit per-worker in-flight requests
inflightGate := make(chan struct{}, maxInFlightPerWorker)

// Before each request: acquire the global semaphore first, then the
// chosen worker's inflightGate (acquired and released the same way)
semaphore <- struct{}{} // Blocks if the pool-wide limit is reached
Expand All @@ -135,9 +137,10 @@ defer func() { <-semaphore }() // Release after completion
**Configuration**:
```go
Config: pyproc.PoolConfig{
Workers: 4, // 4 Python processes
MaxInFlight: 10, // Max 10 requests per worker
// Total capacity: 4 * 10 = 40 concurrent requests
Workers: 4, // 4 Python processes
MaxInFlight: 10, // Max concurrent requests across the pool
MaxInFlightPerWorker: 1, // Max in-flight per worker
// Effective capacity: min(10, 4*1) = 4 concurrent requests
}
```

Expand Down Expand Up @@ -287,7 +290,7 @@ respData, err := framer.ReadMessage()
**Benefits**:
- No socket open/close overhead per request
- Connection reuse reduces latency
- Backpressure via MaxInFlight limit
- Backpressure via MaxInFlight (global) + MaxInFlightPerWorker (per worker)

### Graceful Shutdown

Expand Down
24 changes: 14 additions & 10 deletions docs/reference/failure-behavior.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,20 @@ Automatic restart is not yet implemented. The `RestartConfig` struct exists in t

## Backpressure

The Pool uses a semaphore to limit concurrency.
The Pool uses a global semaphore and a per-worker gate to limit concurrency.

### Semaphore Mechanism

```go
semaphore := make(chan struct{}, Workers*MaxInFlight)
semaphore := make(chan struct{}, MaxInFlight)
// per-worker gate
inflightGate := make(chan struct{}, MaxInFlightPerWorker)
```

- `Workers`: Number of worker processes (default: 4)
- `MaxInFlight`: Maximum concurrent requests per worker (default: 10)
- Semaphore size = `Workers * MaxInFlight` (default: 40)
- `MaxInFlight`: Maximum concurrent requests across the pool (default: 10)
- `MaxInFlightPerWorker`: Maximum in-flight requests per worker (default: 1)
- Effective max concurrency = `min(MaxInFlight, Workers * MaxInFlightPerWorker)`

### Behavior When Semaphore Is Full

Expand All @@ -117,13 +120,14 @@ Callers can control the maximum wait time for backpressure by setting a timeout

### Capacity Planning

| Workers | MaxInFlight | Max Concurrent Requests |
|---------|-------------|------------------------|
| 4 | 10 | 40 |
| 8 | 10 | 80 |
| 4 | 20 | 80 |
| Workers | MaxInFlight | MaxInFlightPerWorker | Max Concurrent Requests |
|---------|-------------|----------------------|------------------------|
| 4 | 10 | 1 | 4 |
| 8 | 10 | 1 | 8 |
| 4 | 40 | 1 | 4 |
| 4 | 40 | 2 | 8 |

Requests exceeding `Workers * MaxInFlight` are queued until capacity becomes available.
Requests exceeding `min(MaxInFlight, Workers * MaxInFlightPerWorker)` are queued until capacity becomes available.

## SLO Definition Template

Expand Down
9 changes: 6 additions & 3 deletions docs/reference/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
# Standard deployment on a single machine
pool:
workers: 4 # Number of Python processes
max_in_flight: 10 # Max concurrent requests per worker
max_in_flight: 10 # Max concurrent requests across the pool
max_in_flight_per_worker: 1 # Max in-flight requests per worker
health_interval: 30s # Health check frequency

python:
Expand Down Expand Up @@ -100,7 +101,8 @@ cfg := pyproc.WorkerConfig{
```go
poolCfg := pyproc.PoolConfig{
Workers: 4, // Number of workers
MaxInFlight: 10, // Per-worker concurrency
MaxInFlight: 10, // Global concurrency across the pool
MaxInFlightPerWorker: 1, // Per-worker in-flight cap
HealthInterval: 30 * time.Second, // Health check frequency
Restart: pyproc.RestartConfig{
MaxAttempts: 5,
Expand Down Expand Up @@ -260,6 +262,8 @@ conn.SetWriteBuffer(1024 * 1024) // 1MB
```go
// Match MaxInFlight to expected concurrency
MaxInFlight: runtime.NumCPU() * 2
// Keep per-worker at 1 unless the Python worker can process concurrent requests
MaxInFlightPerWorker: 1
```

## Troubleshooting
Expand Down Expand Up @@ -327,4 +331,3 @@ echo '{"id":1,"method":"health","body":{}}' | \
- [ ] Test failure scenarios
- [ ] Document worker dependencies
- [ ] Create runbooks for common issues

15 changes: 10 additions & 5 deletions pkg/pyproc/cancellation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// createTestPool creates a new pool for testing
func createTestPool(t *testing.T, id string) *Pool {
func createTestPoolWithConfig(t *testing.T, id string, workers int, maxInFlight int, perWorker int) *Pool {
requireUnixSocket(t)
// Use /tmp directly with short names to avoid 104 char Unix socket path limit on macOS
// Note: pool.go adds "-0" for worker ID, so keep the base path very short
Expand All @@ -19,9 +19,10 @@ func createTestPool(t *testing.T, id string) *Pool {

poolOpts := PoolOptions{
Config: PoolConfig{
Workers: 1,
MaxInFlight: 3, // Allow 3 concurrent requests for the concurrent test
HealthInterval: 100 * time.Millisecond,
Workers: workers,
MaxInFlight: maxInFlight,
MaxInFlightPerWorker: perWorker,
HealthInterval: 100 * time.Millisecond,
},
WorkerConfig: WorkerConfig{
ID: id,
Expand Down Expand Up @@ -59,6 +60,10 @@ func createTestPool(t *testing.T, id string) *Pool {
return pool
}

func createTestPool(t *testing.T, id string) *Pool {
return createTestPoolWithConfig(t, id, 1, 3, 1)
}

// TestContextCancellation tests that context cancellation propagates to Python workers
func TestContextCancellation(t *testing.T) {
// Skip if running in CI without Python
Expand Down Expand Up @@ -121,7 +126,7 @@ func TestContextCancellation(t *testing.T) {
})

t.Run("MultipleConcurrentCancellations", func(t *testing.T) {
pool := createTestPool(t, "mc")
pool := createTestPoolWithConfig(t, "mc", 3, 3, 1)
t.Cleanup(func() {
if err := pool.Shutdown(context.Background()); err != nil {
t.Errorf("Failed to shutdown pool: %v", err)
Expand Down
12 changes: 7 additions & 5 deletions pkg/pyproc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ type Config struct {

// PoolConfig defines worker pool settings
type PoolConfig struct {
Workers int `mapstructure:"workers"`
MaxInFlight int `mapstructure:"max_in_flight"`
StartTimeout time.Duration `mapstructure:"start_timeout"`
HealthInterval time.Duration `mapstructure:"health_interval"`
Restart RestartConfig `mapstructure:"restart"`
Workers int `mapstructure:"workers"`
MaxInFlight int `mapstructure:"max_in_flight"`
MaxInFlightPerWorker int `mapstructure:"max_in_flight_per_worker"`
StartTimeout time.Duration `mapstructure:"start_timeout"`
HealthInterval time.Duration `mapstructure:"health_interval"`
Restart RestartConfig `mapstructure:"restart"`
}

// RestartConfig defines restart policy
Expand Down Expand Up @@ -122,6 +123,7 @@ func setDefaults(v *viper.Viper) {
// Pool defaults
v.SetDefault("pool.workers", 4)
v.SetDefault("pool.max_in_flight", 10)
v.SetDefault("pool.max_in_flight_per_worker", 1)
v.SetDefault("pool.start_timeout", 30)
v.SetDefault("pool.health_interval", 30)
v.SetDefault("pool.restart.max_attempts", 5)
Expand Down
Loading
Loading