From b1f593514a5e2c3f15f4c47e452a1cfa342f55af Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 22:43:47 +0900 Subject: [PATCH 1/4] feat(worker): add ExternalWorker retry with exponential backoff Stabilize sidecar mode where the Python worker may start after the Go process by adding configurable retry logic to ExternalWorker.Start(). Co-Authored-By: Claude Opus 4.6 --- pkg/pyproc/worker_external.go | 88 +++++++++++++++++---- pkg/pyproc/worker_external_test.go | 122 +++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 16 deletions(-) diff --git a/pkg/pyproc/worker_external.go b/pkg/pyproc/worker_external.go index bda9302..ed8131a 100644 --- a/pkg/pyproc/worker_external.go +++ b/pkg/pyproc/worker_external.go @@ -18,7 +18,27 @@ const ( ExternalWorkerRunning ) -const defaultConnectTimeout = 5 * time.Second +const ( + defaultConnectTimeout = 5 * time.Second + defaultMaxRetries = 10 + defaultRetryInterval = 500 * time.Millisecond +) + +// ExternalWorkerOptions configures an ExternalWorker. +type ExternalWorkerOptions struct { + // SocketPath is the Unix Domain Socket path to connect to. + SocketPath string + // ConnectTimeout controls how long each dial attempt waits. + // If zero, defaults to 5s. + ConnectTimeout time.Duration + // MaxRetries is the maximum number of connection retry attempts in Start. + // If zero, defaults to 10. + MaxRetries int + // RetryInterval is the initial interval between retries. Each subsequent + // retry doubles the interval (exponential backoff). + // If zero, defaults to 500ms. + RetryInterval time.Duration +} // ExternalWorker represents a pre-existing Python worker process managed // outside of pyproc (e.g. a Kubernetes sidecar container). It connects to the @@ -27,33 +47,69 @@ const defaultConnectTimeout = 5 * time.Second type ExternalWorker struct { socketPath string connectTimeout time.Duration + maxRetries int + retryInterval time.Duration state atomic.Int32 } // NewExternalWorker creates a new ExternalWorker that connects to the given // Unix Domain Socket path. The connectTimeout controls how long dial attempts -// wait; if zero, a default of 5 s is used. +// wait; if zero, a default of 5 s is used. For retry support, use +// NewExternalWorkerWithOptions instead. func NewExternalWorker(socketPath string, connectTimeout time.Duration) *ExternalWorker { - if connectTimeout <= 0 { - connectTimeout = defaultConnectTimeout + return NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: socketPath, + ConnectTimeout: connectTimeout, + MaxRetries: 1, // no retry for backward compat + }) +} + +// NewExternalWorkerWithOptions creates a new ExternalWorker from options. +func NewExternalWorkerWithOptions(opts ExternalWorkerOptions) *ExternalWorker { + if opts.ConnectTimeout <= 0 { + opts.ConnectTimeout = defaultConnectTimeout + } + if opts.MaxRetries <= 0 { + opts.MaxRetries = defaultMaxRetries + } + if opts.RetryInterval <= 0 { + opts.RetryInterval = defaultRetryInterval } return &ExternalWorker{ - socketPath: socketPath, - connectTimeout: connectTimeout, + socketPath: opts.SocketPath, + connectTimeout: opts.ConnectTimeout, + maxRetries: opts.MaxRetries, + retryInterval: opts.RetryInterval, } } -// Start verifies that the external worker's socket is reachable. It does not -// spawn a process; the worker must already be running. -func (w *ExternalWorker) Start(_ context.Context) error { - conn, err := net.DialTimeout("unix", w.socketPath, w.connectTimeout) - if err != nil { - return fmt.Errorf("external worker socket unreachable at %s: %w", w.socketPath, err) +// Start verifies that the external worker's socket is reachable. It retries +// with exponential backoff according to the configured MaxRetries and +// RetryInterval. It does not spawn a process; the worker must already be +// running. +func (w *ExternalWorker) Start(ctx context.Context) error { + var lastErr error + interval := w.retryInterval + for i := 0; i < w.maxRetries; i++ { + if i > 0 { + select { + case <-ctx.Done(): + return fmt.Errorf("external worker connection cancelled: %w", ctx.Err()) + case <-time.After(interval): + } + interval *= 2 + } + conn, err := net.DialTimeout("unix", w.socketPath, w.connectTimeout) + if err != nil { + lastErr = err + continue + } + _ = conn.Close() + w.state.Store(int32(ExternalWorkerRunning)) + return nil } - _ = conn.Close() - - w.state.Store(int32(ExternalWorkerRunning)) - return nil + return fmt.Errorf("external worker socket unreachable at %s after %d attempts: %w", + w.socketPath, w.maxRetries, lastErr) } // Stop transitions the external worker to the stopped state. It does not diff --git a/pkg/pyproc/worker_external_test.go b/pkg/pyproc/worker_external_test.go index 50b6840..e6a2d44 100644 --- a/pkg/pyproc/worker_external_test.go +++ b/pkg/pyproc/worker_external_test.go @@ -143,3 +143,125 @@ func TestExternalWorker_IsHealthy_AfterListenerClose(t *testing.T) { // Verify ExternalWorker satisfies workerHandle interface at compile time. var _ workerHandle = (*ExternalWorker)(nil) + +// --- New tests for ExternalWorkerOptions and retry logic --- + +func TestExternalWorkerWithOptions_Defaults(t *testing.T) { + w := NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: "/tmp/test.sock", + }) + if w.connectTimeout != defaultConnectTimeout { + t.Errorf("expected default connect timeout %v, got %v", defaultConnectTimeout, w.connectTimeout) + } + if w.maxRetries != defaultMaxRetries { + t.Errorf("expected default max retries %d, got %d", defaultMaxRetries, w.maxRetries) + } + if w.retryInterval != defaultRetryInterval { + t.Errorf("expected default retry interval %v, got %v", defaultRetryInterval, w.retryInterval) + } +} + +func TestExternalWorkerWithOptions_CustomValues(t *testing.T) { + w := NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: "/tmp/custom.sock", + ConnectTimeout: 2 * time.Second, + MaxRetries: 3, + RetryInterval: 100 * time.Millisecond, + }) + if w.socketPath != "/tmp/custom.sock" { + t.Errorf("expected socketPath /tmp/custom.sock, got %q", w.socketPath) + } + if w.connectTimeout != 2*time.Second { + t.Errorf("expected connect timeout 2s, got %v", w.connectTimeout) + } + if w.maxRetries != 3 { + t.Errorf("expected max retries 3, got %d", w.maxRetries) + } + if w.retryInterval != 100*time.Millisecond { + t.Errorf("expected retry interval 100ms, got %v", w.retryInterval) + } +} + +func TestNewExternalWorker_BackwardCompat(t *testing.T) { + w := NewExternalWorker("/tmp/compat.sock", 3*time.Second) + if w.maxRetries != 1 { + t.Errorf("expected maxRetries=1 for backward compat, got %d", w.maxRetries) + } + if w.connectTimeout != 3*time.Second { + t.Errorf("expected connectTimeout 3s, got %v", w.connectTimeout) + } +} + +func TestExternalWorker_Start_RetrySuccess(t *testing.T) { + f, err := os.CreateTemp("/tmp", "pyproc-retry-*.sock") + if err != nil { + t.Fatal(err) + } + sockPath := f.Name() + _ = f.Close() + _ = os.Remove(sockPath) + t.Cleanup(func() { _ = os.Remove(sockPath) }) + + // Start listener after a short delay to simulate slow sidecar startup + go func() { + time.Sleep(300 * time.Millisecond) + ln, err := net.Listen("unix", sockPath) + if err != nil { + return + } + defer ln.Close() + for { + conn, err := ln.Accept() + if err != nil { + return + } + _ = conn.Close() + } + }() + + w := NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: sockPath, + ConnectTimeout: 100 * time.Millisecond, + MaxRetries: 5, + RetryInterval: 100 * time.Millisecond, + }) + err = w.Start(context.Background()) + if err != nil { + t.Fatalf("expected Start with retry to succeed, got: %v", err) + } + if ExternalWorkerState(w.state.Load()) != ExternalWorkerRunning { + t.Error("expected Running state") + } +} + +func TestExternalWorker_Start_AllRetriesFail(t *testing.T) { + w := NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: "/tmp/pyproc-noexist-retry.sock", + ConnectTimeout: 50 * time.Millisecond, + MaxRetries: 3, + RetryInterval: 50 * time.Millisecond, + }) + err := w.Start(context.Background()) + if err == nil { + t.Fatal("expected Start to fail after all retries") + } +} + +func TestExternalWorker_Start_ContextCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + w := NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: "/tmp/pyproc-cancel-retry.sock", + ConnectTimeout: 50 * time.Millisecond, + MaxRetries: 10, + RetryInterval: 200 * time.Millisecond, + }) + err := w.Start(ctx) + if err == nil { + t.Fatal("expected Start to fail when context cancelled") + } +} From d33ea1845e114efb976177a70084ed7459f7115f Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 22:56:00 +0900 Subject: [PATCH 2/4] fix(worker): suppress errcheck for deferred listener close in test golangci-lint v1.64.8 flags unchecked error return from defer ln.Close() inside a goroutine. Add nolint:errcheck directive. Co-Authored-By: Claude Opus 4.6 --- pkg/pyproc/worker_external_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pyproc/worker_external_test.go b/pkg/pyproc/worker_external_test.go index e6a2d44..84119e7 100644 --- a/pkg/pyproc/worker_external_test.go +++ b/pkg/pyproc/worker_external_test.go @@ -209,7 +209,7 @@ func TestExternalWorker_Start_RetrySuccess(t *testing.T) { if err != nil { return } - defer ln.Close() + defer ln.Close() //nolint:errcheck for { conn, err := ln.Accept() if err != nil { From f58d8de5751ef9d6eb3bfefbbdac8ed37f271d5a Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 23:09:07 +0900 Subject: [PATCH 3/4] feat(pool): route external workers through retry-capable constructor Update newExternalPool to use NewExternalWorkerWithOptions so that ExternalMode pools benefit from the retry/backoff mechanism. Add ExternalMaxRetries and ExternalRetryInterval fields to PoolOptions. Co-Authored-By: Claude Opus 4.6 --- pkg/pyproc/pool.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pkg/pyproc/pool.go b/pkg/pyproc/pool.go index 2f4d543..1fad2fb 100644 --- a/pkg/pyproc/pool.go +++ b/pkg/pyproc/pool.go @@ -24,6 +24,13 @@ type PoolOptions struct { // spawning child processes. ExternalSocketPaths must list the UDS paths. ExternalMode bool ExternalSocketPaths []string + + // ExternalMaxRetries controls the number of connection retry attempts + // for external workers. If zero, defaults to 10. + ExternalMaxRetries int + // ExternalRetryInterval is the initial retry interval for external + // workers. Each retry doubles the interval. If zero, defaults to 500ms. + ExternalRetryInterval time.Duration } type workerHandle interface { @@ -158,7 +165,12 @@ func newExternalPool(opts PoolOptions, logger *Logger) (*Pool, error) { } for i, sockPath := range opts.ExternalSocketPaths { - worker := NewExternalWorker(sockPath, opts.WorkerConfig.StartTimeout) + worker := NewExternalWorkerWithOptions(ExternalWorkerOptions{ + SocketPath: sockPath, + ConnectTimeout: opts.WorkerConfig.StartTimeout, + MaxRetries: opts.ExternalMaxRetries, + RetryInterval: opts.ExternalRetryInterval, + }) pool.workers[i] = &poolWorker{ worker: worker, connPool: make(chan net.Conn, opts.Config.MaxInFlight), From 3f02f761f7ebb6fe2cf88d5c5826a87e3fd576ee Mon Sep 17 00:00:00 2001 From: YuminosukeSato Date: Fri, 6 Feb 2026 23:41:01 +0900 Subject: [PATCH 4/4] docs(worker): add deadline guidance to Start and fix test data race Add doc comments to Start() recommending context.WithTimeout in production, and annotate defaultMaxRetries with total wait time. Fix data race in TestExternalWorker_Start_RetrySuccess by using a channel to safely pass the listener between goroutines. Co-Authored-By: Claude Opus 4.6 --- pkg/pyproc/worker_external.go | 10 ++++++++-- pkg/pyproc/worker_external_test.go | 18 +++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/pyproc/worker_external.go b/pkg/pyproc/worker_external.go index ed8131a..4fe160c 100644 --- a/pkg/pyproc/worker_external.go +++ b/pkg/pyproc/worker_external.go @@ -20,8 +20,10 @@ const ( const ( defaultConnectTimeout = 5 * time.Second - defaultMaxRetries = 10 - defaultRetryInterval = 500 * time.Millisecond + // defaultMaxRetries with defaultRetryInterval yields ~4 min total wait + // (500ms + 1s + 2s + 4s + 8s + 16s + 32s + 64s + 128s ≈ 255.5s). + defaultMaxRetries = 10 + defaultRetryInterval = 500 * time.Millisecond ) // ExternalWorkerOptions configures an ExternalWorker. @@ -87,6 +89,10 @@ func NewExternalWorkerWithOptions(opts ExternalWorkerOptions) *ExternalWorker { // with exponential backoff according to the configured MaxRetries and // RetryInterval. It does not spawn a process; the worker must already be // running. +// +// In production, callers should pass a context with a deadline to bound the +// total wait time (e.g. context.WithTimeout). Without a deadline, Start may +// block for the full backoff duration (~4 min with defaults). func (w *ExternalWorker) Start(ctx context.Context) error { var lastErr error interval := w.retryInterval diff --git a/pkg/pyproc/worker_external_test.go b/pkg/pyproc/worker_external_test.go index 84119e7..b8d2932 100644 --- a/pkg/pyproc/worker_external_test.go +++ b/pkg/pyproc/worker_external_test.go @@ -203,21 +203,29 @@ func TestExternalWorker_Start_RetrySuccess(t *testing.T) { t.Cleanup(func() { _ = os.Remove(sockPath) }) // Start listener after a short delay to simulate slow sidecar startup + lnCh := make(chan net.Listener, 1) go func() { time.Sleep(300 * time.Millisecond) - ln, err := net.Listen("unix", sockPath) - if err != nil { + ln, listenErr := net.Listen("unix", sockPath) + if listenErr != nil { return } - defer ln.Close() //nolint:errcheck + lnCh <- ln for { - conn, err := ln.Accept() - if err != nil { + conn, acceptErr := ln.Accept() + if acceptErr != nil { return } _ = conn.Close() } }() + t.Cleanup(func() { + select { + case ln := <-lnCh: + _ = ln.Close() + default: + } + }) w := NewExternalWorkerWithOptions(ExternalWorkerOptions{ SocketPath: sockPath,