Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 143 additions & 28 deletions analytics.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package analytics

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"

"bytes"
"encoding/json"
"net/http"
"time"
)

Expand Down Expand Up @@ -243,10 +241,21 @@ func (c *client) sendAsync(msgs []message, wg *sync.WaitGroup, ex *executor) {
}
}

// httpError is returned by report() for non-2xx/3xx responses.
type httpError struct {
StatusCode int
Retryable bool
IsRateLimit bool
RetryAfter int64 // seconds from Retry-After header; 0 if absent
Body string
}

func (e *httpError) Error() string {
return fmt.Sprintf("%d %s", e.StatusCode, e.Body)
}

// Send batch request.
func (c *client) send(msgs []message) {
const attempts = 10

b, err := json.Marshal(batch{
MessageId: c.uid(),
SentAt: c.now(),
Expand All @@ -260,28 +269,111 @@ func (c *client) send(msgs []message) {
return
}

for i := 0; i != attempts; i++ {
if err = c.upload(b); err == nil {
retry := retryState{client: c, msgs: msgs}
for {
retry.totalAttempts++

uploadErr := c.upload(b, retry.totalAttempts)
if uploadErr == nil {
c.notifySuccess(msgs)
return
}

// Wait for either a retry timeout or the client to be closed.
select {
case <-time.After(c.RetryAfter(i)):
case <-c.quit:
c.errorf("%d messages dropped because they failed to be sent and the client was closed", len(msgs))
c.notifyFailure(msgs, err)
action := retry.classify(uploadErr)
switch action {
case retryActionDrop:
return
case retryActionRateLimit:
time.Sleep(retry.rateLimitDelay)
case retryActionBackoff:
time.Sleep(c.RetryAfter(retry.backoffAttempts - 1))
}
}
}

type retryAction int

c.errorf("%d messages dropped because they failed to be sent after %d attempts", len(msgs), attempts)
c.notifyFailure(msgs, err)
const (
retryActionBackoff retryAction = iota
retryActionRateLimit retryAction = iota
retryActionDrop retryAction = iota
)

// retryState tracks state across attempts within a single send call.
type retryState struct {
client *client
msgs []message
totalAttempts int
backoffAttempts int
firstFailureTime time.Time
rateLimitStartTime time.Time
rateLimitDelay time.Duration
}

// Upload serialized batch message.
func (c *client) upload(b []byte) error {
// classify determines what to do after a failed upload. It updates internal
// counters, logs/notifies on terminal failures, and returns the action the
// caller should take.
func (r *retryState) classify(uploadErr error) retryAction {
c := r.client

httpErr, ok := uploadErr.(*httpError)
if !ok {
httpErr = &httpError{Retryable: true}
}

if !httpErr.Retryable {
c.errorf("messages dropped due to non-retryable error - %s", uploadErr)
c.notifyFailure(r.msgs, uploadErr)
return retryActionDrop
}

if httpErr.IsRateLimit && httpErr.RetryAfter > 0 {
return r.handleRateLimit(httpErr)
}

return r.handleBackoff(uploadErr)
}

func (r *retryState) handleRateLimit(httpErr *httpError) retryAction {
c := r.client

if r.rateLimitStartTime.IsZero() {
r.rateLimitStartTime = c.now()
}
if c.now().Sub(r.rateLimitStartTime) > c.MaxRateLimitDuration {
c.errorf("messages dropped - %s", ErrRateLimitBudgetExceeded)
c.notifyFailure(r.msgs, ErrRateLimitBudgetExceeded)
return retryActionDrop
}

r.rateLimitDelay = time.Duration(httpErr.RetryAfter) * time.Second
return retryActionRateLimit
}

func (r *retryState) handleBackoff(lastErr error) retryAction {
c := r.client

if r.firstFailureTime.IsZero() {
r.firstFailureTime = c.now()
}
Comment thread
MichaelGHSeg marked this conversation as resolved.
if c.now().Sub(r.firstFailureTime) > c.MaxTotalBackoffDuration {
c.errorf("messages dropped - %s", ErrBackoffBudgetExceeded)
c.notifyFailure(r.msgs, ErrBackoffBudgetExceeded)
return retryActionDrop
}

r.backoffAttempts++
if r.backoffAttempts > c.MaxRetries {
c.errorf("%d messages dropped after %d attempts", len(r.msgs), r.totalAttempts)
c.notifyFailure(r.msgs, lastErr)
return retryActionDrop
}

return retryActionBackoff
}

// Upload serialized batch message. attempt is 1-based (1 = first attempt).
func (c *client) upload(b []byte, attempt int) error {
url := c.Endpoint + "/v1/batch"
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
Expand All @@ -294,8 +386,12 @@ func (c *client) upload(b []byte) error {
req.Header.Add("Content-Length", strconv.Itoa(len(b)))
req.SetBasicAuth(c.key, "")

res, err := c.http.Do(req)
// Spec item 7: omit on first attempt, send 1-based count on retries
if attempt > 1 {
req.Header.Add("X-Retry-Count", strconv.Itoa(attempt-1))
}

res, err := c.http.Do(req)
if err != nil {
c.errorf("sending request - %s", err)
return err
Expand All @@ -306,21 +402,40 @@ func (c *client) upload(b []byte) error {
}

// Report on response body.
func (c *client) report(res *http.Response) (err error) {
var body []byte

if res.StatusCode < 300 {
func (c *client) report(res *http.Response) error {
// Spec item 1: 2xx and 3xx are success
if isSuccess(res.StatusCode) {
c.debugf("response %s", res.Status)
return
return nil
}

if body, err = ioutil.ReadAll(res.Body); err != nil {
body, err := io.ReadAll(res.Body)
if err != nil {
c.errorf("response %d %s - %s", res.StatusCode, res.Status, err)
return
retryable, isRL := retryableStatus(res.StatusCode)
return &httpError{
StatusCode: res.StatusCode,
Retryable: retryable,
IsRateLimit: isRL,
Body: err.Error(),
}
}

c.logf("response %d %s – %s", res.StatusCode, res.Status, string(body))
return fmt.Errorf("%d %s", res.StatusCode, res.Status)

retryable, isRateLimit := retryableStatus(res.StatusCode)
var retryAfterSecs int64
if isRateLimit {
retryAfterSecs = parseRetryAfter(res.Header.Get("Retry-After"), maxRetryAfterSeconds)
}

return &httpError{
StatusCode: res.StatusCode,
Retryable: retryable,
IsRateLimit: isRateLimit,
RetryAfter: retryAfterSecs,
Body: string(body),
}
}

// Batch loop.
Expand Down
6 changes: 4 additions & 2 deletions analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,8 +781,10 @@ func TestClientResponseBodyError(t *testing.T) {
if err := <-errchan; err == nil {
t.Error("failure callback not triggered for a 400 response")

} else if err != testError {
t.Errorf("invalid error returned by erroring response body: %T: %s", err, err)
} else if httpErr, ok := err.(*httpError); !ok {
t.Errorf("expected *httpError, got %T: %s", err, err)
} else if httpErr.StatusCode != 400 {
t.Errorf("expected status 400, got %d", httpErr.StatusCode)
}
}

Expand Down
35 changes: 34 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ type Config struct {
// If not set the client will fallback to use a default retry policy.
RetryAfter func(int) time.Duration

// Maximum number of counted backoff retries. Defaults to DefaultMaxRetries.
MaxRetries int

// Wall-clock cap on total time spent in backoff retries. Defaults to DefaultMaxTotalBackoffDuration.
MaxTotalBackoffDuration time.Duration

// Wall-clock cap on total time spent retrying after 429 Retry-After responses. Defaults to DefaultMaxRateLimitDuration.
MaxRateLimitDuration time.Duration

// A function called by the client to generate unique message identifiers.
// The client uses a UUID generator if none is provided.
// This field is not exported and only exposed internally to let unit tests
Expand Down Expand Up @@ -92,6 +101,18 @@ const DefaultInterval = 5 * time.Second
// was explicitly set.
const DefaultBatchSize = 250

// DefaultMaxRetries is the default number of counted backoff retries.
const DefaultMaxRetries = 10

// DefaultMaxTotalBackoffDuration is the default wall-clock cap on total backoff time.
const DefaultMaxTotalBackoffDuration = 12 * time.Hour

// DefaultMaxRateLimitDuration is the default wall-clock cap on 429 Retry-After retries.
const DefaultMaxRateLimitDuration = 12 * time.Hour

// maxRetryAfterSeconds is the cap applied to Retry-After header values.
const maxRetryAfterSeconds = int64(300)

// Verifies that fields that don't have zero-values are set to valid values,
// returns an error describing the problem if a field was invalid.
func (c *Config) validate() error {
Expand Down Expand Up @@ -142,7 +163,19 @@ func makeConfig(c Config) Config {
}

if c.RetryAfter == nil {
c.RetryAfter = backo.DefaultBacko().Duration
c.RetryAfter = backo.NewBacko(500*time.Millisecond, 2, 0, 60*time.Second).Duration
}

if c.MaxRetries == 0 {
c.MaxRetries = DefaultMaxRetries
}

if c.MaxTotalBackoffDuration == 0 {
c.MaxTotalBackoffDuration = DefaultMaxTotalBackoffDuration
}

if c.MaxRateLimitDuration == 0 {
c.MaxRateLimitDuration = DefaultMaxRateLimitDuration
}

if c.uid == nil {
Expand Down
2 changes: 1 addition & 1 deletion e2e-cli/e2e-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": "go",
"test_suites": "basic",
"test_suites": "basic,retry",
"auto_settings": false,
"patch": null,
"env": {}
Expand Down
4 changes: 4 additions & 0 deletions e2e-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func run(input Input) Output {
cfg.BatchSize = input.Config.FlushAt
}

if input.Config.MaxRetries > 0 {
cfg.MaxRetries = input.Config.MaxRetries
}

client, err := analytics.NewWithConfig(input.WriteKey, cfg)
if err != nil {
return Output{
Expand Down
11 changes: 11 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,15 @@ var (
// failed because the JSON representation of a message exceeded the upper
// limit.
ErrMessageTooBig = errors.New("the message exceeds the maximum allowed size")

// ErrBackoffBudgetExceeded is returned when the maximum total backoff
// duration is exceeded before a batch upload succeeds.
ErrBackoffBudgetExceeded = errors.New("max total backoff duration exceeded")

// ErrRateLimitBudgetExceeded is returned when the maximum rate-limit
// (429 Retry-After) duration is exceeded.
ErrRateLimitBudgetExceeded = errors.New("max rate limit duration exceeded")

// ErrRetriesExhausted is returned when all retry attempts are consumed.
ErrRetriesExhausted = errors.New("retries exhausted")
)
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,19 @@ require (
github.com/google/uuid v1.3.0
github.com/segmentio/backo-go v1.0.0
github.com/segmentio/conf v1.2.0
github.com/stretchr/testify v1.11.1
)

require (
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/segmentio/go-snakecase v1.1.0 // indirect
github.com/segmentio/objconv v1.0.1 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/mold.v2 v2.2.0 // indirect
gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect
gopkg.in/yaml.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading