From 02affd468fab388652535b2734b74f8752fd436d Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Thu, 14 May 2026 02:18:55 +0530 Subject: [PATCH 01/11] [retry] Add centralized retry/backoff abstraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces retry/ — a context-aware, exponential-backoff retry package wrapping cenkalti/backoff/v4. Provides a reusable abstraction to replace ad-hoc retry loops across Meshery components. Changes: - Add retry/{retry,options,errors,retry_test}.go - Do(ctx, op, ...opts) with production-safe defaults (500 ms initial, x1.5 growth, +/-30% jitter, 2 min cap) - Permanent(err) escape hatch for non-retryable errors - IsPermanent(err) using errors.As for proper chain unwrapping - WithLogNotifier bridges retries to MeshKit logger.Handler - 15 table-driven tests, race-detector clean - Promote cenkalti/backoff/v4 v4.3.0 from indirect to direct in go.mod Relates to meshery/meshery#19275 Signed-off-by: Pranav Dhiran --- go.mod | 1 + retry/errors.go | 34 +++++ retry/options.go | 125 ++++++++++++++++ retry/retry.go | 96 ++++++++++++ retry/retry_test.go | 349 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 605 insertions(+) create mode 100644 retry/errors.go create mode 100644 retry/options.go create mode 100644 retry/retry.go create mode 100644 retry/retry_test.go diff --git a/go.mod b/go.mod index 270f0e08..1df067e0 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ replace ( require ( cuelang.org/go v0.15.1 github.com/Masterminds/semver/v3 v3.4.0 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/dlclark/regexp2 v1.11.0 github.com/docker/cli v27.5.1+incompatible github.com/fluxcd/pkg/oci v0.43.1 diff --git a/retry/errors.go b/retry/errors.go new file mode 100644 index 00000000..a941d211 --- /dev/null +++ b/retry/errors.go @@ -0,0 +1,34 @@ +package retry + +import ( + "errors" + + "github.com/cenkalti/backoff/v4" +) + +// PermanentError is the type returned by Permanent. +// Callers should not create PermanentError values directly; use Permanent(err). +type PermanentError = backoff.PermanentError + +// Permanent wraps err to signal Do that no further retries should be +// attempted. The unwrapped error is returned to the caller of Do. +// +// Use this for errors that cannot be resolved by waiting: +// - HTTP 4xx (except 429 Too Many Requests) +// - Authentication / authorisation failures +// - JSON decode / schema validation failures +// - Business-logic invariant violations +// +// Do NOT use Permanent for context-cancellation errors; simply return +// ctx.Err() and the retry loop will stop on its own. +func Permanent(err error) error { + return backoff.Permanent(err) +} + +// IsPermanent reports whether err is (or wraps) a PermanentError. +// Useful in callers that need to distinguish permanent failures from +// transient ones after Do returns. +func IsPermanent(err error) bool { + var pErr *backoff.PermanentError + return errors.As(err, &pErr) +} diff --git a/retry/options.go b/retry/options.go new file mode 100644 index 00000000..92c0a361 --- /dev/null +++ b/retry/options.go @@ -0,0 +1,125 @@ +package retry + +import ( + "time" + + "github.com/meshery/meshkit/logger" +) + +const ( + // DefaultInitialInterval is the wait before the second attempt. + DefaultInitialInterval = 500 * time.Millisecond + + // DefaultMaxInterval caps the per-attempt wait ceiling. + DefaultMaxInterval = 30 * time.Second + + // DefaultMaxElapsedTime caps the total wall-clock time across all attempts. + // Zero disables this limit; pair with WithMaxAttempts instead. + DefaultMaxElapsedTime = 2 * time.Minute + + // DefaultMultiplier controls exponential growth rate of the wait interval. + DefaultMultiplier = 1.5 + + // DefaultRandomizationFactor adds ±jitter to each wait interval. + // Never set to 0 in production — it prevents thundering-herd stampedes. + DefaultRandomizationFactor = 0.3 +) + +// Config holds all tuneable knobs for a retry operation. +// The zero value is valid; Do applies production-safe defaults before the +// first attempt. Override individual fields with the With* option functions. +type Config struct { + // MaxAttempts caps the total number of calls (first attempt + retries). + // 0 means unlimited — rely on MaxElapsedTime to bound the loop instead. + MaxAttempts uint64 + + // InitialInterval is the backoff wait before the second attempt. + InitialInterval time.Duration + + // MaxInterval is the upper ceiling for a single per-attempt wait. + MaxInterval time.Duration + + // MaxElapsedTime is the upper bound on total time spent across all + // attempts (including wait intervals). 0 disables the wall-clock cap. + MaxElapsedTime time.Duration + + // Multiplier is the factor by which the interval grows each attempt. + Multiplier float64 + + // RandomizationFactor adds jitter in the range + // [interval * (1 - f), interval * (1 + f)]. + RandomizationFactor float64 + + // Notifier is called after each transient failure, before the next + // wait interval begins. It receives the error and the computed wait + // duration. nil means no-op. + Notifier func(err error, wait time.Duration) +} + +// defaultConfig returns a Config populated with production-safe defaults. +func defaultConfig() Config { + return Config{ + InitialInterval: DefaultInitialInterval, + MaxInterval: DefaultMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + Multiplier: DefaultMultiplier, + RandomizationFactor: DefaultRandomizationFactor, + } +} + +// Option is a functional option that mutates a Config. +type Option func(*Config) + +// WithMaxAttempts sets a hard cap on the total number of calls. +// The count includes the first (non-retry) attempt, so +// WithMaxAttempts(1) means "try once, no retries". +func WithMaxAttempts(n uint64) Option { + return func(c *Config) { c.MaxAttempts = n } +} + +// WithInitialInterval overrides the wait before the second attempt. +func WithInitialInterval(d time.Duration) Option { + return func(c *Config) { c.InitialInterval = d } +} + +// WithMaxInterval overrides the per-attempt wait ceiling. +func WithMaxInterval(d time.Duration) Option { + return func(c *Config) { c.MaxInterval = d } +} + +// WithMaxElapsedTime sets the wall-clock deadline across all attempts. +// Pass 0 to disable the elapsed-time cap entirely (use WithMaxAttempts +// to bound the loop in that case). +func WithMaxElapsedTime(d time.Duration) Option { + return func(c *Config) { c.MaxElapsedTime = d } +} + +// WithMultiplier overrides the exponential growth rate of wait intervals. +func WithMultiplier(m float64) Option { + return func(c *Config) { c.Multiplier = m } +} + +// WithJitter overrides the randomization factor (range: 0.0–1.0). +// Do not set to 0.0 in production — this disables jitter entirely, which +// can cause synchronized retry storms under load. +func WithJitter(f float64) Option { + return func(c *Config) { c.RandomizationFactor = f } +} + +// WithNotifier wires in an arbitrary per-attempt callback called on each +// transient failure, before the next wait interval begins. +func WithNotifier(n func(err error, wait time.Duration)) Option { + return func(c *Config) { c.Notifier = n } +} + +// WithLogNotifier builds a Notifier that emits a structured Warn log entry +// via MeshKit's logger.Handler each time a transient error triggers a retry. +// +// Example: +// +// err := retry.Do(ctx, op, retry.WithLogNotifier(l.Log)) +func WithLogNotifier(log logger.Handler) Option { + return WithNotifier(func(err error, wait time.Duration) { + log.Warnf("retry: transient error — retrying in %s: %v", wait.Round(time.Millisecond), err) + }) +} diff --git a/retry/retry.go b/retry/retry.go new file mode 100644 index 00000000..5f24104f --- /dev/null +++ b/retry/retry.go @@ -0,0 +1,96 @@ +// Package retry provides a centralized, context-aware exponential-backoff +// retry abstraction for use across Meshery and its satellite components. +// +// # Overview +// +// The single entry-point is [Do]. It wraps [github.com/cenkalti/backoff/v4] +// and adds: +// - Production-safe defaults (initial 500 ms, ×1.5 growth, 30 % jitter, +// 2 min total wall-clock cap). +// - Context-first design — the loop terminates immediately when ctx is +// cancelled or times out. +// - Permanent-error escape hatch — wrap non-retryable errors with +// [Permanent] to stop the loop without exhausting the retry budget. +// - Composable functional options (see the With* helpers in options.go). +// - Optional per-attempt logging via [WithLogNotifier]. +// +// # Basic usage +// +// err := retry.Do(ctx, func() error { +// resp, err := http.Get(url) +// if err != nil { +// return err // transient — will be retried +// } +// if resp.StatusCode == http.StatusBadRequest { +// return retry.Permanent(fmt.Errorf("bad request: %d", resp.StatusCode)) +// } +// return nil +// }, retry.WithMaxAttempts(5), retry.WithLogNotifier(log)) +// +// # Error classification +// +// - Network errors, HTTP 5xx, HTTP 429 → return err (transient) +// - HTTP 4xx (except 429), auth, decode → return retry.Permanent(err) +// - Context cancelled / deadline exceeded → return ctx.Err() (the loop +// exits automatically; do not wrap in Permanent) +package retry + +import ( + "context" + + "github.com/cenkalti/backoff/v4" +) + +// Operation is the function signature for retryable work. +// +// Return nil on success, [Permanent](err) to stop without further retries, +// or any plain error to trigger the next backoff wait and retry. +type Operation func() error + +// Do executes op with exponential backoff until one of the following occurs: +// - op returns nil (success) +// - op returns a [Permanent] error (stop immediately; unwrapped error returned) +// - ctx is cancelled or its deadline is exceeded +// - the configured MaxAttempts or MaxElapsedTime budget is exhausted +// +// All retry configuration is supplied via the variadic opts. When no opts are +// given, [defaultConfig] values are used (500 ms initial, ×1.5 growth, 30 % +// jitter, 2-min elapsed cap). Individual knobs are overridden with the With* +// helpers defined in options.go. +// +// Do is safe for concurrent use; each call creates its own backoff state. +func Do(ctx context.Context, op Operation, opts ...Option) error { + cfg := defaultConfig() + for _, o := range opts { + o(&cfg) + } + + b := buildBackOff(cfg) + bCtx := backoff.WithContext(b, ctx) + + if cfg.Notifier != nil { + return backoff.RetryNotify(backoff.Operation(op), bCtx, cfg.Notifier) + } + return backoff.Retry(backoff.Operation(op), bCtx) +} + +// buildBackOff constructs a cenkalti/backoff policy from the supplied Config. +// When MaxAttempts is set, the exponential backoff is wrapped with a +// WithMaxRetries limiter; otherwise the raw exponential policy is returned and +// MaxElapsedTime acts as the sole termination condition. +func buildBackOff(cfg Config) backoff.BackOff { + b := backoff.NewExponentialBackOff() + b.InitialInterval = cfg.InitialInterval + b.MaxInterval = cfg.MaxInterval + b.MaxElapsedTime = cfg.MaxElapsedTime + b.Multiplier = cfg.Multiplier + b.RandomizationFactor = cfg.RandomizationFactor + + // MaxAttempts is the total number of calls (1st attempt + retries). + // cenkalti/backoff's WithMaxRetries counts extra retries after the first, + // so we subtract 1. + if cfg.MaxAttempts > 0 { + return backoff.WithMaxRetries(b, cfg.MaxAttempts-1) + } + return b +} diff --git a/retry/retry_test.go b/retry/retry_test.go new file mode 100644 index 00000000..28459f85 --- /dev/null +++ b/retry/retry_test.go @@ -0,0 +1,349 @@ +package retry_test + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/meshery/meshkit/retry" +) + +// ─── helpers ──────────────────────────────────────────────────────────────── + +// alwaysFail returns an Operation that always returns the given error. +func alwaysFail(err error) retry.Operation { + return func() error { return err } +} + +// countingOp returns an Operation that always fails and increments *count. +func countingOp(count *atomic.Int64, err error) retry.Operation { + return func() error { + count.Add(1) + return err + } +} + +// ─── tests ────────────────────────────────────────────────────────────────── + +func TestDo_SucceedsFirstAttempt(t *testing.T) { + t.Parallel() + + calls := 0 + err := retry.Do(context.Background(), func() error { + calls++ + return nil + }, retry.WithMaxAttempts(5)) + + if err != nil { + t.Fatalf("expected nil error, got %v", err) + } + if calls != 1 { + t.Fatalf("expected op called once, got %d", calls) + } +} + +func TestDo_SucceedsAfterTransientErrors(t *testing.T) { + t.Parallel() + + transient := errors.New("transient") + var calls atomic.Int64 + + err := retry.Do(context.Background(), + func() error { + n := calls.Add(1) + if n < 4 { + return transient + } + return nil + }, + retry.WithMaxAttempts(10), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithMaxInterval(5*time.Millisecond), + retry.WithMaxElapsedTime(5*time.Second), + ) + + if err != nil { + t.Fatalf("expected success after retries, got %v", err) + } + if calls.Load() != 4 { + t.Fatalf("expected 4 calls (3 failures + 1 success), got %d", calls.Load()) + } +} + +func TestDo_PermanentErrorStopsImmediately(t *testing.T) { + t.Parallel() + + permanent := errors.New("permanent failure") + calls := 0 + + err := retry.Do(context.Background(), + func() error { + calls++ + return retry.Permanent(permanent) + }, + retry.WithMaxAttempts(10), + retry.WithInitialInterval(1*time.Millisecond), + ) + + if err == nil { + t.Fatal("expected non-nil error for permanent failure") + } + if !errors.Is(err, permanent) { + t.Fatalf("expected permanent sentinel unwrapped, got %v", err) + } + if calls != 1 { + t.Fatalf("expected exactly 1 call, got %d", calls) + } +} + +func TestDo_IsPermanent_ReturnsFalseForTransient(t *testing.T) { + t.Parallel() + + err := errors.New("transient") + if retry.IsPermanent(err) { + t.Fatal("plain error should not be permanent") + } +} + +func TestDo_IsPermanent_ReturnsTrueForPermanentWrapped(t *testing.T) { + t.Parallel() + + inner := errors.New("the cause") + wrapped := retry.Permanent(inner) + if !retry.IsPermanent(wrapped) { + t.Fatal("Permanent(err) should satisfy IsPermanent") + } +} + +func TestDo_ContextCancellationStopsLoop(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + + var calls atomic.Int64 + transient := errors.New("transient") + + // Cancel the context after the first failure. + go func() { + time.Sleep(5 * time.Millisecond) + cancel() + }() + + err := retry.Do(ctx, + func() error { + calls.Add(1) + return transient + }, + retry.WithInitialInterval(50*time.Millisecond), // longer than the cancel delay + retry.WithMaxElapsedTime(10*time.Second), + ) + + if err == nil { + t.Fatal("expected error after context cancellation") + } + if calls.Load() == 0 { + t.Fatal("expected at least one call before cancellation") + } +} + +func TestDo_ContextAlreadyCancelledBeforeFirstAttempt(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + var calls atomic.Int64 + err := retry.Do(ctx, + func() error { + calls.Add(1) + return errors.New("should not reach") + }, + retry.WithMaxAttempts(5), + retry.WithInitialInterval(1*time.Millisecond), + ) + + if err == nil { + t.Fatal("expected error for pre-cancelled context") + } + // cenkalti/backoff checks the context before and after each attempt, so + // at most one call can have occurred. + if calls.Load() > 1 { + t.Fatalf("expected at most 1 call for pre-cancelled context, got %d", calls.Load()) + } +} + +func TestDo_MaxAttemptsEnforced(t *testing.T) { + t.Parallel() + + const maxAttempts = 4 + var count atomic.Int64 + + err := retry.Do(context.Background(), + countingOp(&count, errors.New("always fails")), + retry.WithMaxAttempts(maxAttempts), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithMaxInterval(2*time.Millisecond), + retry.WithMaxElapsedTime(0), // disable elapsed-time cap + ) + + if err == nil { + t.Fatal("expected error when max attempts exhausted") + } + if count.Load() != maxAttempts { + t.Fatalf("expected exactly %d calls, got %d", maxAttempts, count.Load()) + } +} + +func TestDo_MaxElapsedTimeEnforced(t *testing.T) { + t.Parallel() + + start := time.Now() + const budget = 80 * time.Millisecond + + err := retry.Do(context.Background(), + alwaysFail(errors.New("always fails")), + retry.WithMaxElapsedTime(budget), + retry.WithInitialInterval(5*time.Millisecond), + retry.WithMaxInterval(10*time.Millisecond), + retry.WithJitter(0), // deterministic for timing assertions + ) + + elapsed := time.Since(start) + if err == nil { + t.Fatal("expected error when elapsed time exceeded") + } + // Allow a 3x grace factor for slow CI runners. + if elapsed > 3*budget { + t.Fatalf("loop ran for %s, expected <= %s", elapsed, 3*budget) + } +} + +func TestDo_NotifierCalledOnEachRetry(t *testing.T) { + t.Parallel() + + const failures = 3 + transient := errors.New("transient") + var notifyCount atomic.Int64 + + notifier := func(err error, wait time.Duration) { + notifyCount.Add(1) + if !errors.Is(err, transient) { + t.Errorf("notifier: unexpected error %v", err) + } + } + + var calls atomic.Int64 + _ = retry.Do(context.Background(), + func() error { + if calls.Add(1) <= failures { + return transient + } + return nil + }, + retry.WithMaxAttempts(10), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithMaxInterval(2*time.Millisecond), + retry.WithNotifier(notifier), + ) + + if notifyCount.Load() != failures { + t.Fatalf("expected notifier called %d times, got %d", failures, notifyCount.Load()) + } +} + +func TestDo_NotifierNotCalledOnImmediateSuccess(t *testing.T) { + t.Parallel() + + var notifyCount atomic.Int64 + _ = retry.Do(context.Background(), + func() error { return nil }, + retry.WithNotifier(func(err error, wait time.Duration) { + notifyCount.Add(1) + }), + ) + if notifyCount.Load() != 0 { + t.Fatalf("notifier should not be called on immediate success, called %d time(s)", notifyCount.Load()) + } +} + +func TestDo_NotifierNotCalledOnPermanentError(t *testing.T) { + t.Parallel() + + var notifyCount atomic.Int64 + _ = retry.Do(context.Background(), + func() error { return retry.Permanent(errors.New("perm")) }, + retry.WithMaxAttempts(5), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithNotifier(func(err error, wait time.Duration) { + notifyCount.Add(1) + }), + ) + // A permanent error stops immediately; the notifier must not be spammed. + if notifyCount.Load() > 1 { + t.Fatalf("notifier called %d times for permanent error, expected <= 1", notifyCount.Load()) + } +} + +func TestDo_ZeroMaxAttemptsMeansUnlimited(t *testing.T) { + t.Parallel() + + // MaxAttempts(0) should fall back to elapsed-time only. + err := retry.Do(context.Background(), + alwaysFail(errors.New("always fails")), + retry.WithMaxAttempts(0), + retry.WithMaxElapsedTime(50*time.Millisecond), + retry.WithInitialInterval(5*time.Millisecond), + retry.WithMaxInterval(10*time.Millisecond), + ) + if err == nil { + t.Fatal("expected error when elapsed time runs out with unlimited attempts") + } +} + +func TestDo_WithMaxAttemptsOneNoRetry(t *testing.T) { + t.Parallel() + + var calls atomic.Int64 + err := retry.Do(context.Background(), + countingOp(&calls, errors.New("fail")), + retry.WithMaxAttempts(1), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithMaxElapsedTime(0), + ) + if err == nil { + t.Fatal("expected error") + } + if calls.Load() != 1 { + t.Fatalf("WithMaxAttempts(1) should allow exactly 1 call, got %d", calls.Load()) + } +} + +func TestDo_DefaultsAreApplied(t *testing.T) { + t.Parallel() + + // Smoke-test: Do with zero opts should not panic and should retry at least + // once. We use a 2 s context so the first retry (default 500 ms initial + // interval + up-to-30 % jitter = at most ~650 ms) lands well within budget + // even on slow CI runners. + transient := errors.New("transient") + var calls atomic.Int64 + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _ = retry.Do(ctx, + func() error { + if calls.Add(1) >= 2 { + return nil + } + return transient + }, + // No options — pure defaults. + ) + + if calls.Load() < 2 { + t.Fatalf("expected at least 2 calls with default config, got %d", calls.Load()) + } +} From 3f5e74e2e0aa415217e046462dfc1b7cf1b7762d Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Thu, 14 May 2026 02:32:48 +0530 Subject: [PATCH 02/11] Update retry/retry.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Pranav Dhiran --- retry/retry.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/retry/retry.go b/retry/retry.go index 5f24104f..375eb3f7 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -60,6 +60,9 @@ type Operation func() error // // Do is safe for concurrent use; each call creates its own backoff state. func Do(ctx context.Context, op Operation, opts ...Option) error { + if err := ctx.Err(); err != nil { + return err + } cfg := defaultConfig() for _, o := range opts { o(&cfg) From f189bc6bf8a29b4c889ccca298f49109ffb9653d Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Thu, 14 May 2026 02:33:20 +0530 Subject: [PATCH 03/11] Update retry/retry_test.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Pranav Dhiran --- retry/retry_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/retry/retry_test.go b/retry/retry_test.go index 28459f85..8a1b3b0d 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -281,8 +281,8 @@ func TestDo_NotifierNotCalledOnPermanentError(t *testing.T) { }), ) // A permanent error stops immediately; the notifier must not be spammed. - if notifyCount.Load() > 1 { - t.Fatalf("notifier called %d times for permanent error, expected <= 1", notifyCount.Load()) + if notifyCount.Load() != 0 { + t.Fatalf("notifier called %d times for permanent error, expected 0", notifyCount.Load()) } } From 6e8e9e7ce4466cdeab97859bcce2c343c47bff4c Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Thu, 14 May 2026 02:41:47 +0530 Subject: [PATCH 04/11] Address review comments on retry abstraction - Simplified RetryNotify call - Added TODO about missing context.Context in Operation signature - Updated example to use context-aware http.NewRequestWithContext Signed-off-by: Pranav Dhiran --- retry/retry.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/retry/retry.go b/retry/retry.go index 375eb3f7..fe840a90 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -17,7 +17,8 @@ // # Basic usage // // err := retry.Do(ctx, func() error { -// resp, err := http.Get(url) +// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) +// resp, err := http.DefaultClient.Do(req) // if err != nil { // return err // transient — will be retried // } @@ -43,6 +44,11 @@ import ( // Operation is the function signature for retryable work. // +// TODO: The Operation signature currently lacks a context.Context, which prevents +// operations from being interrupted immediately upon context cancellation. +// A follow-up task should be created for the full implementation to avoid bloating +// the current pull request. +// // Return nil on success, [Permanent](err) to stop without further retries, // or any plain error to trigger the next backoff wait and retry. type Operation func() error @@ -71,10 +77,7 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { b := buildBackOff(cfg) bCtx := backoff.WithContext(b, ctx) - if cfg.Notifier != nil { - return backoff.RetryNotify(backoff.Operation(op), bCtx, cfg.Notifier) - } - return backoff.Retry(backoff.Operation(op), bCtx) + return backoff.RetryNotify(backoff.Operation(op), bCtx, cfg.Notifier) } // buildBackOff constructs a cenkalti/backoff policy from the supplied Config. From 702f0a15574a8caac9fb5bb5815317d239f46356 Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Thu, 14 May 2026 02:54:28 +0530 Subject: [PATCH 05/11] test: add test for doubly-wrapped Permanent errors Signed-off-by: Pranav Dhiran --- retry/retry_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/retry/retry_test.go b/retry/retry_test.go index 8a1b3b0d..4440de2a 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -3,6 +3,7 @@ package retry_test import ( "context" "errors" + "fmt" "sync/atomic" "testing" "time" @@ -117,6 +118,16 @@ func TestDo_IsPermanent_ReturnsTrueForPermanentWrapped(t *testing.T) { } } +func TestDo_IsPermanent_HandlesDoublyWrappedErrors(t *testing.T) { + t.Parallel() + + inner := errors.New("the cause") + wrapped := fmt.Errorf("outer layer: %w", retry.Permanent(inner)) + if !retry.IsPermanent(wrapped) { + t.Fatal("IsPermanent should unwrap error chains successfully") + } +} + func TestDo_ContextCancellationStopsLoop(t *testing.T) { t.Parallel() From 08b2ca735860325d5639060f18a5312ab676f0f4 Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Thu, 14 May 2026 14:52:25 +0530 Subject: [PATCH 06/11] [retry] Standardize package conventions - Rename errors.go to error.go - Pass context.Context to Operation signature - Use structured logging in WithLogNotifier Signed-off-by: Pranav Dhiran --- retry/{errors.go => error.go} | 0 retry/options.go | 3 ++- retry/retry.go | 11 +++-------- retry/retry_test.go | 22 +++++++++++----------- 4 files changed, 16 insertions(+), 20 deletions(-) rename retry/{errors.go => error.go} (100%) diff --git a/retry/errors.go b/retry/error.go similarity index 100% rename from retry/errors.go rename to retry/error.go diff --git a/retry/options.go b/retry/options.go index 92c0a361..24f7ee69 100644 --- a/retry/options.go +++ b/retry/options.go @@ -120,6 +120,7 @@ func WithNotifier(n func(err error, wait time.Duration)) Option { // err := retry.Do(ctx, op, retry.WithLogNotifier(l.Log)) func WithLogNotifier(log logger.Handler) Option { return WithNotifier(func(err error, wait time.Duration) { - log.Warnf("retry: transient error — retrying in %s: %v", wait.Round(time.Millisecond), err) + log.Infof("retry: transient error — retrying in %s", wait.Round(time.Millisecond)) + log.Warn(err) }) } diff --git a/retry/retry.go b/retry/retry.go index fe840a90..24b0dc2a 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -16,7 +16,7 @@ // // # Basic usage // -// err := retry.Do(ctx, func() error { +// err := retry.Do(ctx, func(ctx context.Context) error { // req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) // resp, err := http.DefaultClient.Do(req) // if err != nil { @@ -44,14 +44,9 @@ import ( // Operation is the function signature for retryable work. // -// TODO: The Operation signature currently lacks a context.Context, which prevents -// operations from being interrupted immediately upon context cancellation. -// A follow-up task should be created for the full implementation to avoid bloating -// the current pull request. -// // Return nil on success, [Permanent](err) to stop without further retries, // or any plain error to trigger the next backoff wait and retry. -type Operation func() error +type Operation func(ctx context.Context) error // Do executes op with exponential backoff until one of the following occurs: // - op returns nil (success) @@ -77,7 +72,7 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { b := buildBackOff(cfg) bCtx := backoff.WithContext(b, ctx) - return backoff.RetryNotify(backoff.Operation(op), bCtx, cfg.Notifier) + return backoff.RetryNotify(func() error { return op(ctx) }, bCtx, cfg.Notifier) } // buildBackOff constructs a cenkalti/backoff policy from the supplied Config. diff --git a/retry/retry_test.go b/retry/retry_test.go index 4440de2a..b7af8e5b 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -15,12 +15,12 @@ import ( // alwaysFail returns an Operation that always returns the given error. func alwaysFail(err error) retry.Operation { - return func() error { return err } + return func(ctx context.Context) error { return err } } // countingOp returns an Operation that always fails and increments *count. func countingOp(count *atomic.Int64, err error) retry.Operation { - return func() error { + return func(ctx context.Context) error { count.Add(1) return err } @@ -32,7 +32,7 @@ func TestDo_SucceedsFirstAttempt(t *testing.T) { t.Parallel() calls := 0 - err := retry.Do(context.Background(), func() error { + err := retry.Do(context.Background(), func(ctx context.Context) error { calls++ return nil }, retry.WithMaxAttempts(5)) @@ -52,7 +52,7 @@ func TestDo_SucceedsAfterTransientErrors(t *testing.T) { var calls atomic.Int64 err := retry.Do(context.Background(), - func() error { + func(ctx context.Context) error { n := calls.Add(1) if n < 4 { return transient @@ -80,7 +80,7 @@ func TestDo_PermanentErrorStopsImmediately(t *testing.T) { calls := 0 err := retry.Do(context.Background(), - func() error { + func(ctx context.Context) error { calls++ return retry.Permanent(permanent) }, @@ -143,7 +143,7 @@ func TestDo_ContextCancellationStopsLoop(t *testing.T) { }() err := retry.Do(ctx, - func() error { + func(ctx context.Context) error { calls.Add(1) return transient }, @@ -167,7 +167,7 @@ func TestDo_ContextAlreadyCancelledBeforeFirstAttempt(t *testing.T) { var calls atomic.Int64 err := retry.Do(ctx, - func() error { + func(ctx context.Context) error { calls.Add(1) return errors.New("should not reach") }, @@ -247,7 +247,7 @@ func TestDo_NotifierCalledOnEachRetry(t *testing.T) { var calls atomic.Int64 _ = retry.Do(context.Background(), - func() error { + func(ctx context.Context) error { if calls.Add(1) <= failures { return transient } @@ -269,7 +269,7 @@ func TestDo_NotifierNotCalledOnImmediateSuccess(t *testing.T) { var notifyCount atomic.Int64 _ = retry.Do(context.Background(), - func() error { return nil }, + func(ctx context.Context) error { return nil }, retry.WithNotifier(func(err error, wait time.Duration) { notifyCount.Add(1) }), @@ -284,7 +284,7 @@ func TestDo_NotifierNotCalledOnPermanentError(t *testing.T) { var notifyCount atomic.Int64 _ = retry.Do(context.Background(), - func() error { return retry.Permanent(errors.New("perm")) }, + func(ctx context.Context) error { return retry.Permanent(errors.New("perm")) }, retry.WithMaxAttempts(5), retry.WithInitialInterval(1*time.Millisecond), retry.WithNotifier(func(err error, wait time.Duration) { @@ -345,7 +345,7 @@ func TestDo_DefaultsAreApplied(t *testing.T) { defer cancel() _ = retry.Do(ctx, - func() error { + func(ctx context.Context) error { if calls.Add(1) >= 2 { return nil } From 628892a001cd9fd252ef2c36ec2d53265936eb11 Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Fri, 15 May 2026 15:26:40 +0530 Subject: [PATCH 07/11] Migrate retry package to backoff v5 Signed-off-by: Pranav Dhiran Closes #1008 Signed-off-by: Pranav Dhiran --- go.mod | 3 +- retry/error.go | 20 +++-------- retry/options.go | 82 +++++++++---------------------------------- retry/retry.go | 84 +++++++++------------------------------------ retry/retry_test.go | 49 +++++++++----------------- 5 files changed, 54 insertions(+), 184 deletions(-) diff --git a/go.mod b/go.mod index 1df067e0..cc64e4bb 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ replace ( require ( cuelang.org/go v0.15.1 github.com/Masterminds/semver/v3 v3.4.0 - github.com/cenkalti/backoff/v4 v4.3.0 + github.com/cenkalti/backoff/v5 v5.0.3 github.com/dlclark/regexp2 v1.11.0 github.com/docker/cli v27.5.1+incompatible github.com/fluxcd/pkg/oci v0.43.1 @@ -101,7 +101,6 @@ require ( github.com/aws/smithy-go v1.24.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.3 // indirect github.com/clipperhouse/stringish v0.1.1 // indirect diff --git a/retry/error.go b/retry/error.go index a941d211..d7255b65 100644 --- a/retry/error.go +++ b/retry/error.go @@ -3,31 +3,19 @@ package retry import ( "errors" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" ) -// PermanentError is the type returned by Permanent. -// Callers should not create PermanentError values directly; use Permanent(err). type PermanentError = backoff.PermanentError -// Permanent wraps err to signal Do that no further retries should be -// attempted. The unwrapped error is returned to the caller of Do. -// -// Use this for errors that cannot be resolved by waiting: -// - HTTP 4xx (except 429 Too Many Requests) -// - Authentication / authorisation failures -// - JSON decode / schema validation failures -// - Business-logic invariant violations -// -// Do NOT use Permanent for context-cancellation errors; simply return -// ctx.Err() and the retry loop will stop on its own. +// Permanent wraps err to signal no further retries should be attempted. +// Use for non-transient errors (HTTP 4xx, auth failures, validation errors). +// Do NOT use for context-cancellation; return ctx.Err() directly. func Permanent(err error) error { return backoff.Permanent(err) } // IsPermanent reports whether err is (or wraps) a PermanentError. -// Useful in callers that need to distinguish permanent failures from -// transient ones after Do returns. func IsPermanent(err error) bool { var pErr *backoff.PermanentError return errors.As(err, &pErr) diff --git a/retry/options.go b/retry/options.go index 24f7ee69..fe33cdd8 100644 --- a/retry/options.go +++ b/retry/options.go @@ -7,56 +7,23 @@ import ( ) const ( - // DefaultInitialInterval is the wait before the second attempt. - DefaultInitialInterval = 500 * time.Millisecond - - // DefaultMaxInterval caps the per-attempt wait ceiling. - DefaultMaxInterval = 30 * time.Second - - // DefaultMaxElapsedTime caps the total wall-clock time across all attempts. - // Zero disables this limit; pair with WithMaxAttempts instead. - DefaultMaxElapsedTime = 2 * time.Minute - - // DefaultMultiplier controls exponential growth rate of the wait interval. - DefaultMultiplier = 1.5 - - // DefaultRandomizationFactor adds ±jitter to each wait interval. - // Never set to 0 in production — it prevents thundering-herd stampedes. - DefaultRandomizationFactor = 0.3 + DefaultInitialInterval = 500 * time.Millisecond + DefaultMaxInterval = 30 * time.Second + DefaultMaxElapsedTime = 2 * time.Minute + DefaultMultiplier = 1.5 + DefaultRandomizationFactor = 0.3 // Never set to 0 in production ) -// Config holds all tuneable knobs for a retry operation. -// The zero value is valid; Do applies production-safe defaults before the -// first attempt. Override individual fields with the With* option functions. type Config struct { - // MaxAttempts caps the total number of calls (first attempt + retries). - // 0 means unlimited — rely on MaxElapsedTime to bound the loop instead. - MaxAttempts uint64 - - // InitialInterval is the backoff wait before the second attempt. - InitialInterval time.Duration - - // MaxInterval is the upper ceiling for a single per-attempt wait. - MaxInterval time.Duration - - // MaxElapsedTime is the upper bound on total time spent across all - // attempts (including wait intervals). 0 disables the wall-clock cap. - MaxElapsedTime time.Duration - - // Multiplier is the factor by which the interval grows each attempt. - Multiplier float64 - - // RandomizationFactor adds jitter in the range - // [interval * (1 - f), interval * (1 + f)]. + MaxAttempts uint64 + InitialInterval time.Duration + MaxInterval time.Duration + MaxElapsedTime time.Duration + Multiplier float64 RandomizationFactor float64 - - // Notifier is called after each transient failure, before the next - // wait interval begins. It receives the error and the computed wait - // duration. nil means no-op. - Notifier func(err error, wait time.Duration) + Notifier func(err error, wait time.Duration) } -// defaultConfig returns a Config populated with production-safe defaults. func defaultConfig() Config { return Config{ InitialInterval: DefaultInitialInterval, @@ -67,60 +34,43 @@ func defaultConfig() Config { } } -// Option is a functional option that mutates a Config. type Option func(*Config) -// WithMaxAttempts sets a hard cap on the total number of calls. -// The count includes the first (non-retry) attempt, so -// WithMaxAttempts(1) means "try once, no retries". +// WithMaxAttempts sets a hard cap on total calls (includes first attempt). func WithMaxAttempts(n uint64) Option { return func(c *Config) { c.MaxAttempts = n } } -// WithInitialInterval overrides the wait before the second attempt. func WithInitialInterval(d time.Duration) Option { return func(c *Config) { c.InitialInterval = d } } -// WithMaxInterval overrides the per-attempt wait ceiling. func WithMaxInterval(d time.Duration) Option { return func(c *Config) { c.MaxInterval = d } } -// WithMaxElapsedTime sets the wall-clock deadline across all attempts. -// Pass 0 to disable the elapsed-time cap entirely (use WithMaxAttempts -// to bound the loop in that case). +// WithMaxElapsedTime sets wall-clock deadline. Pass 0 to disable. func WithMaxElapsedTime(d time.Duration) Option { return func(c *Config) { c.MaxElapsedTime = d } } -// WithMultiplier overrides the exponential growth rate of wait intervals. func WithMultiplier(m float64) Option { return func(c *Config) { c.Multiplier = m } } -// WithJitter overrides the randomization factor (range: 0.0–1.0). -// Do not set to 0.0 in production — this disables jitter entirely, which -// can cause synchronized retry storms under load. +// WithJitter overrides randomization factor (range: 0.0-1.0). Do not set to 0.0 in production. func WithJitter(f float64) Option { return func(c *Config) { c.RandomizationFactor = f } } -// WithNotifier wires in an arbitrary per-attempt callback called on each -// transient failure, before the next wait interval begins. func WithNotifier(n func(err error, wait time.Duration)) Option { return func(c *Config) { c.Notifier = n } } -// WithLogNotifier builds a Notifier that emits a structured Warn log entry -// via MeshKit's logger.Handler each time a transient error triggers a retry. -// -// Example: -// -// err := retry.Do(ctx, op, retry.WithLogNotifier(l.Log)) +// WithLogNotifier emits a Warn log entry on each retry via MeshKit's logger.Handler. func WithLogNotifier(log logger.Handler) Option { return WithNotifier(func(err error, wait time.Duration) { - log.Infof("retry: transient error — retrying in %s", wait.Round(time.Millisecond)) + log.Infof("retry: transient error; retrying in %s", wait.Round(time.Millisecond)) log.Warn(err) }) } diff --git a/retry/retry.go b/retry/retry.go index 24b0dc2a..815013b2 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -1,65 +1,16 @@ -// Package retry provides a centralized, context-aware exponential-backoff -// retry abstraction for use across Meshery and its satellite components. -// -// # Overview -// -// The single entry-point is [Do]. It wraps [github.com/cenkalti/backoff/v4] -// and adds: -// - Production-safe defaults (initial 500 ms, ×1.5 growth, 30 % jitter, -// 2 min total wall-clock cap). -// - Context-first design — the loop terminates immediately when ctx is -// cancelled or times out. -// - Permanent-error escape hatch — wrap non-retryable errors with -// [Permanent] to stop the loop without exhausting the retry budget. -// - Composable functional options (see the With* helpers in options.go). -// - Optional per-attempt logging via [WithLogNotifier]. -// -// # Basic usage -// -// err := retry.Do(ctx, func(ctx context.Context) error { -// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) -// resp, err := http.DefaultClient.Do(req) -// if err != nil { -// return err // transient — will be retried -// } -// if resp.StatusCode == http.StatusBadRequest { -// return retry.Permanent(fmt.Errorf("bad request: %d", resp.StatusCode)) -// } -// return nil -// }, retry.WithMaxAttempts(5), retry.WithLogNotifier(log)) -// -// # Error classification -// -// - Network errors, HTTP 5xx, HTTP 429 → return err (transient) -// - HTTP 4xx (except 429), auth, decode → return retry.Permanent(err) -// - Context cancelled / deadline exceeded → return ctx.Err() (the loop -// exits automatically; do not wrap in Permanent) package retry import ( "context" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" ) -// Operation is the function signature for retryable work. -// -// Return nil on success, [Permanent](err) to stop without further retries, -// or any plain error to trigger the next backoff wait and retry. type Operation func(ctx context.Context) error -// Do executes op with exponential backoff until one of the following occurs: -// - op returns nil (success) -// - op returns a [Permanent] error (stop immediately; unwrapped error returned) -// - ctx is cancelled or its deadline is exceeded -// - the configured MaxAttempts or MaxElapsedTime budget is exhausted -// -// All retry configuration is supplied via the variadic opts. When no opts are -// given, [defaultConfig] values are used (500 ms initial, ×1.5 growth, 30 % -// jitter, 2-min elapsed cap). Individual knobs are overridden with the With* -// helpers defined in options.go. -// -// Do is safe for concurrent use; each call creates its own backoff state. +// Do executes op with exponential backoff until success, permanent error, +// context cancellation, or budget exhaustion. Config via opts (default: +// 500ms initial, 1.5x growth, 30% jitter, 2min cap). func Do(ctx context.Context, op Operation, opts ...Option) error { if err := ctx.Err(); err != nil { return err @@ -69,29 +20,28 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { o(&cfg) } - b := buildBackOff(cfg) - bCtx := backoff.WithContext(b, ctx) + retryOpts := []backoff.RetryOption{ + backoff.WithBackOff(buildBackOff(cfg)), + backoff.WithMaxElapsedTime(cfg.MaxElapsedTime), + backoff.WithNotify(cfg.Notifier), + } + if cfg.MaxAttempts > 0 { + retryOpts = append(retryOpts, backoff.WithMaxTries(uint(cfg.MaxAttempts))) + } - return backoff.RetryNotify(func() error { return op(ctx) }, bCtx, cfg.Notifier) + _, err := backoff.Retry(ctx, func() (struct{}, error) { + return struct{}{}, op(ctx) + }, retryOpts...) + return err } -// buildBackOff constructs a cenkalti/backoff policy from the supplied Config. -// When MaxAttempts is set, the exponential backoff is wrapped with a -// WithMaxRetries limiter; otherwise the raw exponential policy is returned and -// MaxElapsedTime acts as the sole termination condition. +// buildBackOff constructs a backoff policy from Config. func buildBackOff(cfg Config) backoff.BackOff { b := backoff.NewExponentialBackOff() b.InitialInterval = cfg.InitialInterval b.MaxInterval = cfg.MaxInterval - b.MaxElapsedTime = cfg.MaxElapsedTime b.Multiplier = cfg.Multiplier b.RandomizationFactor = cfg.RandomizationFactor - // MaxAttempts is the total number of calls (1st attempt + retries). - // cenkalti/backoff's WithMaxRetries counts extra retries after the first, - // so we subtract 1. - if cfg.MaxAttempts > 0 { - return backoff.WithMaxRetries(b, cfg.MaxAttempts-1) - } return b } diff --git a/retry/retry_test.go b/retry/retry_test.go index b7af8e5b..1b4b1df6 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -11,14 +11,10 @@ import ( "github.com/meshery/meshkit/retry" ) -// ─── helpers ──────────────────────────────────────────────────────────────── - -// alwaysFail returns an Operation that always returns the given error. func alwaysFail(err error) retry.Operation { return func(ctx context.Context) error { return err } } -// countingOp returns an Operation that always fails and increments *count. func countingOp(count *atomic.Int64, err error) retry.Operation { return func(ctx context.Context) error { count.Add(1) @@ -26,9 +22,7 @@ func countingOp(count *atomic.Int64, err error) retry.Operation { } } -// ─── tests ────────────────────────────────────────────────────────────────── - -func TestDo_SucceedsFirstAttempt(t *testing.T) { +func TestRetrySucceedsFirstAttempt(t *testing.T) { t.Parallel() calls := 0 @@ -45,7 +39,7 @@ func TestDo_SucceedsFirstAttempt(t *testing.T) { } } -func TestDo_SucceedsAfterTransientErrors(t *testing.T) { +func TestRetrySucceedsAfterTransientErrors(t *testing.T) { t.Parallel() transient := errors.New("transient") @@ -73,7 +67,7 @@ func TestDo_SucceedsAfterTransientErrors(t *testing.T) { } } -func TestDo_PermanentErrorStopsImmediately(t *testing.T) { +func TestRetryPermanentErrorStopsImmediately(t *testing.T) { t.Parallel() permanent := errors.New("permanent failure") @@ -99,7 +93,7 @@ func TestDo_PermanentErrorStopsImmediately(t *testing.T) { } } -func TestDo_IsPermanent_ReturnsFalseForTransient(t *testing.T) { +func TestIsPermanentReturnsFalseForTransient(t *testing.T) { t.Parallel() err := errors.New("transient") @@ -108,7 +102,7 @@ func TestDo_IsPermanent_ReturnsFalseForTransient(t *testing.T) { } } -func TestDo_IsPermanent_ReturnsTrueForPermanentWrapped(t *testing.T) { +func TestIsPermanentReturnsTrueForPermanentWrapped(t *testing.T) { t.Parallel() inner := errors.New("the cause") @@ -118,7 +112,7 @@ func TestDo_IsPermanent_ReturnsTrueForPermanentWrapped(t *testing.T) { } } -func TestDo_IsPermanent_HandlesDoublyWrappedErrors(t *testing.T) { +func TestIsPermanentHandlesDoublyWrappedErrors(t *testing.T) { t.Parallel() inner := errors.New("the cause") @@ -128,7 +122,7 @@ func TestDo_IsPermanent_HandlesDoublyWrappedErrors(t *testing.T) { } } -func TestDo_ContextCancellationStopsLoop(t *testing.T) { +func TestRetryContextCancellationStopsLoop(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -136,7 +130,6 @@ func TestDo_ContextCancellationStopsLoop(t *testing.T) { var calls atomic.Int64 transient := errors.New("transient") - // Cancel the context after the first failure. go func() { time.Sleep(5 * time.Millisecond) cancel() @@ -159,7 +152,7 @@ func TestDo_ContextCancellationStopsLoop(t *testing.T) { } } -func TestDo_ContextAlreadyCancelledBeforeFirstAttempt(t *testing.T) { +func TestRetryContextAlreadyCancelledBeforeFirstAttempt(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -178,14 +171,12 @@ func TestDo_ContextAlreadyCancelledBeforeFirstAttempt(t *testing.T) { if err == nil { t.Fatal("expected error for pre-cancelled context") } - // cenkalti/backoff checks the context before and after each attempt, so - // at most one call can have occurred. if calls.Load() > 1 { t.Fatalf("expected at most 1 call for pre-cancelled context, got %d", calls.Load()) } } -func TestDo_MaxAttemptsEnforced(t *testing.T) { +func TestRetryMaxAttemptsEnforced(t *testing.T) { t.Parallel() const maxAttempts = 4 @@ -207,7 +198,7 @@ func TestDo_MaxAttemptsEnforced(t *testing.T) { } } -func TestDo_MaxElapsedTimeEnforced(t *testing.T) { +func TestRetryMaxElapsedTimeEnforced(t *testing.T) { t.Parallel() start := time.Now() @@ -225,13 +216,12 @@ func TestDo_MaxElapsedTimeEnforced(t *testing.T) { if err == nil { t.Fatal("expected error when elapsed time exceeded") } - // Allow a 3x grace factor for slow CI runners. if elapsed > 3*budget { t.Fatalf("loop ran for %s, expected <= %s", elapsed, 3*budget) } } -func TestDo_NotifierCalledOnEachRetry(t *testing.T) { +func TestRetryNotifierCalledOnEachRetry(t *testing.T) { t.Parallel() const failures = 3 @@ -264,7 +254,7 @@ func TestDo_NotifierCalledOnEachRetry(t *testing.T) { } } -func TestDo_NotifierNotCalledOnImmediateSuccess(t *testing.T) { +func TestRetryNotifierNotCalledOnImmediateSuccess(t *testing.T) { t.Parallel() var notifyCount atomic.Int64 @@ -279,7 +269,7 @@ func TestDo_NotifierNotCalledOnImmediateSuccess(t *testing.T) { } } -func TestDo_NotifierNotCalledOnPermanentError(t *testing.T) { +func TestRetryNotifierNotCalledOnPermanentError(t *testing.T) { t.Parallel() var notifyCount atomic.Int64 @@ -291,16 +281,14 @@ func TestDo_NotifierNotCalledOnPermanentError(t *testing.T) { notifyCount.Add(1) }), ) - // A permanent error stops immediately; the notifier must not be spammed. if notifyCount.Load() != 0 { t.Fatalf("notifier called %d times for permanent error, expected 0", notifyCount.Load()) } } -func TestDo_ZeroMaxAttemptsMeansUnlimited(t *testing.T) { +func TestRetryZeroMaxAttemptsMeansUnlimited(t *testing.T) { t.Parallel() - // MaxAttempts(0) should fall back to elapsed-time only. err := retry.Do(context.Background(), alwaysFail(errors.New("always fails")), retry.WithMaxAttempts(0), @@ -313,7 +301,7 @@ func TestDo_ZeroMaxAttemptsMeansUnlimited(t *testing.T) { } } -func TestDo_WithMaxAttemptsOneNoRetry(t *testing.T) { +func TestRetryWithMaxAttemptsOneNoRetry(t *testing.T) { t.Parallel() var calls atomic.Int64 @@ -331,13 +319,9 @@ func TestDo_WithMaxAttemptsOneNoRetry(t *testing.T) { } } -func TestDo_DefaultsAreApplied(t *testing.T) { +func TestRetryDefaultsAreApplied(t *testing.T) { t.Parallel() - // Smoke-test: Do with zero opts should not panic and should retry at least - // once. We use a 2 s context so the first retry (default 500 ms initial - // interval + up-to-30 % jitter = at most ~650 ms) lands well within budget - // even on slow CI runners. transient := errors.New("transient") var calls atomic.Int64 @@ -351,7 +335,6 @@ func TestDo_DefaultsAreApplied(t *testing.T) { } return transient }, - // No options — pure defaults. ) if calls.Load() < 2 { From eba9ca55504de8fef45a4b0f9896f9fcaa373db9 Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Sat, 16 May 2026 21:13:00 +0530 Subject: [PATCH 08/11] Add input validation to WithMultiplier and WithJitter Clamp negative/invalid values to defaults instead of silently accepting them. Signed-off-by: Pranav Dhiran --- retry/options.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/retry/options.go b/retry/options.go index fe33cdd8..ac156a88 100644 --- a/retry/options.go +++ b/retry/options.go @@ -55,12 +55,24 @@ func WithMaxElapsedTime(d time.Duration) Option { } func WithMultiplier(m float64) Option { - return func(c *Config) { c.Multiplier = m } + return func(c *Config) { + if m <= 0 { + c.Multiplier = DefaultMultiplier + return + } + c.Multiplier = m + } } // WithJitter overrides randomization factor (range: 0.0-1.0). Do not set to 0.0 in production. func WithJitter(f float64) Option { - return func(c *Config) { c.RandomizationFactor = f } + return func(c *Config) { + if f < 0 || f > 1 { + c.RandomizationFactor = DefaultRandomizationFactor + return + } + c.RandomizationFactor = f + } } func WithNotifier(n func(err error, wait time.Duration)) Option { From 1718c390a2746f9e0afa3adfa7ba5da1a98e1c4c Mon Sep 17 00:00:00 2001 From: Pranav Date: Tue, 19 May 2026 13:15:32 +0530 Subject: [PATCH 09/11] retry: drop unnecessary uint64->uint cast by aligning Config.MaxAttempts type with backoff library Signed-off-by: Pranav Dhiran --- retry/options.go | 4 ++-- retry/retry.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/retry/options.go b/retry/options.go index ac156a88..2aca5c60 100644 --- a/retry/options.go +++ b/retry/options.go @@ -15,7 +15,7 @@ const ( ) type Config struct { - MaxAttempts uint64 + MaxAttempts uint InitialInterval time.Duration MaxInterval time.Duration MaxElapsedTime time.Duration @@ -37,7 +37,7 @@ func defaultConfig() Config { type Option func(*Config) // WithMaxAttempts sets a hard cap on total calls (includes first attempt). -func WithMaxAttempts(n uint64) Option { +func WithMaxAttempts(n uint) Option { return func(c *Config) { c.MaxAttempts = n } } diff --git a/retry/retry.go b/retry/retry.go index 815013b2..5308f7ca 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -26,7 +26,7 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { backoff.WithNotify(cfg.Notifier), } if cfg.MaxAttempts > 0 { - retryOpts = append(retryOpts, backoff.WithMaxTries(uint(cfg.MaxAttempts))) + retryOpts = append(retryOpts, backoff.WithMaxTries(cfg.MaxAttempts)) } _, err := backoff.Retry(ctx, func() (struct{}, error) { From 0deb97d6924995096992fc62b94e60367c5e753e Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Sun, 31 May 2026 01:02:18 +0530 Subject: [PATCH 10/11] retry: add ErrorClassifier hook, config validation, remove PermanentError alias - Add ErrorClassifier type with DecisionRetry/DecisionStop so callers can classify errors as transient or terminal at their bounded context - Remove exported PermanentError type alias to avoid leaking cenkalti/backoff into MeshKit's public API (zero call-sites affected) - Add validateConfig that fails fast on invalid config: * InitialInterval > 0, MaxInterval > 0, MaxInterval >= InitialInterval * MaxElapsedTime >= 0, Multiplier finite and >= 1, Jitter in [0,1] - Remove silent-default fallback from WithMultiplier and WithJitter (validation is now centralized instead of hiding bugs) - Add 16 tests covering classifier behaviour and config validation - Add ExampleDo showing idiomatic HTTP usage with per-attempt timeout vs retry loop budget, using httptest for hermetic output Signed-off-by: Pranav Dhiran --- retry/_demo/main.go | 175 ++++++++++++++++++++++ retry/error.go | 16 ++- retry/options.go | 41 ++++-- retry/retry.go | 53 ++++++- retry/retry_test.go | 343 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 612 insertions(+), 16 deletions(-) create mode 100644 retry/_demo/main.go diff --git a/retry/_demo/main.go b/retry/_demo/main.go new file mode 100644 index 00000000..4278561c --- /dev/null +++ b/retry/_demo/main.go @@ -0,0 +1,175 @@ +package main + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/meshery/meshkit/retry" +) + +var attempt int + +func main() { + fmt.Println("══════════════════════════════════════════════════") + fmt.Println(" meshkit/retry — exponential backoff demo") + fmt.Println(" transport-agnostic: works with HTTP, gRPC, DB, ...") + fmt.Println("══════════════════════════════════════════════════") + fmt.Println() + + // ── Demo 1: Immediate success ── + fmt.Println("━━━ Demo 1: Immediate success (no retry needed) ━━━") + fmt.Println(" Operation returns nil on 1st try → retry.Do returns immediately.") + attempt = 0 + err := retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + return nil + }) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + // ── Demo 2: Transient → success after 2 failures ── + fmt.Println("━━━ Demo 2: Transient failures → recovers ━━━") + fmt.Println(" Operation fails twice (returns error), succeeds on 3rd.") + fmt.Println(" retry.Do retries with exponential backoff until success or budget exhausted.") + attempt = 0 + err = retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + if attempt < 3 { + fmt.Printf(" 〉attempt %d: transient error (will retry)\n", attempt) + return errors.New("transient error") + } + fmt.Printf(" 〉attempt %d: success\n", attempt) + return nil + }, + retry.WithMaxAttempts(5), + retry.WithInitialInterval(100*time.Millisecond), + retry.WithMaxInterval(500*time.Millisecond), + ) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + // ── Demo 3: Permanent error ── + fmt.Println("━━━ Demo 3: Permanent error (stops immediately) ━━━") + fmt.Println(" Operation wraps error with retry.Permanent() → no retry attempted.") + fmt.Println(" Use case: HTTP 4xx, auth failure, validation errors — anything non-transient.") + attempt = 0 + err = retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + fmt.Printf(" 〉attempt %d: fatal error → retry.Permanent()\n", attempt) + return retry.Permanent(errors.New("fatal: invalid input")) + }, + retry.WithMaxAttempts(5), + ) + fmt.Printf(" → attempts: %d | err: %v (error preserved through chain)\n", attempt, err) + fmt.Println() + + // ── Demo 4: Exhaustion ── + fmt.Println("━━━ Demo 4: Max attempts exhausted ━━━") + fmt.Println(" Operation always fails. WithMaxAttempts(3) limits total tries.") + fmt.Println(" Backoff: 50ms → 100ms → 200ms (capped). Fails fast instead of hanging.") + attempt = 0 + err = retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + fmt.Printf(" 〉attempt %d: network timeout\n", attempt) + return errors.New("network timeout") + }, + retry.WithMaxAttempts(3), + retry.WithInitialInterval(50*time.Millisecond), + retry.WithMaxInterval(200*time.Millisecond), + ) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + // ── Demo 5: Context cancellation ── + fmt.Println("━━━ Demo 5: Context cancellation ━━━") + fmt.Println(" Context cancelled after 80ms from a goroutine.") + fmt.Println(" Backoff is 200ms → retry.Do notices ctx.Err() before next attempt and stops.") + ctx, cancel := context.WithCancel(context.Background()) + attempt = 0 + go func() { + time.Sleep(80 * time.Millisecond) + fmt.Println(" 〉cancelling context...") + cancel() + }() + start := time.Now() + err = retry.Do(ctx, func(ctx context.Context) error { + attempt++ + fmt.Printf(" 〉attempt %d: transient (%dms elapsed)\n", attempt, time.Since(start).Milliseconds()) + return errors.New("transient") + }, + retry.WithInitialInterval(200*time.Millisecond), + ) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + // ── Demo 6: gRPC-style ── + fmt.Println("━━━ Demo 6: Transport-agnostic — gRPC ━━━") + fmt.Println(" Same retry.Do() — no HTTP dependency. Works with any error-returning operation.") + fmt.Println(" Simulates: gRPC status 'Unavailable' (service temporarily down)") + attempt = 0 + err = retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + if attempt < 3 { + fmt.Printf(" 〉attempt %d: rpc error: code = Unavailable (will retry)\n", attempt) + return errors.New("rpc error: code = Unavailable desc = service temporarily unavailable") + } + fmt.Printf(" 〉attempt %d: success (gRPC call completed)\n", attempt) + return nil + }, + retry.WithMaxAttempts(5), + retry.WithInitialInterval(100*time.Millisecond), + ) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + // ── Demo 7: Notifier ── + fmt.Println("━━━ Demo 7: Notifier callback ━━━") + fmt.Println(" WithNotifier fires before each retry — useful for logging, metrics, alerts.") + attempt = 0 + err = retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + if attempt < 3 { + return errors.New("server not ready") + } + return nil + }, + retry.WithMaxAttempts(5), + retry.WithInitialInterval(50*time.Millisecond), + retry.WithMaxInterval(200*time.Millisecond), + retry.WithNotifier(func(err error, wait time.Duration) { + fmt.Printf(" ⚠ notifier: retrying in %v — err: %v\n", wait.Round(time.Millisecond), err) + }), + ) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + // ── Demo 8: DB connection ── + fmt.Println("━━━ Demo 8: Transport-agnostic — Database ━━━") + fmt.Println(" Same retry.Do() — simulates a DB connection pool retry.") + attempt = 0 + err = retry.Do(context.Background(), func(ctx context.Context) error { + attempt++ + if attempt < 2 { + fmt.Printf(" 〉attempt %d: db: connection refused (will retry)\n", attempt) + return errors.New("db: connection refused") + } + fmt.Printf(" 〉attempt %d: connected\n", attempt) + return nil + }, + retry.WithMaxAttempts(3), + retry.WithInitialInterval(50*time.Millisecond), + ) + fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) + fmt.Println() + + fmt.Println("══════════════════════════════════════════════════") + fmt.Println(" All 8 demos passed. Key takeaways:") + fmt.Println(" • retry.Do is transport-agnostic (gRPC, DB, HTTP — same API)") + fmt.Println(" • retry.Permanent() stops retries for fatal errors") + fmt.Println(" • Context cancellation is respected mid-backoff") + fmt.Println(" • MaxAttempts + MaxElapsedTime = safety budget") + fmt.Println(" • WithNotifier hooks into every retry for logging/metrics") + fmt.Println("══════════════════════════════════════════════════") +} diff --git a/retry/error.go b/retry/error.go index d7255b65..92284223 100644 --- a/retry/error.go +++ b/retry/error.go @@ -6,7 +6,21 @@ import ( "github.com/cenkalti/backoff/v5" ) -type PermanentError = backoff.PermanentError +// ErrorDecision controls retry behaviour for a single error. +type ErrorDecision int + +const ( + DecisionRetry ErrorDecision = iota + DecisionStop +) + +// ErrorClassifier returns the retry decision for a given error. +// Return DecisionStop for errors that should not be retried (e.g. HTTP 4xx, +// validation failures, auth errors). Return DecisionRetry for transient +// errors (timeouts, 5xx, rate limits). +// +// Ignored when the operation explicitly returns Permanent(err). +type ErrorClassifier func(err error) ErrorDecision // Permanent wraps err to signal no further retries should be attempted. // Use for non-transient errors (HTTP 4xx, auth failures, validation errors). diff --git a/retry/options.go b/retry/options.go index 2aca5c60..48f6bca6 100644 --- a/retry/options.go +++ b/retry/options.go @@ -22,6 +22,7 @@ type Config struct { Multiplier float64 RandomizationFactor float64 Notifier func(err error, wait time.Duration) + ErrorClassifier ErrorClassifier } func defaultConfig() Config { @@ -55,24 +56,36 @@ func WithMaxElapsedTime(d time.Duration) Option { } func WithMultiplier(m float64) Option { - return func(c *Config) { - if m <= 0 { - c.Multiplier = DefaultMultiplier - return - } - c.Multiplier = m - } + return func(c *Config) { c.Multiplier = m } } // WithJitter overrides randomization factor (range: 0.0-1.0). Do not set to 0.0 in production. func WithJitter(f float64) Option { - return func(c *Config) { - if f < 0 || f > 1 { - c.RandomizationFactor = DefaultRandomizationFactor - return - } - c.RandomizationFactor = f - } + return func(c *Config) { c.RandomizationFactor = f } +} + +// WithErrorClassifier provides a decision function for classifying errors as +// retryable (DecisionRetry) or terminal (DecisionStop). When set, every error +// returned by the operation (except those explicitly wrapped with Permanent) +// is passed to this function. If it returns DecisionStop, the error is treated +// as permanent and the retry loop stops immediately. +// +// Example: +// +// retry.Do(ctx, op, +// retry.WithErrorClassifier(func(err error) retry.ErrorDecision { +// var status *myHTTPError +// if errors.As(err, &status) { +// if status.Code >= 500 { +// return retry.DecisionRetry +// } +// return retry.DecisionStop +// } +// return retry.DecisionRetry +// }), +// ) +func WithErrorClassifier(classifier ErrorClassifier) Option { + return func(c *Config) { c.ErrorClassifier = classifier } } func WithNotifier(n func(err error, wait time.Duration)) Option { diff --git a/retry/retry.go b/retry/retry.go index 5308f7ca..d940f748 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -2,6 +2,9 @@ package retry import ( "context" + "errors" + "fmt" + "math" "github.com/cenkalti/backoff/v5" ) @@ -11,6 +14,10 @@ type Operation func(ctx context.Context) error // Do executes op with exponential backoff until success, permanent error, // context cancellation, or budget exhaustion. Config via opts (default: // 500ms initial, 1.5x growth, 30% jitter, 2min cap). +// +// When a ErrorClassifier is configured via WithErrorClassifier, every non-nil +// error from op (except those explicitly wrapped with Permanent) is passed to +// the classifier before the retry decision is made. func Do(ctx context.Context, op Operation, opts ...Option) error { if err := ctx.Err(); err != nil { return err @@ -20,6 +27,28 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { o(&cfg) } + if err := validateConfig(cfg); err != nil { + return err + } + + apply := op + if cfg.ErrorClassifier != nil { + apply = func(ctx context.Context) error { + err := op(ctx) + if err == nil { + return nil + } + var pErr *backoff.PermanentError + if errors.As(err, &pErr) { + return err + } + if cfg.ErrorClassifier(err) == DecisionStop { + return backoff.Permanent(err) + } + return err + } + } + retryOpts := []backoff.RetryOption{ backoff.WithBackOff(buildBackOff(cfg)), backoff.WithMaxElapsedTime(cfg.MaxElapsedTime), @@ -30,11 +59,33 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { } _, err := backoff.Retry(ctx, func() (struct{}, error) { - return struct{}{}, op(ctx) + return struct{}{}, apply(ctx) }, retryOpts...) return err } +func validateConfig(cfg Config) error { + if cfg.InitialInterval <= 0 { + return fmt.Errorf("retry: InitialInterval must be > 0, got %v", cfg.InitialInterval) + } + if cfg.MaxInterval <= 0 { + return fmt.Errorf("retry: MaxInterval must be > 0, got %v", cfg.MaxInterval) + } + if cfg.MaxInterval < cfg.InitialInterval { + return fmt.Errorf("retry: MaxInterval (%v) must be >= InitialInterval (%v)", cfg.MaxInterval, cfg.InitialInterval) + } + if cfg.MaxElapsedTime < 0 { + return fmt.Errorf("retry: MaxElapsedTime must be >= 0, got %v", cfg.MaxElapsedTime) + } + if math.IsNaN(cfg.Multiplier) || math.IsInf(cfg.Multiplier, 0) || cfg.Multiplier < 1 { + return fmt.Errorf("retry: Multiplier must be finite and >= 1, got %v", cfg.Multiplier) + } + if math.IsNaN(cfg.RandomizationFactor) || cfg.RandomizationFactor < 0 || cfg.RandomizationFactor > 1 { + return fmt.Errorf("retry: RandomizationFactor must be finite and in [0,1], got %v", cfg.RandomizationFactor) + } + return nil +} + // buildBackOff constructs a backoff policy from Config. func buildBackOff(cfg Config) backoff.BackOff { b := backoff.NewExponentialBackOff() diff --git a/retry/retry_test.go b/retry/retry_test.go index 1b4b1df6..c9eb901a 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -4,6 +4,10 @@ import ( "context" "errors" "fmt" + "math" + "net/http" + "net/http/httptest" + "strings" "sync/atomic" "testing" "time" @@ -341,3 +345,342 @@ func TestRetryDefaultsAreApplied(t *testing.T) { t.Fatalf("expected at least 2 calls with default config, got %d", calls.Load()) } } + +func TestRetryClassifierStopsOnDecisionStop(t *testing.T) { + t.Parallel() + + classifyErr := errors.New("not a chance") + var calls atomic.Int64 + + err := retry.Do(context.Background(), + func(ctx context.Context) error { + calls.Add(1) + return classifyErr + }, + retry.WithErrorClassifier(func(err error) retry.ErrorDecision { + if errors.Is(err, classifyErr) { + return retry.DecisionStop + } + return retry.DecisionRetry + }), + retry.WithMaxAttempts(10), + retry.WithInitialInterval(1*time.Millisecond), + ) + + if err == nil { + t.Fatal("expected non-nil error when classifier stops") + } + if !errors.Is(err, classifyErr) { + t.Fatalf("expected classifier error unwrapped, got %v", err) + } + if calls.Load() != 1 { + t.Fatalf("expected exactly 1 call when classifier stops, got %d", calls.Load()) + } +} + +func TestRetryClassifierRetriesOnDecisionRetry(t *testing.T) { + t.Parallel() + + classifyErr := errors.New("transient per classifier") + var calls atomic.Int64 + + err := retry.Do(context.Background(), + func(ctx context.Context) error { + n := calls.Add(1) + if n < 3 { + return classifyErr + } + return nil + }, + retry.WithErrorClassifier(func(err error) retry.ErrorDecision { + return retry.DecisionRetry + }), + retry.WithMaxAttempts(10), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithMaxInterval(5*time.Millisecond), + ) + + if err != nil { + t.Fatalf("expected success after classifier retries, got %v", err) + } + if calls.Load() != 3 { + t.Fatalf("expected 3 calls (2 classified retries + success), got %d", calls.Load()) + } +} + +func TestRetryClassifierDoesNotOverrideExplicitPermanent(t *testing.T) { + t.Parallel() + + permErr := errors.New("explicitly permanent") + var calls atomic.Int64 + + err := retry.Do(context.Background(), + func(ctx context.Context) error { + calls.Add(1) + return retry.Permanent(permErr) + }, + // Classifier says retry everything — but Permanent should still win. + retry.WithErrorClassifier(func(err error) retry.ErrorDecision { + return retry.DecisionRetry + }), + retry.WithMaxAttempts(10), + retry.WithInitialInterval(1*time.Millisecond), + ) + + if err == nil { + t.Fatal("expected non-nil error") + } + if !errors.Is(err, permErr) { + t.Fatalf("expected permanent error unwrapped, got %v", err) + } + if calls.Load() != 1 { + t.Fatalf("expected exactly 1 call for explicit Permanent, got %d", calls.Load()) + } +} + +func TestRetryClassifierCanMixWithPermanent(t *testing.T) { + t.Parallel() + + permErr := errors.New("permanent") + transientErr := errors.New("transient") + var calls atomic.Int64 + + err := retry.Do(context.Background(), + func(ctx context.Context) error { + n := calls.Add(1) + if n == 1 { + return transientErr + } + return retry.Permanent(permErr) + }, + retry.WithErrorClassifier(func(err error) retry.ErrorDecision { + if errors.Is(err, transientErr) { + return retry.DecisionRetry + } + return retry.DecisionStop + }), + retry.WithMaxAttempts(5), + retry.WithInitialInterval(1*time.Millisecond), + retry.WithMaxInterval(2*time.Millisecond), + ) + + if err == nil { + t.Fatal("expected non-nil error") + } + if !errors.Is(err, permErr) { + t.Fatalf("expected permanent error unwrapped, got %v", err) + } + if calls.Load() != 2 { + t.Fatalf("expected 2 calls (transient + permanent), got %d", calls.Load()) + } +} + +func TestRetryConfigValidationInitialIntervalZero(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithInitialInterval(0), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "InitialInterval") { + t.Fatalf("expected InitialInterval validation error, got %v", err) + } +} + +func TestRetryConfigValidationMaxIntervalZero(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithMaxInterval(0), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "MaxInterval") { + t.Fatalf("expected MaxInterval validation error, got %v", err) + } +} + +func TestRetryConfigValidationMaxIntervalLessThanInitial(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithInitialInterval(5*time.Second), + retry.WithMaxInterval(1*time.Second), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "MaxInterval") || !strings.Contains(err.Error(), "InitialInterval") { + t.Fatalf("expected MaxInterval/InitialInterval mismatch error, got %v", err) + } +} + +func TestRetryConfigValidationMultiplierNaN(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithMultiplier(float64(math.NaN())), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "Multiplier") { + t.Fatalf("expected Multiplier validation error, got %v", err) + } +} + +func TestRetryConfigValidationMultiplierInf(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithMultiplier(float64(math.Inf(1))), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "Multiplier") { + t.Fatalf("expected Multiplier validation error, got %v", err) + } +} + +func TestRetryConfigValidationMultiplierLessThanOne(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithMultiplier(0.5), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "Multiplier") { + t.Fatalf("expected Multiplier validation error, got %v", err) + } +} + +func TestRetryConfigValidationJitterNaN(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithJitter(float64(math.NaN())), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "RandomizationFactor") { + t.Fatalf("expected RandomizationFactor validation error, got %v", err) + } +} + +func TestRetryConfigValidationJitterOutOfRange(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithJitter(1.5), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "RandomizationFactor") { + t.Fatalf("expected RandomizationFactor validation error, got %v", err) + } +} + +func TestRetryConfigValidationJitterNegative(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithJitter(-0.1), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "RandomizationFactor") { + t.Fatalf("expected RandomizationFactor validation error, got %v", err) + } +} + +func TestRetryConfigValidationZeroMaxElapsedTimeIsValid(t *testing.T) { + t.Parallel() + + // 0 for MaxElapsedTime means "no wall-clock limit". Should be valid. + err := retry.Do(context.Background(), + func(ctx context.Context) error { return nil }, + retry.WithMaxElapsedTime(0), + ) + if err != nil { + t.Fatalf("expected success (0 MaxElapsedTime is valid), got %v", err) + } +} + +func TestRetryConfigValidationNegativeMaxElapsedTime(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("err") }, + retry.WithMaxElapsedTime(-1), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), "MaxElapsedTime") { + t.Fatalf("expected MaxElapsedTime validation error, got %v", err) + } +} + +// ExampleDo demonstrates idiomatic HTTP usage with retry budget and per-attempt timeout. +// +// MaxElapsedTime limits the retry loop but does NOT interrupt an in-flight HTTP +// request. Always pair it with http.Client.Timeout (or NewRequestWithContext) so +// each attempt has its own deadline. +func ExampleDo() { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer srv.Close() + + client := &http.Client{Timeout: 3 * time.Second} + + err := retry.Do(context.Background(), func(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL, nil) + if err != nil { + return retry.Permanent(err) + } + + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusOK: + return nil + case resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500: + return fmt.Errorf("transient response: %s", resp.Status) + default: + return retry.Permanent(fmt.Errorf("non-retryable response: %s", resp.Status)) + } + }, + retry.WithMaxAttempts(3), + retry.WithInitialInterval(time.Second), + retry.WithMaxElapsedTime(10*time.Second), + ) + + if err != nil { + fmt.Printf("request failed: %v\n", err) + } + // Output: + // request failed: non-retryable response: 404 Not Found +} From cfcf5b4a4f0c51263fe7854ddea906ea5a30c2e1 Mon Sep 17 00:00:00 2001 From: Pranav Dhiran Date: Mon, 1 Jun 2026 14:48:13 +0530 Subject: [PATCH 11/11] retry: return meshkit errors, add ErrInvalidConfig sentinel, add ExampleWithErrorClassifier - All errors from Do() are now structured meshkit errors with codes/severity - ErrInvalidConfig sentinel lets callers distinguish config errors via errors.Is - retryError wrapper preserves error chains through Unwrap() []error - Add ExampleWithErrorClassifier compiled example, remove standalone _demo - Add TestRetryInvalidConfigSentinelIsReachable Signed-off-by: Pranav Dhiran --- retry/_demo/main.go | 175 -------------------------------------------- retry/error.go | 58 +++++++++++++++ retry/retry.go | 21 +++--- retry/retry_test.go | 52 ++++++++++++- 4 files changed, 121 insertions(+), 185 deletions(-) delete mode 100644 retry/_demo/main.go diff --git a/retry/_demo/main.go b/retry/_demo/main.go deleted file mode 100644 index 4278561c..00000000 --- a/retry/_demo/main.go +++ /dev/null @@ -1,175 +0,0 @@ -package main - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/meshery/meshkit/retry" -) - -var attempt int - -func main() { - fmt.Println("══════════════════════════════════════════════════") - fmt.Println(" meshkit/retry — exponential backoff demo") - fmt.Println(" transport-agnostic: works with HTTP, gRPC, DB, ...") - fmt.Println("══════════════════════════════════════════════════") - fmt.Println() - - // ── Demo 1: Immediate success ── - fmt.Println("━━━ Demo 1: Immediate success (no retry needed) ━━━") - fmt.Println(" Operation returns nil on 1st try → retry.Do returns immediately.") - attempt = 0 - err := retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - return nil - }) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - // ── Demo 2: Transient → success after 2 failures ── - fmt.Println("━━━ Demo 2: Transient failures → recovers ━━━") - fmt.Println(" Operation fails twice (returns error), succeeds on 3rd.") - fmt.Println(" retry.Do retries with exponential backoff until success or budget exhausted.") - attempt = 0 - err = retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - if attempt < 3 { - fmt.Printf(" 〉attempt %d: transient error (will retry)\n", attempt) - return errors.New("transient error") - } - fmt.Printf(" 〉attempt %d: success\n", attempt) - return nil - }, - retry.WithMaxAttempts(5), - retry.WithInitialInterval(100*time.Millisecond), - retry.WithMaxInterval(500*time.Millisecond), - ) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - // ── Demo 3: Permanent error ── - fmt.Println("━━━ Demo 3: Permanent error (stops immediately) ━━━") - fmt.Println(" Operation wraps error with retry.Permanent() → no retry attempted.") - fmt.Println(" Use case: HTTP 4xx, auth failure, validation errors — anything non-transient.") - attempt = 0 - err = retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - fmt.Printf(" 〉attempt %d: fatal error → retry.Permanent()\n", attempt) - return retry.Permanent(errors.New("fatal: invalid input")) - }, - retry.WithMaxAttempts(5), - ) - fmt.Printf(" → attempts: %d | err: %v (error preserved through chain)\n", attempt, err) - fmt.Println() - - // ── Demo 4: Exhaustion ── - fmt.Println("━━━ Demo 4: Max attempts exhausted ━━━") - fmt.Println(" Operation always fails. WithMaxAttempts(3) limits total tries.") - fmt.Println(" Backoff: 50ms → 100ms → 200ms (capped). Fails fast instead of hanging.") - attempt = 0 - err = retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - fmt.Printf(" 〉attempt %d: network timeout\n", attempt) - return errors.New("network timeout") - }, - retry.WithMaxAttempts(3), - retry.WithInitialInterval(50*time.Millisecond), - retry.WithMaxInterval(200*time.Millisecond), - ) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - // ── Demo 5: Context cancellation ── - fmt.Println("━━━ Demo 5: Context cancellation ━━━") - fmt.Println(" Context cancelled after 80ms from a goroutine.") - fmt.Println(" Backoff is 200ms → retry.Do notices ctx.Err() before next attempt and stops.") - ctx, cancel := context.WithCancel(context.Background()) - attempt = 0 - go func() { - time.Sleep(80 * time.Millisecond) - fmt.Println(" 〉cancelling context...") - cancel() - }() - start := time.Now() - err = retry.Do(ctx, func(ctx context.Context) error { - attempt++ - fmt.Printf(" 〉attempt %d: transient (%dms elapsed)\n", attempt, time.Since(start).Milliseconds()) - return errors.New("transient") - }, - retry.WithInitialInterval(200*time.Millisecond), - ) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - // ── Demo 6: gRPC-style ── - fmt.Println("━━━ Demo 6: Transport-agnostic — gRPC ━━━") - fmt.Println(" Same retry.Do() — no HTTP dependency. Works with any error-returning operation.") - fmt.Println(" Simulates: gRPC status 'Unavailable' (service temporarily down)") - attempt = 0 - err = retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - if attempt < 3 { - fmt.Printf(" 〉attempt %d: rpc error: code = Unavailable (will retry)\n", attempt) - return errors.New("rpc error: code = Unavailable desc = service temporarily unavailable") - } - fmt.Printf(" 〉attempt %d: success (gRPC call completed)\n", attempt) - return nil - }, - retry.WithMaxAttempts(5), - retry.WithInitialInterval(100*time.Millisecond), - ) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - // ── Demo 7: Notifier ── - fmt.Println("━━━ Demo 7: Notifier callback ━━━") - fmt.Println(" WithNotifier fires before each retry — useful for logging, metrics, alerts.") - attempt = 0 - err = retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - if attempt < 3 { - return errors.New("server not ready") - } - return nil - }, - retry.WithMaxAttempts(5), - retry.WithInitialInterval(50*time.Millisecond), - retry.WithMaxInterval(200*time.Millisecond), - retry.WithNotifier(func(err error, wait time.Duration) { - fmt.Printf(" ⚠ notifier: retrying in %v — err: %v\n", wait.Round(time.Millisecond), err) - }), - ) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - // ── Demo 8: DB connection ── - fmt.Println("━━━ Demo 8: Transport-agnostic — Database ━━━") - fmt.Println(" Same retry.Do() — simulates a DB connection pool retry.") - attempt = 0 - err = retry.Do(context.Background(), func(ctx context.Context) error { - attempt++ - if attempt < 2 { - fmt.Printf(" 〉attempt %d: db: connection refused (will retry)\n", attempt) - return errors.New("db: connection refused") - } - fmt.Printf(" 〉attempt %d: connected\n", attempt) - return nil - }, - retry.WithMaxAttempts(3), - retry.WithInitialInterval(50*time.Millisecond), - ) - fmt.Printf(" → attempts: %d | err: %v\n", attempt, err) - fmt.Println() - - fmt.Println("══════════════════════════════════════════════════") - fmt.Println(" All 8 demos passed. Key takeaways:") - fmt.Println(" • retry.Do is transport-agnostic (gRPC, DB, HTTP — same API)") - fmt.Println(" • retry.Permanent() stops retries for fatal errors") - fmt.Println(" • Context cancellation is respected mid-backoff") - fmt.Println(" • MaxAttempts + MaxElapsedTime = safety budget") - fmt.Println(" • WithNotifier hooks into every retry for logging/metrics") - fmt.Println("══════════════════════════════════════════════════") -} diff --git a/retry/error.go b/retry/error.go index 92284223..cd598f00 100644 --- a/retry/error.go +++ b/retry/error.go @@ -4,8 +4,66 @@ import ( "errors" "github.com/cenkalti/backoff/v5" + meshkiterrors "github.com/meshery/meshkit/errors" ) +// ErrInvalidConfig is returned when retry configuration validation fails. +// Use errors.Is(err, ErrInvalidConfig) to distinguish config errors from +// operation failures. +var ErrInvalidConfig = errors.New("retry: invalid config") + +const ( + ErrRetryCode = "meshkit-10001" + ErrContextCode = "meshkit-10002" + ErrConfigCode = "meshkit-10003" +) + +type retryError struct { + inner error + meshkit *meshkiterrors.Error +} + +func (e *retryError) Error() string { + return e.meshkit.Error() +} + +func (e *retryError) Unwrap() []error { + return []error{e.inner, e.meshkit} +} + +func ErrRetry(err error) error { + return &retryError{ + inner: err, + meshkit: meshkiterrors.New(ErrRetryCode, meshkiterrors.Alert, + []string{"Retry operation failed"}, + []string{err.Error()}, + []string{"Operation did not succeed within retry limits"}, + []string{"Check the underlying operation and retry configuration"}), + } +} + +func ErrContext(err error) error { + return &retryError{ + inner: err, + meshkit: meshkiterrors.New(ErrContextCode, meshkiterrors.Alert, + []string{"Context canceled or deadline exceeded"}, + []string{err.Error()}, + []string{"Operation timed out or context was canceled"}, + []string{"Check context timeout and ensure the operation completes in time"}), + } +} + +func ErrConfig(err error) error { + return &retryError{ + inner: err, + meshkit: meshkiterrors.New(ErrConfigCode, meshkiterrors.Alert, + []string{"Invalid retry configuration"}, + []string{err.Error()}, + []string{"One or more config values are invalid"}, + []string{"Ensure all retry configuration values are correct"}), + } +} + // ErrorDecision controls retry behaviour for a single error. type ErrorDecision int diff --git a/retry/retry.go b/retry/retry.go index d940f748..60b247fb 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -20,7 +20,7 @@ type Operation func(ctx context.Context) error // the classifier before the retry decision is made. func Do(ctx context.Context, op Operation, opts ...Option) error { if err := ctx.Err(); err != nil { - return err + return ErrContext(err) } cfg := defaultConfig() for _, o := range opts { @@ -28,7 +28,7 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { } if err := validateConfig(cfg); err != nil { - return err + return ErrConfig(err) } apply := op @@ -61,27 +61,30 @@ func Do(ctx context.Context, op Operation, opts ...Option) error { _, err := backoff.Retry(ctx, func() (struct{}, error) { return struct{}{}, apply(ctx) }, retryOpts...) - return err + if err != nil { + return ErrRetry(err) + } + return nil } func validateConfig(cfg Config) error { if cfg.InitialInterval <= 0 { - return fmt.Errorf("retry: InitialInterval must be > 0, got %v", cfg.InitialInterval) + return fmt.Errorf("%w: InitialInterval must be > 0, got %v", ErrInvalidConfig, cfg.InitialInterval) } if cfg.MaxInterval <= 0 { - return fmt.Errorf("retry: MaxInterval must be > 0, got %v", cfg.MaxInterval) + return fmt.Errorf("%w: MaxInterval must be > 0, got %v", ErrInvalidConfig, cfg.MaxInterval) } if cfg.MaxInterval < cfg.InitialInterval { - return fmt.Errorf("retry: MaxInterval (%v) must be >= InitialInterval (%v)", cfg.MaxInterval, cfg.InitialInterval) + return fmt.Errorf("%w: MaxInterval (%v) must be >= InitialInterval (%v)", ErrInvalidConfig, cfg.MaxInterval, cfg.InitialInterval) } if cfg.MaxElapsedTime < 0 { - return fmt.Errorf("retry: MaxElapsedTime must be >= 0, got %v", cfg.MaxElapsedTime) + return fmt.Errorf("%w: MaxElapsedTime must be >= 0, got %v", ErrInvalidConfig, cfg.MaxElapsedTime) } if math.IsNaN(cfg.Multiplier) || math.IsInf(cfg.Multiplier, 0) || cfg.Multiplier < 1 { - return fmt.Errorf("retry: Multiplier must be finite and >= 1, got %v", cfg.Multiplier) + return fmt.Errorf("%w: Multiplier must be finite and >= 1, got %v", ErrInvalidConfig, cfg.Multiplier) } if math.IsNaN(cfg.RandomizationFactor) || cfg.RandomizationFactor < 0 || cfg.RandomizationFactor > 1 { - return fmt.Errorf("retry: RandomizationFactor must be finite and in [0,1], got %v", cfg.RandomizationFactor) + return fmt.Errorf("%w: RandomizationFactor must be finite and in [0,1], got %v", ErrInvalidConfig, cfg.RandomizationFactor) } return nil } diff --git a/retry/retry_test.go b/retry/retry_test.go index c9eb901a..2686158d 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -639,6 +639,21 @@ func TestRetryConfigValidationNegativeMaxElapsedTime(t *testing.T) { } } +func TestRetryInvalidConfigSentinelIsReachable(t *testing.T) { + t.Parallel() + + err := retry.Do(context.Background(), + func(ctx context.Context) error { return errors.New("fail") }, + retry.WithInitialInterval(0), + ) + if err == nil { + t.Fatal("expected validation error") + } + if !errors.Is(err, retry.ErrInvalidConfig) { + t.Fatalf("errors.Is(err, ErrInvalidConfig) should be true, got %v", err) + } +} + // ExampleDo demonstrates idiomatic HTTP usage with retry budget and per-attempt timeout. // // MaxElapsedTime limits the retry loop but does NOT interrupt an in-flight HTTP @@ -682,5 +697,40 @@ func ExampleDo() { fmt.Printf("request failed: %v\n", err) } // Output: - // request failed: non-retryable response: 404 Not Found + // request failed: non-retryable response: 404 Not Found | Short Description: Retry operation failed | Probable Cause: Operation did not succeed within retry limits | Suggested Remediation: Check the underlying operation and retry configuration +} + +// ExampleWithErrorClassifier shows how to classify errors as retryable or +// terminal using WithErrorClassifier. DecisionRetry keeps retrying; +// DecisionStop ends the loop immediately. +func ExampleWithErrorClassifier() { + var ( + ErrTimeout = errors.New("request timeout") + ErrAuth = errors.New("authentication failed") + ) + + var attempts int + err := retry.Do(context.Background(), func(ctx context.Context) error { + attempts++ + if attempts < 3 { + return ErrTimeout + } + return ErrAuth + }, + retry.WithErrorClassifier(func(err error) retry.ErrorDecision { + if errors.Is(err, ErrTimeout) { + return retry.DecisionRetry + } + return retry.DecisionStop + }), + retry.WithMaxAttempts(5), + retry.WithInitialInterval(10*time.Millisecond), + retry.WithMaxInterval(20*time.Millisecond), + ) + + fmt.Println("attempts:", attempts) + fmt.Println("auth error:", errors.Is(err, ErrAuth)) + // Output: + // attempts: 3 + // auth error: true }