From 1b0f272ce15e4a75602b8f9af7a3907413953ed0 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 19 May 2026 13:32:51 -0400 Subject: [PATCH 1/3] Implement dual-path retry with exponential backoff and rate-limit handling Replace the fixed 10-attempt retry loop with a structured retry system: - 429 + Retry-After: sleep without consuming retry budget, capped by MaxRateLimitDuration - Other retryable errors: counted exponential backoff (base 500ms, 2x, cap 60s) bounded by MaxRetries and MaxTotalBackoffDuration - Non-retryable status codes (4xx except 408/410/429/460, plus 501/505/511) discard immediately - Remove select on c.quit from retry sleeps so Close() waits for in-flight retries to complete naturally via wg.Wait() - Add X-Retry-Count header on retry attempts - Wire MaxRetries config through to e2e-cli - Enable retry test suite in e2e-config --- analytics.go | 130 +++++++++++++++++++++++++++++++--------- config.go | 35 ++++++++++- e2e-cli/e2e-config.json | 2 +- e2e-cli/main.go | 4 ++ error.go | 11 ++++ go.mod | 15 +++++ go.sum | 39 +++++++----- status.go | 46 ++++++++++++++ status_test.go | 62 +++++++++++++++++++ 9 files changed, 297 insertions(+), 47 deletions(-) create mode 100644 status.go create mode 100644 status_test.go diff --git a/analytics.go b/analytics.go index e487ba2..da1c9eb 100644 --- a/analytics.go +++ b/analytics.go @@ -1,15 +1,13 @@ package analytics import ( + "bytes" + "encoding/json" "fmt" "io" - "io/ioutil" + "net/http" "strconv" "sync" - - "bytes" - "encoding/json" - "net/http" "time" ) @@ -243,10 +241,21 @@ func (c *client) sendAsync(msgs []message, wg *sync.WaitGroup, ex *executor) { } } +// httpError is returned by report() for non-2xx/3xx responses. +type httpError struct { + StatusCode int + Retryable bool + IsRateLimit bool + RetryAfter int64 // seconds from Retry-After header; 0 if absent + Body string +} + +func (e *httpError) Error() string { + return fmt.Sprintf("%d %s", e.StatusCode, e.Body) +} + // Send batch request. func (c *client) send(msgs []message) { - const attempts = 10 - b, err := json.Marshal(batch{ MessageId: c.uid(), SentAt: c.now(), @@ -260,28 +269,74 @@ func (c *client) send(msgs []message) { return } - for i := 0; i != attempts; i++ { - if err = c.upload(b); err == nil { + var ( + totalAttempts int + backoffAttempts int + firstFailureTime time.Time + rateLimitStartTime time.Time + lastErr error + ) + + for { + totalAttempts++ + uploadErr := c.upload(b, totalAttempts) + + if uploadErr == nil { c.notifySuccess(msgs) return } - // Wait for either a retry timeout or the client to be closed. - select { - case <-time.After(c.RetryAfter(i)): - case <-c.quit: - c.errorf("%d messages dropped because they failed to be sent and the client was closed", len(msgs)) - c.notifyFailure(msgs, err) + lastErr = uploadErr + + httpErr, ok := uploadErr.(*httpError) + if !ok { + // Network-level error — treat as retryable backoff + httpErr = &httpError{Retryable: true} + } + + if !httpErr.Retryable { + c.errorf("messages dropped due to non-retryable error - %s", uploadErr) + c.notifyFailure(msgs, uploadErr) return } - } - c.errorf("%d messages dropped because they failed to be sent after %d attempts", len(msgs), attempts) - c.notifyFailure(msgs, err) + if httpErr.IsRateLimit && httpErr.RetryAfter > 0 { + // Retry-After present — sleep without consuming retry budget + if rateLimitStartTime.IsZero() { + rateLimitStartTime = c.now() + } + if c.now().Sub(rateLimitStartTime) > c.MaxRateLimitDuration { + c.errorf("messages dropped - %s", ErrRateLimitBudgetExceeded) + c.notifyFailure(msgs, ErrRateLimitBudgetExceeded) + return + } + time.Sleep(time.Duration(httpErr.RetryAfter) * time.Second) + continue + } + + // Counted backoff retry + if firstFailureTime.IsZero() { + firstFailureTime = c.now() + } + if c.now().Sub(firstFailureTime) > c.MaxTotalBackoffDuration { + c.errorf("messages dropped - %s", ErrBackoffBudgetExceeded) + c.notifyFailure(msgs, ErrBackoffBudgetExceeded) + return + } + + backoffAttempts++ + if backoffAttempts > c.MaxRetries { + c.errorf("%d messages dropped after %d attempts", len(msgs), totalAttempts) + c.notifyFailure(msgs, lastErr) + return + } + + time.Sleep(c.RetryAfter(backoffAttempts - 1)) + } } -// Upload serialized batch message. -func (c *client) upload(b []byte) error { +// Upload serialized batch message. attempt is 1-based (1 = first attempt). +func (c *client) upload(b []byte, attempt int) error { url := c.Endpoint + "/v1/batch" req, err := http.NewRequest("POST", url, bytes.NewReader(b)) if err != nil { @@ -294,8 +349,12 @@ func (c *client) upload(b []byte) error { req.Header.Add("Content-Length", strconv.Itoa(len(b))) req.SetBasicAuth(c.key, "") - res, err := c.http.Do(req) + // Spec item 7: omit on first attempt, send 1-based count on retries + if attempt > 1 { + req.Header.Add("X-Retry-Count", strconv.Itoa(attempt-1)) + } + res, err := c.http.Do(req) if err != nil { c.errorf("sending request - %s", err) return err @@ -306,21 +365,34 @@ func (c *client) upload(b []byte) error { } // Report on response body. -func (c *client) report(res *http.Response) (err error) { - var body []byte - - if res.StatusCode < 300 { +func (c *client) report(res *http.Response) error { + // Spec item 1: 2xx and 3xx are success + if isSuccess(res.StatusCode) { c.debugf("response %s", res.Status) - return + return nil } - if body, err = ioutil.ReadAll(res.Body); err != nil { + body, err := io.ReadAll(res.Body) + if err != nil { c.errorf("response %d %s - %s", res.StatusCode, res.Status, err) - return + return err } c.logf("response %d %s – %s", res.StatusCode, res.Status, string(body)) - return fmt.Errorf("%d %s", res.StatusCode, res.Status) + + retryable, isRateLimit := retryableStatus(res.StatusCode) + var retryAfterSecs int64 + if isRateLimit { + retryAfterSecs = parseRetryAfter(res.Header.Get("Retry-After"), maxRetryAfterSeconds) + } + + return &httpError{ + StatusCode: res.StatusCode, + Retryable: retryable, + IsRateLimit: isRateLimit, + RetryAfter: retryAfterSecs, + Body: string(body), + } } // Batch loop. diff --git a/config.go b/config.go index 2672d86..29ad7fc 100644 --- a/config.go +++ b/config.go @@ -61,6 +61,15 @@ type Config struct { // If not set the client will fallback to use a default retry policy. RetryAfter func(int) time.Duration + // Maximum number of counted backoff retries. Defaults to DefaultMaxRetries. + MaxRetries int + + // Wall-clock cap on total time spent in backoff retries. Defaults to DefaultMaxTotalBackoffDuration. + MaxTotalBackoffDuration time.Duration + + // Wall-clock cap on total time spent retrying after 429 Retry-After responses. Defaults to DefaultMaxRateLimitDuration. + MaxRateLimitDuration time.Duration + // A function called by the client to generate unique message identifiers. // The client uses a UUID generator if none is provided. // This field is not exported and only exposed internally to let unit tests @@ -92,6 +101,18 @@ const DefaultInterval = 5 * time.Second // was explicitly set. const DefaultBatchSize = 250 +// DefaultMaxRetries is the default number of counted backoff retries. +const DefaultMaxRetries = 10 + +// DefaultMaxTotalBackoffDuration is the default wall-clock cap on total backoff time. +const DefaultMaxTotalBackoffDuration = 12 * time.Hour + +// DefaultMaxRateLimitDuration is the default wall-clock cap on 429 Retry-After retries. +const DefaultMaxRateLimitDuration = 12 * time.Hour + +// maxRetryAfterSeconds is the cap applied to Retry-After header values. +const maxRetryAfterSeconds = int64(300) + // Verifies that fields that don't have zero-values are set to valid values, // returns an error describing the problem if a field was invalid. func (c *Config) validate() error { @@ -142,7 +163,19 @@ func makeConfig(c Config) Config { } if c.RetryAfter == nil { - c.RetryAfter = backo.DefaultBacko().Duration + c.RetryAfter = backo.NewBacko(500*time.Millisecond, 2, 0, 60*time.Second).Duration + } + + if c.MaxRetries == 0 { + c.MaxRetries = DefaultMaxRetries + } + + if c.MaxTotalBackoffDuration == 0 { + c.MaxTotalBackoffDuration = DefaultMaxTotalBackoffDuration + } + + if c.MaxRateLimitDuration == 0 { + c.MaxRateLimitDuration = DefaultMaxRateLimitDuration } if c.uid == nil { diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index d7deb43..f926123 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -1,6 +1,6 @@ { "sdk": "go", - "test_suites": "basic", + "test_suites": "basic,retry", "auto_settings": false, "patch": null, "env": {} diff --git a/e2e-cli/main.go b/e2e-cli/main.go index eb35f89..69b9889 100644 --- a/e2e-cli/main.go +++ b/e2e-cli/main.go @@ -219,6 +219,10 @@ func run(input Input) Output { cfg.BatchSize = input.Config.FlushAt } + if input.Config.MaxRetries > 0 { + cfg.MaxRetries = input.Config.MaxRetries + } + client, err := analytics.NewWithConfig(input.WriteKey, cfg) if err != nil { return Output{ diff --git a/error.go b/error.go index d550386..218a4ee 100644 --- a/error.go +++ b/error.go @@ -57,4 +57,15 @@ var ( // failed because the JSON representation of a message exceeded the upper // limit. ErrMessageTooBig = errors.New("the message exceeds the maximum allowed size") + + // ErrBackoffBudgetExceeded is returned when the maximum total backoff + // duration is exceeded before a batch upload succeeds. + ErrBackoffBudgetExceeded = errors.New("max total backoff duration exceeded") + + // ErrRateLimitBudgetExceeded is returned when the maximum rate-limit + // (429 Retry-After) duration is exceeded. + ErrRateLimitBudgetExceeded = errors.New("max rate limit duration exceeded") + + // ErrRetriesExhausted is returned when all retry attempts are consumed. + ErrRetriesExhausted = errors.New("retries exhausted") ) diff --git a/go.mod b/go.mod index 99f9a03..c7ec864 100644 --- a/go.mod +++ b/go.mod @@ -6,4 +6,19 @@ require ( github.com/google/uuid v1.3.0 github.com/segmentio/backo-go v1.0.0 github.com/segmentio/conf v1.2.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/segmentio/go-snakecase v1.1.0 // indirect + github.com/segmentio/objconv v1.0.1 // indirect + gopkg.in/go-playground/assert.v1 v1.2.1 // indirect + gopkg.in/go-playground/mold.v2 v2.2.0 // indirect + gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect + gopkg.in/yaml.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 120e887..23cfd3f 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,20 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/segmentio/analytics-go v3.1.0+incompatible h1:IyiOfUgQFVHvsykKKbdI7ZsH374uv3/DfZUo9+G0Z80= -github.com/segmentio/analytics-go v3.1.0+incompatible/go.mod h1:C7CYBtQWk4vRk2RyLu0qOcbHJ18E3F1HV2C/8JvKN48= -github.com/segmentio/backo-go v0.0.0-20200129164019-23eae7c10bd3 h1:ZuhckGJ10ulaKkdvJtiAqsLTiPrLaXSdnVgXJKJkTxE= -github.com/segmentio/backo-go v0.0.0-20200129164019-23eae7c10bd3/go.mod h1:9/Rh6yILuLysoQnZ2oNooD2g7aBnvM7r/fNVxRNWfBc= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA= github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M= github.com/segmentio/conf v1.2.0 h1:5OT9+6OyVHLsFLsiJa/2KlqiA1m7mpdUBlkB/qYTMts= @@ -24,12 +23,17 @@ github.com/segmentio/go-snakecase v1.1.0 h1:ZJO4SNKKV0MjGOv0LHnixxN5FYv1JKBnVXEu github.com/segmentio/go-snakecase v1.1.0/go.mod h1:jk1miR5MS7Na32PZUykG89Arm+1BUSYhuGR6b7+hJto= github.com/segmentio/objconv v1.0.1 h1:QjfLzwriJj40JibCV3MGSEiAoXixbp4ybhwfTB8RXOM= github.com/segmentio/objconv v1.0.1/go.mod h1:auayaH5k3137Cl4SoXTgrzQcuQDmvuVtZgS0fb1Ahys= -github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g= -github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/mold.v2 v2.2.0 h1:Y4IYB4/HYQfuq43zaKh6vs9cVelLE9qbqe2fkyfCTWQ= @@ -38,3 +42,6 @@ gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9 gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/status.go b/status.go new file mode 100644 index 0000000..c9f7b83 --- /dev/null +++ b/status.go @@ -0,0 +1,46 @@ +package analytics + +import ( + "strconv" + "strings" +) + +// isSuccess returns true for 2xx and 3xx responses (spec item 1). +func isSuccess(status int) bool { + return status >= 200 && status < 400 +} + +// retryableStatus returns the retry strategy for a given HTTP status code. +// Returns (retryable bool, isRateLimit bool). +func retryableStatus(status int) (retryable bool, isRateLimit bool) { + switch status { + case 429: + return true, true + case 408, 410, 460: + return true, false + case 501, 505, 511: + return false, false + default: + if status >= 500 && status < 600 { + return true, false + } + return false, false + } +} + +// parseRetryAfter parses the Retry-After header value (integer seconds only). +// Returns 0 if the value is absent, invalid, zero, or an HTTP-date. +// Caps the value at cap. +func parseRetryAfter(header string, cap int64) int64 { + if header == "" { + return 0 + } + n, err := strconv.ParseInt(strings.TrimSpace(header), 10, 64) + if err != nil || n <= 0 { + return 0 + } + if n > cap { + return cap + } + return n +} diff --git a/status_test.go b/status_test.go new file mode 100644 index 0000000..4eb32cf --- /dev/null +++ b/status_test.go @@ -0,0 +1,62 @@ +package analytics + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsSuccess(t *testing.T) { + cases := []struct { + status int + want bool + }{ + {200, true}, {201, true}, {204, true}, {301, true}, {302, true}, + {400, false}, {429, false}, {500, false}, {0, false}, {199, false}, + } + for _, tc := range cases { + assert.Equal(t, tc.want, isSuccess(tc.status), "status %d", tc.status) + } +} + +func TestRetryableStatus(t *testing.T) { + cases := []struct { + status int + retryable bool + rateLimit bool + }{ + {408, true, false}, + {410, true, false}, + {429, true, true}, + {460, true, false}, + {500, true, false}, + {502, true, false}, + {503, true, false}, + {504, true, false}, + {508, true, false}, + {501, false, false}, + {505, false, false}, + {511, false, false}, + {400, false, false}, + {401, false, false}, + {403, false, false}, + {413, false, false}, + {200, false, false}, + } + for _, tc := range cases { + r, rl := retryableStatus(tc.status) + assert.Equal(t, tc.retryable, r, "retryable for status %d", tc.status) + assert.Equal(t, tc.rateLimit, rl, "isRateLimit for status %d", tc.status) + } +} + +func TestParseRetryAfter(t *testing.T) { + assert.Equal(t, int64(60), parseRetryAfter("60", 300)) + assert.Equal(t, int64(300), parseRetryAfter("9999", 300)) // capped + assert.Equal(t, int64(0), parseRetryAfter("0", 300)) + assert.Equal(t, int64(0), parseRetryAfter("-1", 300)) + assert.Equal(t, int64(0), parseRetryAfter("", 300)) + assert.Equal(t, int64(0), parseRetryAfter("Wed, 07 May 2026 12:00:00 GMT", 300)) + assert.Equal(t, int64(1), parseRetryAfter("1", 300)) + assert.Equal(t, int64(300), parseRetryAfter("300", 300)) +} From bbf12a19ee8346a928711594eb9007707b8325d7 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 21 May 2026 10:57:39 -0400 Subject: [PATCH 2/3] Fix review finding: preserve status code retryability on body-read error When io.ReadAll fails on a non-2xx response, wrap the error in httpError with the original status code's retryability instead of returning a raw error (which would be treated as a network error and always retried). --- analytics.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/analytics.go b/analytics.go index da1c9eb..f2ab845 100644 --- a/analytics.go +++ b/analytics.go @@ -375,7 +375,13 @@ func (c *client) report(res *http.Response) error { body, err := io.ReadAll(res.Body) if err != nil { c.errorf("response %d %s - %s", res.StatusCode, res.Status, err) - return err + retryable, isRL := retryableStatus(res.StatusCode) + return &httpError{ + StatusCode: res.StatusCode, + Retryable: retryable, + IsRateLimit: isRL, + Body: err.Error(), + } } c.logf("response %d %s – %s", res.StatusCode, res.Status, string(body)) From 1f2a42f32dd5dbd1ff6e19665f1792fabd18bdc7 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 4 Jun 2026 20:08:11 -0400 Subject: [PATCH 3/3] Refactor send into small, single-responsibility helpers Extract retry loop body into retryState.classify/handleRateLimit/handleBackoff so that the send function reads as a simple attempt-classify-act switch. The control flow (return, sleep-and-continue) is now visible at a glance. Also fix TestClientResponseBodyError to match the httpError wrapping introduced in bbf12a1. --- analytics.go | 137 +++++++++++++++++++++++++++++----------------- analytics_test.go | 6 +- 2 files changed, 91 insertions(+), 52 deletions(-) diff --git a/analytics.go b/analytics.go index f2ab845..0be4720 100644 --- a/analytics.go +++ b/analytics.go @@ -269,70 +269,107 @@ func (c *client) send(msgs []message) { return } - var ( - totalAttempts int - backoffAttempts int - firstFailureTime time.Time - rateLimitStartTime time.Time - lastErr error - ) - + retry := retryState{client: c, msgs: msgs} for { - totalAttempts++ - uploadErr := c.upload(b, totalAttempts) + retry.totalAttempts++ + uploadErr := c.upload(b, retry.totalAttempts) if uploadErr == nil { c.notifySuccess(msgs) return } - lastErr = uploadErr - - httpErr, ok := uploadErr.(*httpError) - if !ok { - // Network-level error — treat as retryable backoff - httpErr = &httpError{Retryable: true} - } - - if !httpErr.Retryable { - c.errorf("messages dropped due to non-retryable error - %s", uploadErr) - c.notifyFailure(msgs, uploadErr) + action := retry.classify(uploadErr) + switch action { + case retryActionDrop: return + case retryActionRateLimit: + time.Sleep(retry.rateLimitDelay) + case retryActionBackoff: + time.Sleep(c.RetryAfter(retry.backoffAttempts - 1)) } + } +} - if httpErr.IsRateLimit && httpErr.RetryAfter > 0 { - // Retry-After present — sleep without consuming retry budget - if rateLimitStartTime.IsZero() { - rateLimitStartTime = c.now() - } - if c.now().Sub(rateLimitStartTime) > c.MaxRateLimitDuration { - c.errorf("messages dropped - %s", ErrRateLimitBudgetExceeded) - c.notifyFailure(msgs, ErrRateLimitBudgetExceeded) - return - } - time.Sleep(time.Duration(httpErr.RetryAfter) * time.Second) - continue - } +type retryAction int - // Counted backoff retry - if firstFailureTime.IsZero() { - firstFailureTime = c.now() - } - if c.now().Sub(firstFailureTime) > c.MaxTotalBackoffDuration { - c.errorf("messages dropped - %s", ErrBackoffBudgetExceeded) - c.notifyFailure(msgs, ErrBackoffBudgetExceeded) - return - } +const ( + retryActionBackoff retryAction = iota + retryActionRateLimit retryAction = iota + retryActionDrop retryAction = iota +) - backoffAttempts++ - if backoffAttempts > c.MaxRetries { - c.errorf("%d messages dropped after %d attempts", len(msgs), totalAttempts) - c.notifyFailure(msgs, lastErr) - return - } +// retryState tracks state across attempts within a single send call. +type retryState struct { + client *client + msgs []message + totalAttempts int + backoffAttempts int + firstFailureTime time.Time + rateLimitStartTime time.Time + rateLimitDelay time.Duration +} + +// classify determines what to do after a failed upload. It updates internal +// counters, logs/notifies on terminal failures, and returns the action the +// caller should take. +func (r *retryState) classify(uploadErr error) retryAction { + c := r.client + + httpErr, ok := uploadErr.(*httpError) + if !ok { + httpErr = &httpError{Retryable: true} + } + + if !httpErr.Retryable { + c.errorf("messages dropped due to non-retryable error - %s", uploadErr) + c.notifyFailure(r.msgs, uploadErr) + return retryActionDrop + } + + if httpErr.IsRateLimit && httpErr.RetryAfter > 0 { + return r.handleRateLimit(httpErr) + } + + return r.handleBackoff(uploadErr) +} + +func (r *retryState) handleRateLimit(httpErr *httpError) retryAction { + c := r.client + + if r.rateLimitStartTime.IsZero() { + r.rateLimitStartTime = c.now() + } + if c.now().Sub(r.rateLimitStartTime) > c.MaxRateLimitDuration { + c.errorf("messages dropped - %s", ErrRateLimitBudgetExceeded) + c.notifyFailure(r.msgs, ErrRateLimitBudgetExceeded) + return retryActionDrop + } + + r.rateLimitDelay = time.Duration(httpErr.RetryAfter) * time.Second + return retryActionRateLimit +} + +func (r *retryState) handleBackoff(lastErr error) retryAction { + c := r.client - time.Sleep(c.RetryAfter(backoffAttempts - 1)) + if r.firstFailureTime.IsZero() { + r.firstFailureTime = c.now() } + if c.now().Sub(r.firstFailureTime) > c.MaxTotalBackoffDuration { + c.errorf("messages dropped - %s", ErrBackoffBudgetExceeded) + c.notifyFailure(r.msgs, ErrBackoffBudgetExceeded) + return retryActionDrop + } + + r.backoffAttempts++ + if r.backoffAttempts > c.MaxRetries { + c.errorf("%d messages dropped after %d attempts", len(r.msgs), r.totalAttempts) + c.notifyFailure(r.msgs, lastErr) + return retryActionDrop + } + + return retryActionBackoff } // Upload serialized batch message. attempt is 1-based (1 = first attempt). diff --git a/analytics_test.go b/analytics_test.go index 50efa11..a895aff 100644 --- a/analytics_test.go +++ b/analytics_test.go @@ -781,8 +781,10 @@ func TestClientResponseBodyError(t *testing.T) { if err := <-errchan; err == nil { t.Error("failure callback not triggered for a 400 response") - } else if err != testError { - t.Errorf("invalid error returned by erroring response body: %T: %s", err, err) + } else if httpErr, ok := err.(*httpError); !ok { + t.Errorf("expected *httpError, got %T: %s", err, err) + } else if httpErr.StatusCode != 400 { + t.Errorf("expected status 400, got %d", httpErr.StatusCode) } }