diff --git a/pkg/pyproc/pool.go b/pkg/pyproc/pool.go
index 2f4d543..1fad2fb 100644
--- a/pkg/pyproc/pool.go
+++ b/pkg/pyproc/pool.go
@@ -24,6 +24,13 @@ type PoolOptions struct {
 	// spawning child processes. ExternalSocketPaths must list the UDS paths.
 	ExternalMode        bool
 	ExternalSocketPaths []string
+
+	// ExternalMaxRetries is the maximum number of connection attempts (the
+	// first try included) for external workers. If zero, defaults to 10.
+	ExternalMaxRetries int
+	// ExternalRetryInterval is the initial retry interval for external
+	// workers. Each retry doubles the interval. If zero, defaults to 500ms.
+	ExternalRetryInterval time.Duration
 }
 
 type workerHandle interface {
@@ -158,7 +165,12 @@ func newExternalPool(opts PoolOptions, logger *Logger) (*Pool, error) {
 	}
 
 	for i, sockPath := range opts.ExternalSocketPaths {
-		worker := NewExternalWorker(sockPath, opts.WorkerConfig.StartTimeout)
+		worker := NewExternalWorkerWithOptions(ExternalWorkerOptions{
+			SocketPath:     sockPath,
+			ConnectTimeout: opts.WorkerConfig.StartTimeout,
+			MaxRetries:     opts.ExternalMaxRetries,
+			RetryInterval:  opts.ExternalRetryInterval,
+		})
 		pool.workers[i] = &poolWorker{
 			worker:   worker,
 			connPool: make(chan net.Conn, opts.Config.MaxInFlight),
diff --git a/pkg/pyproc/worker_external.go b/pkg/pyproc/worker_external.go
index bda9302..4fe160c 100644
--- a/pkg/pyproc/worker_external.go
+++ b/pkg/pyproc/worker_external.go
@@ -18,7 +18,29 @@ const (
 	ExternalWorkerRunning
 )
 
-const defaultConnectTimeout = 5 * time.Second
+const (
+	defaultConnectTimeout = 5 * time.Second
+	// defaultMaxRetries with defaultRetryInterval yields ~4 min of backoff
+	// (500ms + 1s + 2s + 4s + 8s + 16s + 32s + 64s + 128s ≈ 255.5s).
+	defaultMaxRetries    = 10
+	defaultRetryInterval = 500 * time.Millisecond
+)
+
+// ExternalWorkerOptions configures an ExternalWorker.
+type ExternalWorkerOptions struct {
+	// SocketPath is the Unix Domain Socket path to connect to.
+	SocketPath string
+	// ConnectTimeout controls how long each dial attempt waits.
+	// If zero, defaults to 5s.
+	ConnectTimeout time.Duration
+	// MaxRetries is the maximum number of connection attempts Start makes,
+	// the first try included. If zero, defaults to 10.
+	MaxRetries int
+	// RetryInterval is the initial interval between retries. Each subsequent
+	// retry doubles the interval (exponential backoff).
+	// If zero, defaults to 500ms.
+	RetryInterval time.Duration
+}
 
 // ExternalWorker represents a pre-existing Python worker process managed
 // outside of pyproc (e.g. a Kubernetes sidecar container). It connects to the
@@ -27,33 +49,100 @@ const defaultConnectTimeout = 5 * time.Second
 type ExternalWorker struct {
 	socketPath     string
 	connectTimeout time.Duration
+	maxRetries     int
+	retryInterval  time.Duration
 	state          atomic.Int32
 }
 
 // NewExternalWorker creates a new ExternalWorker that connects to the given
 // Unix Domain Socket path. The connectTimeout controls how long dial attempts
-// wait; if zero, a default of 5 s is used.
+// wait; if zero, a default of 5 s is used. For retry support, use
+// NewExternalWorkerWithOptions instead.
 func NewExternalWorker(socketPath string, connectTimeout time.Duration) *ExternalWorker {
-	if connectTimeout <= 0 {
-		connectTimeout = defaultConnectTimeout
+	return NewExternalWorkerWithOptions(ExternalWorkerOptions{
+		SocketPath:     socketPath,
+		ConnectTimeout: connectTimeout,
+		MaxRetries:     1, // no retry for backward compat
+	})
+}
+
+// NewExternalWorkerWithOptions creates a new ExternalWorker from opts. Any
+// non-positive ConnectTimeout, MaxRetries or RetryInterval falls back to its
+// default (5s, 10 and 500ms respectively).
+func NewExternalWorkerWithOptions(opts ExternalWorkerOptions) *ExternalWorker {
+	if opts.ConnectTimeout <= 0 {
+		opts.ConnectTimeout = defaultConnectTimeout
+	}
+	if opts.MaxRetries <= 0 {
+		opts.MaxRetries = defaultMaxRetries
+	}
+	if opts.RetryInterval <= 0 {
+		opts.RetryInterval = defaultRetryInterval
 	}
 	return &ExternalWorker{
-		socketPath:     socketPath,
-		connectTimeout: connectTimeout,
+		socketPath:     opts.SocketPath,
+		connectTimeout: opts.ConnectTimeout,
+		maxRetries:     opts.MaxRetries,
+		retryInterval:  opts.RetryInterval,
 	}
 }
 
-// Start verifies that the external worker's socket is reachable. It does not
-// spawn a process; the worker must already be running.
-func (w *ExternalWorker) Start(_ context.Context) error {
-	conn, err := net.DialTimeout("unix", w.socketPath, w.connectTimeout)
-	if err != nil {
-		return fmt.Errorf("external worker socket unreachable at %s: %w", w.socketPath, err)
+// Start verifies that the external worker's socket is reachable. It makes up
+// to MaxRetries dial attempts, sleeping between them with exponential backoff
+// starting at RetryInterval. It does not spawn a process; the worker must
+// already be running.
+//
+// Cancelling ctx aborts both an in-flight dial and a backoff sleep. Callers
+// should pass a context with a deadline (e.g. context.WithTimeout) to bound
+// the total wait; without one Start may block for the full backoff duration
+// (~4 min with defaults). A nil ctx is treated as context.Background().
+func (w *ExternalWorker) Start(ctx context.Context) error {
+	// Saturation point for the backoff: doubling past it would overflow
+	// time.Duration and produce a negative, i.e. zero-length, sleep.
+	const maxInterval = time.Duration(1<<63 - 1)
+
+	if ctx == nil {
+		// The previous implementation ignored ctx, so nil used to be harmless.
+		ctx = context.Background()
+	}
+	attempts := w.maxRetries
+	if attempts < 1 {
+		// Worker not built by a constructor: still dial once.
+		attempts = 1
+	}
+
+	dialer := net.Dialer{Timeout: w.connectTimeout}
+	interval := w.retryInterval
+	var lastErr error
+	for i := 0; i < attempts; i++ {
+		if i > 0 {
+			timer := time.NewTimer(interval)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return fmt.Errorf("external worker connection cancelled: %w", ctx.Err())
+			case <-timer.C:
+			}
+			if interval > maxInterval/2 {
+				interval = maxInterval
+			} else {
+				interval *= 2
+			}
+		}
+		conn, err := dialer.DialContext(ctx, "unix", w.socketPath)
+		if err != nil {
+			if ctx.Err() != nil {
+				return fmt.Errorf("external worker connection cancelled: %w", ctx.Err())
+			}
+			lastErr = err
+			continue
+		}
+		_ = conn.Close() // reachability probe only; nothing is sent on it
+		w.state.Store(int32(ExternalWorkerRunning))
+		return nil
 	}
-	_ = conn.Close()
-
-	w.state.Store(int32(ExternalWorkerRunning))
-	return nil
+	return fmt.Errorf("external worker socket unreachable at %s after %d attempts: %w",
+		w.socketPath, attempts, lastErr)
 }
 
 // Stop transitions the external worker to the stopped state. It does not
diff --git a/pkg/pyproc/worker_external_test.go b/pkg/pyproc/worker_external_test.go
index 50b6840..b8d2932 100644
--- a/pkg/pyproc/worker_external_test.go
+++ b/pkg/pyproc/worker_external_test.go
@@ -143,3 +143,140 @@ func TestExternalWorker_IsHealthy_AfterListenerClose(t *testing.T) {
 
 // Verify ExternalWorker satisfies workerHandle interface at compile time.
 var _ workerHandle = (*ExternalWorker)(nil)
+
+// --- New tests for ExternalWorkerOptions and retry logic ---
+
+func TestExternalWorkerWithOptions_Defaults(t *testing.T) {
+	w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
+		SocketPath: "/tmp/test.sock",
+	})
+	if w.connectTimeout != defaultConnectTimeout {
+		t.Errorf("expected default connect timeout %v, got %v", defaultConnectTimeout, w.connectTimeout)
+	}
+	if w.maxRetries != defaultMaxRetries {
+		t.Errorf("expected default max retries %d, got %d", defaultMaxRetries, w.maxRetries)
+	}
+	if w.retryInterval != defaultRetryInterval {
+		t.Errorf("expected default retry interval %v, got %v", defaultRetryInterval, w.retryInterval)
+	}
+}
+
+func TestExternalWorkerWithOptions_CustomValues(t *testing.T) {
+	w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
+		SocketPath:     "/tmp/custom.sock",
+		ConnectTimeout: 2 * time.Second,
+		MaxRetries:     3,
+		RetryInterval:  100 * time.Millisecond,
+	})
+	if w.socketPath != "/tmp/custom.sock" {
+		t.Errorf("expected socketPath /tmp/custom.sock, got %q", w.socketPath)
+	}
+	if w.connectTimeout != 2*time.Second {
+		t.Errorf("expected connect timeout 2s, got %v", w.connectTimeout)
+	}
+	if w.maxRetries != 3 {
+		t.Errorf("expected max retries 3, got %d", w.maxRetries)
+	}
+	if w.retryInterval != 100*time.Millisecond {
+		t.Errorf("expected retry interval 100ms, got %v", w.retryInterval)
+	}
+}
+
+func TestNewExternalWorker_BackwardCompat(t *testing.T) {
+	w := NewExternalWorker("/tmp/compat.sock", 3*time.Second)
+	if w.maxRetries != 1 {
+		t.Errorf("expected maxRetries=1 for backward compat, got %d", w.maxRetries)
+	}
+	if w.connectTimeout != 3*time.Second {
+		t.Errorf("expected connectTimeout 3s, got %v", w.connectTimeout)
+	}
+}
+
+func TestExternalWorker_Start_RetrySuccess(t *testing.T) {
+	f, err := os.CreateTemp("/tmp", "pyproc-retry-*.sock")
+	if err != nil {
+		t.Fatal(err)
+	}
+	sockPath := f.Name()
+	_ = f.Close()
+	_ = os.Remove(sockPath)
+	t.Cleanup(func() { _ = os.Remove(sockPath) })
+
+	// Start listener after a short delay to simulate slow sidecar startup
+	lnCh := make(chan net.Listener, 1)
+	go func() {
+		time.Sleep(300 * time.Millisecond)
+		ln, listenErr := net.Listen("unix", sockPath)
+		if listenErr != nil {
+			return
+		}
+		lnCh <- ln
+		for {
+			conn, acceptErr := ln.Accept()
+			if acceptErr != nil {
+				return
+			}
+			_ = conn.Close()
+		}
+	}()
+	t.Cleanup(func() {
+		select {
+		case ln := <-lnCh:
+			_ = ln.Close()
+		default:
+		}
+	})
+
+	w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
+		SocketPath:     sockPath,
+		ConnectTimeout: 100 * time.Millisecond,
+		MaxRetries:     5,
+		RetryInterval:  100 * time.Millisecond,
+	})
+	err = w.Start(context.Background())
+	if err != nil {
+		t.Fatalf("expected Start with retry to succeed, got: %v", err)
+	}
+	if ExternalWorkerState(w.state.Load()) != ExternalWorkerRunning {
+		t.Error("expected Running state")
+	}
+}
+
+func TestExternalWorker_Start_AllRetriesFail(t *testing.T) {
+	w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
+		SocketPath:     "/tmp/pyproc-noexist-retry.sock",
+		ConnectTimeout: 50 * time.Millisecond,
+		MaxRetries:     3,
+		RetryInterval:  50 * time.Millisecond,
+	})
+	err := w.Start(context.Background())
+	if err == nil {
+		t.Fatal("expected Start to fail after all retries")
+	}
+}
+
+func TestExternalWorker_Start_ContextCancelled(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		cancel()
+	}()
+
+	w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
+		SocketPath:     "/tmp/pyproc-cancel-retry.sock",
+		ConnectTimeout: 50 * time.Millisecond,
+		MaxRetries:     10,
+		RetryInterval:  200 * time.Millisecond,
+	})
+	started := time.Now()
+	err := w.Start(ctx)
+	if err == nil {
+		t.Fatal("expected Start to fail when context cancelled")
+	}
+	// The full backoff schedule takes well over a minute, so a prompt return
+	// shows that cancellation, not retry exhaustion, ended the wait.
+	if elapsed := time.Since(started); elapsed > 2*time.Second {
+		t.Errorf("Start returned %v after cancellation, want well under 2s", elapsed)
+	}
+}