Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/pyproc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type PoolOptions struct {
// spawning child processes. ExternalSocketPaths must list the UDS paths.
ExternalMode bool
ExternalSocketPaths []string

// ExternalMaxRetries controls the number of connection retry attempts
// for external workers. If zero, defaults to 10.
ExternalMaxRetries int
// ExternalRetryInterval is the initial retry interval for external
// workers. Each retry doubles the interval. If zero, defaults to 500ms.
ExternalRetryInterval time.Duration
}

type workerHandle interface {
Expand Down Expand Up @@ -158,7 +165,12 @@ func newExternalPool(opts PoolOptions, logger *Logger) (*Pool, error) {
}

for i, sockPath := range opts.ExternalSocketPaths {
worker := NewExternalWorker(sockPath, opts.WorkerConfig.StartTimeout)
worker := NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: sockPath,
ConnectTimeout: opts.WorkerConfig.StartTimeout,
MaxRetries: opts.ExternalMaxRetries,
RetryInterval: opts.ExternalRetryInterval,
})
pool.workers[i] = &poolWorker{
worker: worker,
connPool: make(chan net.Conn, opts.Config.MaxInFlight),
Expand Down
94 changes: 78 additions & 16 deletions pkg/pyproc/worker_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,29 @@ const (
ExternalWorkerRunning
)

const defaultConnectTimeout = 5 * time.Second
const (
defaultConnectTimeout = 5 * time.Second
// defaultMaxRetries with defaultRetryInterval yields ~4 min total wait
// (500ms + 1s + 2s + 4s + 8s + 16s + 32s + 64s + 128s ≈ 255.5s).
defaultMaxRetries = 10
defaultRetryInterval = 500 * time.Millisecond
)

// ExternalWorkerOptions configures an ExternalWorker.
type ExternalWorkerOptions struct {
// SocketPath is the Unix Domain Socket path to connect to.
SocketPath string
// ConnectTimeout controls how long each dial attempt waits.
// If zero, defaults to 5s.
ConnectTimeout time.Duration
// MaxRetries is the maximum number of connection retry attempts in Start.
// If zero, defaults to 10.
MaxRetries int
// RetryInterval is the initial interval between retries. Each subsequent
// retry doubles the interval (exponential backoff).
// If zero, defaults to 500ms.
RetryInterval time.Duration
}

// ExternalWorker represents a pre-existing Python worker process managed
// outside of pyproc (e.g. a Kubernetes sidecar container). It connects to the
Expand All @@ -27,33 +49,73 @@ const defaultConnectTimeout = 5 * time.Second
type ExternalWorker struct {
socketPath string
connectTimeout time.Duration
maxRetries int
retryInterval time.Duration
state atomic.Int32
}

// NewExternalWorker creates a new ExternalWorker that connects to the given
// Unix Domain Socket path. The connectTimeout controls how long dial attempts
// wait; if zero, a default of 5 s is used.
// wait; if zero, a default of 5 s is used. For retry support, use
// NewExternalWorkerWithOptions instead.
func NewExternalWorker(socketPath string, connectTimeout time.Duration) *ExternalWorker {
if connectTimeout <= 0 {
connectTimeout = defaultConnectTimeout
return NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: socketPath,
ConnectTimeout: connectTimeout,
MaxRetries: 1, // no retry for backward compat

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Route external pool workers through retry-capable constructor

This hardcoded MaxRetries: 1 keeps the legacy single-attempt behavior for every NewExternalWorker call, and newExternalPool still constructs workers via NewExternalWorker (pkg/pyproc/pool.go), so the sidecar startup path in ExternalMode does not actually get the new retry/backoff behavior and will still fail immediately when the Python socket appears shortly after pool startup.

Useful? React with 👍 / 👎.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional design choice. This PR adds the retry mechanism to ExternalWorker as a building block. The backward-compatible NewExternalWorker (MaxRetries=1) preserves existing behavior for all current callers including pool.go.

Integrating retry into the Pool's ExternalMode path requires adding retry configuration to PoolOptions and updating newExternalPool to use NewExternalWorkerWithOptions. That is planned as a follow-up PR to keep changes atomic and reviewable (one responsibility per PR).

Users who need retry today can construct workers directly with NewExternalWorkerWithOptions.

})
}

// NewExternalWorkerWithOptions creates a new ExternalWorker from options.
func NewExternalWorkerWithOptions(opts ExternalWorkerOptions) *ExternalWorker {
if opts.ConnectTimeout <= 0 {
opts.ConnectTimeout = defaultConnectTimeout
}
if opts.MaxRetries <= 0 {
opts.MaxRetries = defaultMaxRetries
}
if opts.RetryInterval <= 0 {
opts.RetryInterval = defaultRetryInterval
}
return &ExternalWorker{
socketPath: socketPath,
connectTimeout: connectTimeout,
socketPath: opts.SocketPath,
connectTimeout: opts.ConnectTimeout,
maxRetries: opts.MaxRetries,
retryInterval: opts.RetryInterval,
}
}

// Start verifies that the external worker's socket is reachable. It does not
// spawn a process; the worker must already be running.
func (w *ExternalWorker) Start(_ context.Context) error {
conn, err := net.DialTimeout("unix", w.socketPath, w.connectTimeout)
if err != nil {
return fmt.Errorf("external worker socket unreachable at %s: %w", w.socketPath, err)
// Start verifies that the external worker's socket is reachable. It retries
// with exponential backoff according to the configured MaxRetries and
// RetryInterval. It does not spawn a process; the worker must already be
// running.
//
// In production, callers should pass a context with a deadline to bound the
// total wait time (e.g. context.WithTimeout). Without a deadline, Start may
// block for the full backoff duration (~4 min with defaults).
func (w *ExternalWorker) Start(ctx context.Context) error {
var lastErr error
interval := w.retryInterval
for i := 0; i < w.maxRetries; i++ {
if i > 0 {
select {
case <-ctx.Done():
return fmt.Errorf("external worker connection cancelled: %w", ctx.Err())
case <-time.After(interval):
}
interval *= 2
}
conn, err := net.DialTimeout("unix", w.socketPath, w.connectTimeout)
if err != nil {
lastErr = err
continue
}
_ = conn.Close()
w.state.Store(int32(ExternalWorkerRunning))
return nil
}
_ = conn.Close()

w.state.Store(int32(ExternalWorkerRunning))
return nil
return fmt.Errorf("external worker socket unreachable at %s after %d attempts: %w",
w.socketPath, w.maxRetries, lastErr)
}

// Stop transitions the external worker to the stopped state. It does not
Expand Down
130 changes: 130 additions & 0 deletions pkg/pyproc/worker_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,133 @@ func TestExternalWorker_IsHealthy_AfterListenerClose(t *testing.T) {

// Verify ExternalWorker satisfies workerHandle interface at compile time.
var _ workerHandle = (*ExternalWorker)(nil)

// --- New tests for ExternalWorkerOptions and retry logic ---

func TestExternalWorkerWithOptions_Defaults(t *testing.T) {
w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: "/tmp/test.sock",
})
if w.connectTimeout != defaultConnectTimeout {
t.Errorf("expected default connect timeout %v, got %v", defaultConnectTimeout, w.connectTimeout)
}
if w.maxRetries != defaultMaxRetries {
t.Errorf("expected default max retries %d, got %d", defaultMaxRetries, w.maxRetries)
}
if w.retryInterval != defaultRetryInterval {
t.Errorf("expected default retry interval %v, got %v", defaultRetryInterval, w.retryInterval)
}
}

func TestExternalWorkerWithOptions_CustomValues(t *testing.T) {
w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: "/tmp/custom.sock",
ConnectTimeout: 2 * time.Second,
MaxRetries: 3,
RetryInterval: 100 * time.Millisecond,
})
if w.socketPath != "/tmp/custom.sock" {
t.Errorf("expected socketPath /tmp/custom.sock, got %q", w.socketPath)
}
if w.connectTimeout != 2*time.Second {
t.Errorf("expected connect timeout 2s, got %v", w.connectTimeout)
}
if w.maxRetries != 3 {
t.Errorf("expected max retries 3, got %d", w.maxRetries)
}
if w.retryInterval != 100*time.Millisecond {
t.Errorf("expected retry interval 100ms, got %v", w.retryInterval)
}
}

func TestNewExternalWorker_BackwardCompat(t *testing.T) {
w := NewExternalWorker("/tmp/compat.sock", 3*time.Second)
if w.maxRetries != 1 {
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The backward compatibility test validates maxRetries and connectTimeout but doesn't verify that retryInterval is also set correctly. Consider adding a check for w.retryInterval != defaultRetryInterval to ensure the old constructor doesn't inadvertently enable retry behavior beyond a single attempt.

Copilot uses AI. Check for mistakes.
t.Errorf("expected maxRetries=1 for backward compat, got %d", w.maxRetries)
}
if w.connectTimeout != 3*time.Second {
t.Errorf("expected connectTimeout 3s, got %v", w.connectTimeout)
}
}

func TestExternalWorker_Start_RetrySuccess(t *testing.T) {
f, err := os.CreateTemp("/tmp", "pyproc-retry-*.sock")
if err != nil {
t.Fatal(err)
}
sockPath := f.Name()
_ = f.Close()
_ = os.Remove(sockPath)
Comment on lines +196 to +202
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using /tmp directly may cause issues on systems where /tmp is not writable or has different permissions. Consider using t.TempDir() instead, which creates a temporary directory that's automatically cleaned up and works consistently across platforms.

Copilot uses AI. Check for mistakes.
t.Cleanup(func() { _ = os.Remove(sockPath) })

// Start listener after a short delay to simulate slow sidecar startup
lnCh := make(chan net.Listener, 1)
go func() {
time.Sleep(300 * time.Millisecond)
ln, listenErr := net.Listen("unix", sockPath)
if listenErr != nil {
return
}
lnCh <- ln
for {
conn, acceptErr := ln.Accept()
if acceptErr != nil {
return
}
_ = conn.Close()
}
}()
t.Cleanup(func() {
select {
case ln := <-lnCh:
_ = ln.Close()
default:
}
})

w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: sockPath,
ConnectTimeout: 100 * time.Millisecond,
MaxRetries: 5,
RetryInterval: 100 * time.Millisecond,
})
err = w.Start(context.Background())
if err != nil {
t.Fatalf("expected Start with retry to succeed, got: %v", err)
}
if ExternalWorkerState(w.state.Load()) != ExternalWorkerRunning {
t.Error("expected Running state")
}
}

func TestExternalWorker_Start_AllRetriesFail(t *testing.T) {
w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: "/tmp/pyproc-noexist-retry.sock",
ConnectTimeout: 50 * time.Millisecond,
MaxRetries: 3,
RetryInterval: 50 * time.Millisecond,
})
err := w.Start(context.Background())
if err == nil {
t.Fatal("expected Start to fail after all retries")
}
}

func TestExternalWorker_Start_ContextCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()

w := NewExternalWorkerWithOptions(ExternalWorkerOptions{
SocketPath: "/tmp/pyproc-cancel-retry.sock",
ConnectTimeout: 50 * time.Millisecond,
MaxRetries: 10,
RetryInterval: 200 * time.Millisecond,
})
err := w.Start(ctx)
if err == nil {
t.Fatal("expected Start to fail when context cancelled")
}
}
Loading