Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ var (
ErrInvalidTransition = errors.New("invalid transition")
)

// ErrorCounter defines an interface for counting occurrences of errors with optional labels.
// ErrorCounter defines an interface for counting errors keyed by stable labels.
// At least one label is required — labels should identify the process and run (e.g. processName, runID).
type ErrorCounter interface {
Add(err error, labels ...string) int
Count(err error, labels ...string) int
Clear(err error, labels ...string)
Add(label string, extras ...string) int
Count(label string, extras ...string) int
Clear(label string, extras ...string)
}
31 changes: 17 additions & 14 deletions internal/errorcounter/errorcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,34 @@ type Counter struct {
store map[string]int
}

func (c *Counter) Add(err error, labels ...string) int {
func (c *Counter) Add(label string, extras ...string) int {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is technically a breaking change, but because we're pre v1.0.0, it's fine to release in a minor 👍 just worth noting in the release notes

c.mu.Lock()
defer c.mu.Unlock()

errMsg := err.Error()
errMsg += strings.Join(labels, "-")
c.store[errMsg] += 1
return c.store[errMsg]
key := makeKey(label, extras)
c.store[key] += 1
return c.store[key]
}

func (c *Counter) Count(err error, labels ...string) int {
func (c *Counter) Count(label string, extras ...string) int {
c.mu.Lock()
defer c.mu.Unlock()

errMsg := err.Error()
errMsg += strings.Join(labels, "-")
return c.store[errMsg]
key := makeKey(label, extras)
return c.store[key]
}

func (c *Counter) Clear(err error, labels ...string) {
func (c *Counter) Clear(label string, extras ...string) {
c.mu.Lock()
defer c.mu.Unlock()

errMsg := err.Error()
errMsg += strings.Join(labels, "-")
c.store[errMsg] = 0
return
key := makeKey(label, extras)
delete(c.store, key)
}

func makeKey(label string, extras []string) string {
if len(extras) == 0 {
return label
}
return label + "\x00" + strings.Join(extras, "\x00")
}
120 changes: 71 additions & 49 deletions internal/errorcounter/errorcounter_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package errorcounter_test

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -10,52 +9,75 @@ import (
)

func TestErrorCounter(t *testing.T) {
testCases := []struct {
name string
inputErr error
labels []string
iterationCount int
expectedCount int
}{
{
name: "Add 3 and get 3",
inputErr: errors.New("test error"),
labels: []string{"label 1", "label 2"},
iterationCount: 3,
expectedCount: 3,
},
{
name: "Add 1 and get 1 - no labels",
inputErr: errors.New("test error"),
labels: []string{},
iterationCount: 3,
expectedCount: 3,
},
{
name: "Add 0 and get 0",
inputErr: errors.New("test error"),
labels: []string{"label 1"},
iterationCount: 0,
expectedCount: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := errorcounter.New()

var currentCount int
for i := 0; i < tc.iterationCount; i++ {
currentCount = c.Add(tc.inputErr, tc.labels...)
}
require.Equal(t, tc.expectedCount, currentCount)

count := c.Count(tc.inputErr, tc.labels...)
require.Equal(t, tc.expectedCount, count)

c.Clear(tc.inputErr, tc.labels...)
count = c.Count(tc.inputErr, tc.labels...)
require.Equal(t, 0, count)
})
}
t.Run("Add 3 and get 3", func(t *testing.T) {
c := errorcounter.New()

c.Add("label 1", "label 2")
c.Add("label 1", "label 2")
count := c.Add("label 1", "label 2")
require.Equal(t, 3, count)

require.Equal(t, 3, c.Count("label 1", "label 2"))

c.Clear("label 1", "label 2")
require.Equal(t, 0, c.Count("label 1", "label 2"))
})

t.Run("Single label", func(t *testing.T) {
c := errorcounter.New()

c.Add("only-label")
count := c.Add("only-label")
require.Equal(t, 2, count)

require.Equal(t, 2, c.Count("only-label"))

c.Clear("only-label")
require.Equal(t, 0, c.Count("only-label"))
})

t.Run("Add 0 and get 0", func(t *testing.T) {
c := errorcounter.New()
require.Equal(t, 0, c.Count("label 1"))
})
}

func TestErrorCounter_ClearRemovesKey(t *testing.T) {
c := errorcounter.New()

c.Add("process", "run-1")
c.Add("process", "run-1")
require.Equal(t, 2, c.Count("process", "run-1"))

c.Clear("process", "run-1")

// After clear, count should be 0 and next Add should return 1.
require.Equal(t, 0, c.Count("process", "run-1"))
require.Equal(t, 1, c.Add("process", "run-1"))
}

func TestErrorCounter_DifferentLabelsSeparateCounters(t *testing.T) {
c := errorcounter.New()

c.Add("process-a", "run-1")
c.Add("process-a", "run-1")
c.Add("process-b", "run-2")

require.Equal(t, 2, c.Count("process-a", "run-1"))
require.Equal(t, 1, c.Count("process-b", "run-2"))
}

func TestErrorCounter_NoKeyCollision(t *testing.T) {
c := errorcounter.New()

// ("a-b", "c") and ("a", "b-c") must map to distinct keys.
c.Add("a-b", "c")
c.Add("a", "b-c")

require.Equal(t, 1, c.Count("a-b", "c"))
require.Equal(t, 1, c.Count("a", "b-c"))

c.Clear("a-b", "c")
require.Equal(t, 0, c.Count("a-b", "c"))
require.Equal(t, 1, c.Count("a", "b-c"))
}
4 changes: 2 additions & 2 deletions pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func maybePause[Type any, Status StatusType](
return false, nil
}

count := counter.Add(originalErr, processName, run.RunID)
count := counter.Add(processName, run.RunID)
if count < pauseAfterErrCount {
return false, nil
}
Expand All @@ -41,7 +41,7 @@ func maybePause[Type any, Status StatusType](
})

// Run paused - now clear the error counter.
counter.Clear(originalErr, processName, run.RunID)
counter.Clear(processName, run.RunID)
return true, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pause_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func Test_maybeAutoPause(t *testing.T) {
RunID: "run-id",
}, "test", WithPauseFn(tc.pauseFn))

counter.Clear(testErr, processName, r.RunID)
counter.Clear(processName, r.RunID)
for range tc.errCount {
counter.Add(testErr, processName, r.RunID)
counter.Add(processName, r.RunID)
}

paused, err := maybePause(
Expand Down
2 changes: 1 addition & 1 deletion step_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func Test_stepConsumer(t *testing.T) {
})

t.Run("Pause record after exceeding allowed error count", func(t *testing.T) {
counter.Clear(testErr, processName, current.RunID)
counter.Clear(processName, current.RunID)

calls := map[string]int{
"consumerFunc": 0,
Expand Down
4 changes: 2 additions & 2 deletions timeout_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ func TestProcessTimeout(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
counter.Clear(testErr, processName, tc.record.RunID)
counter.Clear(processName, tc.record.RunID)
for range tc.currentErrCount {
counter.Add(testErr, processName, tc.record.RunID)
counter.Add(processName, tc.record.RunID)
}

calls := map[string]int{}
Expand Down
Loading