diff --git a/pkg/pyproc/connection_test.go b/pkg/pyproc/connection_test.go index 20e52eb..179507f 100644 --- a/pkg/pyproc/connection_test.go +++ b/pkg/pyproc/connection_test.go @@ -3,15 +3,13 @@ package pyproc import ( "context" "net" - "path/filepath" "testing" "time" ) func TestConnectToWorker_Success(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "test.sock") + socketPath := tempSocketPath(t, "connect-ok") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -36,8 +34,7 @@ func TestConnectToWorker_Success(t *testing.T) { func TestConnectToWorker_Timeout(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "nonexistent.sock") + socketPath := tempSocketPath(t, "connect-timeout") start := time.Now() _, err := ConnectToWorker(socketPath, 200*time.Millisecond) @@ -104,8 +101,7 @@ func TestSleepWithCtx_ContextAlreadyCanceled(t *testing.T) { func TestConnectToWorker_TimeoutDuringSleep(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "nonexistent.sock") + socketPath := tempSocketPath(t, "connect-timeout-sleep") // Use very short timeout to ensure timeout occurs during sleep // The timeout must be shorter than defaultSleepDuration (100ms) but long enough @@ -129,8 +125,7 @@ func TestConnectToWorker_TimeoutDuringSleep(t *testing.T) { func TestConnectToWorker_TimeoutAfterSleep(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "nonexistent.sock") + socketPath := tempSocketPath(t, "connect-timeout-after") // Use timeout longer than one sleep cycle (100ms) to ensure at least one // sleep completes successfully. Try multiple times to increase likelihood diff --git a/pkg/pyproc/pool_external_test.go b/pkg/pyproc/pool_external_test.go index cd5c8a1..b55c5c5 100644 --- a/pkg/pyproc/pool_external_test.go +++ b/pkg/pyproc/pool_external_test.go @@ -3,6 +3,7 @@ package pyproc import ( "net" "os" + "path/filepath" "testing" "time" ) @@ -10,7 +11,11 @@ import ( // startTestSocket creates a temporary Unix socket listener for external pool tests. func startTestSocket(t *testing.T) (net.Listener, string) { t.Helper() - f, err := os.CreateTemp("/tmp", "pyproc-ext-pool-*.sock") + baseDir := filepath.Join(os.TempDir(), "pyproc") + if err := os.MkdirAll(baseDir, 0755); err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + f, err := os.CreateTemp(baseDir, "pyproc-ext-pool-*.sock") if err != nil { t.Fatalf("failed to create temp file: %v", err) } diff --git a/pkg/pyproc/pool_test.go b/pkg/pyproc/pool_test.go index 3e4484a..a0710ff 100644 --- a/pkg/pyproc/pool_test.go +++ b/pkg/pyproc/pool_test.go @@ -54,17 +54,23 @@ func newPoolWithWorkers(cfg PoolConfig, workers []workerHandle) *Pool { if cfg.MaxInFlight == 0 { cfg.MaxInFlight = 1 } + if cfg.MaxInFlightPerWorker == 0 { + cfg.MaxInFlightPerWorker = 1 + } p := &Pool{ - opts: PoolOptions{Config: cfg}, - logger: NewLogger(LoggingConfig{Level: "error", Format: "json"}), - workers: make([]*poolWorker, len(workers)), - semaphore: make(chan struct{}, cfg.Workers*cfg.MaxInFlight), - activeRequests: make(map[uint64]*activeRequest), + opts: PoolOptions{Config: cfg}, + logger: NewLogger(LoggingConfig{Level: "error", Format: "json"}), + workers: make([]*poolWorker, len(workers)), + semaphore: make(chan struct{}, cfg.MaxInFlight), + workerAvailable: make(chan struct{}, cfg.Workers*cfg.MaxInFlightPerWorker), + shutdownCh: make(chan struct{}), + activeRequests: make(map[uint64]*activeRequest), } for i, w := range workers { p.workers[i] = &poolWorker{ - worker: w, - connPool: make(chan net.Conn, cfg.MaxInFlight), + worker: w, + connPool: make(chan net.Conn, cfg.MaxInFlightPerWorker), + inflightGate: make(chan struct{}, cfg.MaxInFlightPerWorker), } } return p @@ -189,8 +195,7 @@ func TestNewPoolDefaultsAndInvalidWorkers(t *testing.T) { } func TestPoolStartPrepopulateAndHealth(t *testing.T) { - tmp := t.TempDir() - paths := []string{filepath.Join(tmp, "w0.sock"), filepath.Join(tmp, "w1.sock")} + paths := []string{tempSocketPath(t, "prepop-w0"), tempSocketPath(t, "prepop-w1")} servers := []func(){ startUnixServer(t, paths[0], func(req protocol.Request) *protocol.Response { resp, _ := protocol.NewResponse(req.ID, map[string]string{"ok": "yes"}) @@ -333,7 +338,7 @@ func TestPoolCallConnectError(t *testing.T) { } func TestPoolCallCreatesConnectionAndReturns(t *testing.T) { - path := "/tmp/pool-creates-conn.sock" + path := tempSocketPath(t, "pool-create-conn") stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { resp, _ := protocol.NewResponse(req.ID, map[string]string{"ok": "yes"}) return resp @@ -551,7 +556,7 @@ func TestWorkerIsHealthyStates(t *testing.T) { } func TestPoolConnect(t *testing.T) { - path := filepath.Join(t.TempDir(), "connect.sock") + path := tempSocketPath(t, "connect") stop := startUnixServer(t, path, func(req protocol.Request) *protocol.Response { resp, _ := protocol.NewResponse(req.ID, map[string]string{"ok": "yes"}) return resp diff --git a/pkg/pyproc/socket_hmac_test.go b/pkg/pyproc/socket_hmac_test.go index 8b93bd3..2c6ac67 100644 --- a/pkg/pyproc/socket_hmac_test.go +++ b/pkg/pyproc/socket_hmac_test.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "net" - "path/filepath" "testing" ) @@ -83,8 +82,7 @@ func TestSecretFromHex_InvalidHex(t *testing.T) { func TestHMACAuthClientServer(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "hmac-test.sock") + socketPath := tempSocketPath(t, "hmac-test") secret := []byte("shared-secret-key-for-testing") serverAuth := NewHMACAuth(secret) @@ -125,8 +123,7 @@ func TestHMACAuthClientServer(t *testing.T) { func TestHMACAuthWrongSecret(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "w.sock") + socketPath := tempSocketPath(t, "hmac-wrong") serverAuth := NewHMACAuth([]byte("server-secret")) clientAuth := NewHMACAuth([]byte("wrong-client-secret")) @@ -166,8 +163,7 @@ func TestHMACAuthWrongSecret(t *testing.T) { func TestNewHMACListener(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "hmac-listener.sock") + socketPath := tempSocketPath(t, "hmac-listener") listener, err := net.Listen("unix", socketPath) if err != nil { diff --git a/pkg/pyproc/testutil_test.go b/pkg/pyproc/testutil_test.go new file mode 100644 index 0000000..ea97f35 --- /dev/null +++ b/pkg/pyproc/testutil_test.go @@ -0,0 +1,19 @@ +package pyproc + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func tempSocketPath(t *testing.T, prefix string) string { + t.Helper() + base := filepath.Join(os.TempDir(), "pyproc") + if err := os.MkdirAll(base, 0755); err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + name := fmt.Sprintf("%s-%d.sock", prefix, time.Now().UnixNano()) + return filepath.Join(base, name) +} diff --git a/pkg/pyproc/transport_multiplexed_test.go b/pkg/pyproc/transport_multiplexed_test.go index 6106e38..a6e5358 100644 --- a/pkg/pyproc/transport_multiplexed_test.go +++ b/pkg/pyproc/transport_multiplexed_test.go @@ -243,7 +243,7 @@ func TestNewMultiplexedTransport_NonExistentSocket(t *testing.T) { func TestMultiplexedTransport_CallAfterClose(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-call-close.sock" + socketPath := tempSocketPath(t, "mux-call-close") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -282,7 +282,7 @@ func TestMultiplexedTransport_CallAfterClose(t *testing.T) { func TestMultiplexedTransport_IsHealthy(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-health.sock" + socketPath := tempSocketPath(t, "mux-health") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -323,7 +323,7 @@ func TestMultiplexedTransport_IsHealthy(t *testing.T) { func TestMultiplexedTransport_ReadError(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-read-error.sock" + socketPath := tempSocketPath(t, "mux-read-error") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -362,7 +362,7 @@ func TestMultiplexedTransport_ReadError(t *testing.T) { func TestMultiplexedTransport_DoubleClose(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/mux-dclose.sock" + socketPath := tempSocketPath(t, "mux-dclose") listener, err := net.Listen("unix", socketPath) if err != nil { diff --git a/pkg/pyproc/transport_uds_test.go b/pkg/pyproc/transport_uds_test.go index cd63f22..9322a72 100644 --- a/pkg/pyproc/transport_uds_test.go +++ b/pkg/pyproc/transport_uds_test.go @@ -3,7 +3,6 @@ package pyproc import ( "context" "net" - "path/filepath" "testing" "time" @@ -54,8 +53,7 @@ func TestNewUDSTransport_NonExistentSocket(t *testing.T) { func TestUDSTransport_CallAfterClose(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "t.sock") + socketPath := tempSocketPath(t, "uds-call-close") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -93,8 +91,7 @@ func TestUDSTransport_CallAfterClose(t *testing.T) { func TestUDSTransport_Health(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "h.sock") + socketPath := tempSocketPath(t, "uds-health") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -131,7 +128,7 @@ func TestUDSTransport_Health(t *testing.T) { func TestUDSTransport_Reconnect(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/reconnect-test.sock" + socketPath := tempSocketPath(t, "uds-reconnect") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -166,7 +163,7 @@ func TestUDSTransport_Reconnect(t *testing.T) { func TestUDSTransport_ReconnectFail(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/reconnect-fail.sock" + socketPath := tempSocketPath(t, "uds-reconnect-fail") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -202,7 +199,7 @@ func TestUDSTransport_ReconnectFail(t *testing.T) { func TestUDSTransport_PingFail(t *testing.T) { requireUnixSocket(t) - socketPath := "/tmp/ping-fail.sock" + socketPath := tempSocketPath(t, "uds-ping-fail") listener, err := net.Listen("unix", socketPath) if err != nil { @@ -243,8 +240,7 @@ func TestUDSTransport_PingFail(t *testing.T) { func TestUDSTransport_DoubleClose(t *testing.T) { requireUnixSocket(t) - tmpDir := t.TempDir() - socketPath := filepath.Join(tmpDir, "d.sock") + socketPath := tempSocketPath(t, "uds-dclose") listener, err := net.Listen("unix", socketPath) if err != nil { diff --git a/pkg/pyproc/worker_external_test.go b/pkg/pyproc/worker_external_test.go index 50b6840..e9badf7 100644 --- a/pkg/pyproc/worker_external_test.go +++ b/pkg/pyproc/worker_external_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "os" + "path/filepath" "testing" "time" ) @@ -12,7 +13,11 @@ import ( // It uses /tmp directly to avoid macOS socket path length limits. func startTestUnixListener(t *testing.T) (net.Listener, string) { t.Helper() - f, err := os.CreateTemp("/tmp", "pyproc-test-*.sock") + baseDir := filepath.Join(os.TempDir(), "pyproc") + if err := os.MkdirAll(baseDir, 0755); err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + f, err := os.CreateTemp(baseDir, "pyproc-test-*.sock") if err != nil { t.Fatalf("failed to create temp file: %v", err) }