From c2cbc03c5ec001bd612d18f2675978dbc23f2971 Mon Sep 17 00:00:00 2001 From: Ed Harrod Date: Mon, 13 Apr 2026 08:54:13 +0100 Subject: [PATCH 1/2] chore: apply go fix improvements - Add missing imports - Optimize string concatenation using strings.Builder - Use Go 1.22 range-over-int syntax --- internal/metrics/metrics_test.go | 2 +- rblob/blob.go | 4 ++-- rblob/blob_test.go | 2 +- rpatterns/batch_test.go | 2 +- rpatterns/concurrent.go | 12 ++++-------- rpatterns/concurrent_test.go | 2 +- rpatterns/cursor_test.go | 4 ++-- rpatterns/deadletter_test.go | 2 +- rpatterns/parallel.go | 6 +++--- rpatterns/sequence.go | 4 ++-- rsql/cursorstable.go | 2 +- rsql/cursorstable_test.go | 2 +- rsql/db.go | 14 +++++--------- rsql/db_test.go | 2 +- rsql/eventstable_test.go | 12 +++++------- rsql/helpers_test.go | 2 +- rsql/stream_test.go | 20 +++++++++----------- 17 files changed, 41 insertions(+), 53 deletions(-) diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index da3523e..3b5572e 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -19,7 +19,7 @@ func TestActivityGauge(t *testing.T) { assertMetric := func(ch chan prometheus.Metric) { require.Len(t, ch, 2) - for i := 0; i < 2; i++ { + for range 2 { m := <-ch dm := new(dto.Metric) err := m.Write(dm) diff --git a/rblob/blob.go b/rblob/blob.go index e481f99..8ebe18d 100644 --- a/rblob/blob.go +++ b/rblob/blob.go @@ -349,8 +349,8 @@ func getNextKey(ctx context.Context, label string, bucket *blob.Bucket, prev str // makeStartAfter returns a blob.BeforeList function that starts listing after // the provided key for improved performance when scanning large buckets. -func makeStartAfter(key string) func(func(interface{}) bool) error { - return func(asFunc func(interface{}) bool) error { +func makeStartAfter(key string) func(func(any) bool) error { + return func(asFunc func(any) bool) error { s3input := new(s3.ListObjectsV2Input) if !asFunc(&s3input) { // We always expect asFunc to return true. diff --git a/rblob/blob_test.go b/rblob/blob_test.go index 2cda3c8..27e06a0 100644 --- a/rblob/blob_test.go +++ b/rblob/blob_test.go @@ -127,7 +127,7 @@ func TestWaitForMore(t *testing.T) { sc, err := s.Stream(context.Background(), "") require.NoError(t, err) - for i := 0; i < 7; i++ { + for range 7 { _, err := sc.Recv() jtest.Require(t, nil, err) } diff --git a/rpatterns/batch_test.go b/rpatterns/batch_test.go index 684f4d2..cd75655 100644 --- a/rpatterns/batch_test.go +++ b/rpatterns/batch_test.go @@ -229,7 +229,7 @@ func TestBatchError(t *testing.T) { jtest.Assert(t, expErr, err) }() - for bNo := 0; bNo < 3; bNo++ { + for range 3 { b, ok := <-batches if !ok { break diff --git a/rpatterns/concurrent.go b/rpatterns/concurrent.go index 606cb83..57aeb80 100644 --- a/rpatterns/concurrent.go +++ b/rpatterns/concurrent.go @@ -97,11 +97,9 @@ func (c *ConcurrentConsumer) Reset() error { // Start the new background loop c.inFlight = make(chan int64, c.maxInFlight) c.doneEvents = make(chan eventReturn, c.maxInFlight) - c.bgLoop.Add(1) - go func() { + c.bgLoop.Go(func() { c.updateCursorForever() - c.bgLoop.Done() - }() + }) return nil } @@ -113,9 +111,7 @@ func (c *ConcurrentConsumer) Consume(ctx context.Context, e *reflex.Event) error } c.inFlight <- e.IDInt() - c.inFlightWait.Add(1) - go func() { - defer c.inFlightWait.Done() + c.inFlightWait.Go(func() { ret := eventReturn{EventID: e.IDInt()} err := c.consumer.Consume(ctx, e) if err != nil { @@ -123,7 +119,7 @@ func (c *ConcurrentConsumer) Consume(ctx context.Context, e *reflex.Event) error ret.Err = err } c.doneEvents <- ret - }() + }) return nil } diff --git a/rpatterns/concurrent_test.go b/rpatterns/concurrent_test.go index db40527..e350c13 100644 --- a/rpatterns/concurrent_test.go +++ b/rpatterns/concurrent_test.go @@ -142,7 +142,7 @@ func TestGapIsIgnored(t *testing.T) { ctx := context.Background() - for i := 0; i < 100; i++ { + for i := range 100 { if i == 70 { continue } diff --git a/rpatterns/cursor_test.go b/rpatterns/cursor_test.go index 0af3276..3704059 100644 --- a/rpatterns/cursor_test.go +++ b/rpatterns/cursor_test.go @@ -173,12 +173,12 @@ func TestConcurrentWrites(t *testing.T) { writerCount := 100 writerReadyGroup.Add(writerCount) - for i := 0; i < writerCount; i++ { + for range writerCount { writerCompletedGroup.Add(1) go func(writerReadyGroup *sync.WaitGroup, writerCompletedGroup *sync.WaitGroup) { writerReadyGroup.Done() writerReadyGroup.Wait() - for i := 0; i < 100; i++ { + for i := range 100 { // Write the thing val := fmt.Sprintf("%v", i) err := store.SetCursor(ctx, "single-key", val) diff --git a/rpatterns/deadletter_test.go b/rpatterns/deadletter_test.go index 19a0ba4..9da5d53 100644 --- a/rpatterns/deadletter_test.go +++ b/rpatterns/deadletter_test.go @@ -20,7 +20,7 @@ var ( func makeExpected(size int) []string { var exp []string - for i := 0; i < size; i++ { + for range size { exp = append(exp, consumerErr.Error()) } return exp diff --git a/rpatterns/parallel.go b/rpatterns/parallel.go index 4b7119f..4749db4 100644 --- a/rpatterns/parallel.go +++ b/rpatterns/parallel.go @@ -111,7 +111,7 @@ func appendMofN(base string, m, n int) string { func ConsumerShards(name string, n int, opts ...ParallelOption) []ConsumerShard { conf := getParallelConfig(opts) ret := make([]ConsumerShard, 0, n) - for m := 0; m < n; m++ { + for m := range n { shardName := conf.fmtName(name, m, n) pc := ConsumerShard{ Name: shardName, @@ -191,7 +191,7 @@ func Parallel(getCtx getCtxFn, getConsumer getConsumerFn, n int, stream reflex.S cstore reflex.CursorStore, opts ...ParallelOption, ) { conf := getParallelConfig(opts) - for m := 0; m < n; m++ { + for m := range n { m := m consumerM := makeConsumer(conf, m, n, getConsumer(m)) gcf := func() context.Context { @@ -212,7 +212,7 @@ func Parallel(getCtx getCtxFn, getConsumer getConsumerFn, n int, stream reflex.S // cursors. func ParallelAck(getCtx getCtxFn, getConsumer getAckConsumerFn, n int, stream reflex.StreamFunc, opts ...ParallelOption) { conf := getParallelConfig(opts) - for m := 0; m < n; m++ { + for m := range n { m := m consumerM := makeAckConsumer(conf, m, n, getConsumer(m)) gcf := func() context.Context { diff --git a/rpatterns/sequence.go b/rpatterns/sequence.go index f4fd68c..d7a81e7 100644 --- a/rpatterns/sequence.go +++ b/rpatterns/sequence.go @@ -10,11 +10,11 @@ func (h minHeap) Len() int { return len(h) } func (h minHeap) Less(i, j int) bool { return h[i] < h[j] } func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *minHeap) Push(x interface{}) { +func (h *minHeap) Push(x any) { *h = append(*h, x.(int64)) } -func (h *minHeap) Pop() interface{} { +func (h *minHeap) Pop() any { old := *h n := len(old) x := old[n-1] diff --git a/rsql/cursorstable.go b/rsql/cursorstable.go index a2efb57..ada4d3d 100644 --- a/rsql/cursorstable.go +++ b/rsql/cursorstable.go @@ -18,7 +18,7 @@ import ( type CursorType int // Cast returns cursor casted to type. -func (t CursorType) Cast(cursor string) (interface{}, error) { +func (t CursorType) Cast(cursor string) (any, error) { if t == cursorTypeString { return cursor, nil } diff --git a/rsql/cursorstable_test.go b/rsql/cursorstable_test.go index cbee588..65e6031 100644 --- a/rsql/cursorstable_test.go +++ b/rsql/cursorstable_test.go @@ -110,7 +110,7 @@ func TestAsyncSetCursor(t *testing.T) { s.UnblockOnce() waitForResult(t, 2, s.Count) - getCursor := func() interface{} { + getCursor := func() any { c, err := ct.GetCursor(context.Background(), dbc, "test") require.NoError(t, err) return c diff --git a/rsql/db.go b/rsql/db.go index 6f1a8fb..30c2773 100644 --- a/rsql/db.go +++ b/rsql/db.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "slices" "strconv" "testing" "time" @@ -51,7 +52,7 @@ func makeDefaultInserter(schema eTableSchema) inserter { ) error { q := "insert into " + schema.name + " set " + schema.foreignIDField + "=?, " + schema.timeField + "=now(6), " + schema.typeField + "=?" - args := []interface{}{foreignID, typ.ReflexType()} + args := []any{foreignID, typ.ReflexType()} if schema.metadataField != "" { q += ", " + schema.metadataField + "=?" @@ -77,7 +78,7 @@ func makeDefaultInserter(schema eTableSchema) inserter { } type row interface { - Scan(dest ...interface{}) error + Scan(dest ...any) error } func scan(row row) (*reflex.Event, error) { @@ -128,7 +129,7 @@ func getNextEvents(ctx context.Context, dbc DBC, schema eTableSchema, var ( q string - args []interface{} + args []any ) q += "select " + schema.idField + ", " + schema.foreignIDField + ", " + schema.timeField + ", " + schema.typeField @@ -232,12 +233,7 @@ func isMySQLErr(err error, nums ...uint16) bool { return false } - for _, num := range nums { - if me.Number == num { - return true - } - } - return false + return slices.Contains(nums, me.Number) } func getCursor(ctx context.Context, dbc DBC, schema ctableSchema, id string) (string, time.Time, error) { diff --git a/rsql/db_test.go b/rsql/db_test.go index b1735ec..c6668c1 100644 --- a/rsql/db_test.go +++ b/rsql/db_test.go @@ -301,7 +301,7 @@ func TestGetLatestID(t *testing.T) { assert.Equal(t, int64(0), id) n := 10 - for i := 0; i < n; i++ { + for range n { err := insertTestEvent(dbc, rsql.NewEventsTable("events"), "strid", testEventType(0)) assert.NoError(t, err) } diff --git a/rsql/eventstable_test.go b/rsql/eventstable_test.go index c77ac14..17e21f1 100644 --- a/rsql/eventstable_test.go +++ b/rsql/eventstable_test.go @@ -180,15 +180,13 @@ func TestNoDeadlockGap(t *testing.T) { assert.NoError(t, err) var wg sync.WaitGroup - wg.Add(1) - go func() { + wg.Go(func() { sc2, err := table.ToStream(dbc)(context.Background(), "1") assert.NoError(t, err) _, err = sc2.Recv() require.NoError(t, err) - wg.Done() - }() + }) // This should block until delay, then return 3 (noop(2) is filtered out). assertEvent(t, sc1, 3) @@ -396,7 +394,7 @@ func TestRandomGaps(t *testing.T) { // N concurrent transactions that sleep and commit or rollback. const n = 8 var inserted int64 - for i := 0; i < n; i++ { + for i := range n { tx, err := dbc.Begin() require.NoError(t, err) @@ -493,7 +491,7 @@ func TestLimit(t *testing.T) { table := rsql.NewEventsTable(eventsTable, rsql.WithEventLookupLimit(10)) - for i := 0; i < 1_000; i++ { + for i := range 1_000 { err := insertTestEvent(dbc, table, i2s(i), testEventType(1)) require.NoError(t, err) } @@ -511,7 +509,7 @@ func TestLimit(t *testing.T) { table := rsql.NewEventsTable(eventsTable, rsql.WithEventLookupLimit(2_000)) - for i := 0; i < 5_000; i++ { + for i := range 5_000 { err := insertTestEvent(dbc, table, i2s(i), testEventType(1)) require.NoError(t, err) } diff --git a/rsql/helpers_test.go b/rsql/helpers_test.go index 54f944c..8dd549d 100644 --- a/rsql/helpers_test.go +++ b/rsql/helpers_test.go @@ -45,7 +45,7 @@ func i2s(i int) string { return strconv.Itoa(i) } -func assertEqualI2S(t *testing.T, expected int, actual string, msgAndArgs ...interface{}) { +func assertEqualI2S(t *testing.T, expected int, actual string, msgAndArgs ...any) { assert.Equal(t, i2s(expected), actual, msgAndArgs) } diff --git a/rsql/stream_test.go b/rsql/stream_test.go index 13b883b..5f0967a 100644 --- a/rsql/stream_test.go +++ b/rsql/stream_test.go @@ -221,7 +221,7 @@ func TestStreamClientErrors(t *testing.T) { require.Len(t, calls, 3) require.Nil(t, ctx.Err()) // parent context not cancelled - for i := 0; i < 3; i++ { + for i := range 3 { res := calls[i] require.Equal(t, i+1, res.Type.ReflexType()) require.Equal(t, int64(i+1), res.ForeignIDInt()) @@ -270,12 +270,10 @@ func TestConsumeStreamLag(t *testing.T) { reflex.WithStreamLag(2*time.Second+100*time.Millisecond), // Add 100ms just to be sure. ) var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { err := reflex.Run(context.Background(), spec) jtest.Require(t, errDone, err) - }() + }) i := 1 for ev := range feed { @@ -423,7 +421,7 @@ func TestStreamMetadata(t *testing.T) { assert.NoError(t, err) var results []*reflex.Event - for i := 0; i < prefill; i++ { + for range prefill { e, err := sc.Recv() assert.NoError(t, err) results = append(results, e) @@ -438,7 +436,7 @@ func TestStreamMetadata(t *testing.T) { assert.Len(t, e.MetaData, i) var meta []byte - for l := 0; l < i; l++ { + for l := range i { meta = append(meta, byte(l)) } assert.EqualValues(t, meta, e.MetaData) @@ -483,7 +481,7 @@ func TestStreamLag(t *testing.T) { // First read all events into the cache. sc1 := s.eTable.Stream(ctx, s.dbc, "") - for i := 0; i < total; i++ { + for range total { _, err := sc1.Recv() require.NoError(t, err) } @@ -495,7 +493,7 @@ func TestStreamLag(t *testing.T) { lag := reflex.WithStreamLag(time.Second * (60*5 + 30)) sc2 := s.eTable.Stream(ctx, s.dbc, "", lag) - for i := 0; i < 4; i++ { + for range 4 { _, err := sc2.Recv() require.NoError(t, err) } @@ -561,7 +559,7 @@ func TestStreamLagNoCache(t *testing.T) { // First read all 10 events. sc1 := s.eTable.Stream(ctx, s.dbc, "") - for i := 0; i < total; i++ { + for range total { _, err := sc1.Recv() require.NoError(t, err) } @@ -573,7 +571,7 @@ func TestStreamLagNoCache(t *testing.T) { lag := reflex.WithStreamLag(time.Second * (60*5 + 30)) sc2 := s.eTable.Stream(ctx, s.dbc, "", lag) - for i := 0; i < 4; i++ { + for range 4 { _, err := sc2.Recv() require.NoError(t, err) } From e05299f103e14984a18cf372c84448fa5bb0f914 Mon Sep 17 00:00:00 2001 From: Ed Harrod Date: Tue, 5 May 2026 12:02:54 +0100 Subject: [PATCH 2/2] fix: expand variadic msgAndArgs in assertEqualI2S Co-Authored-By: Claude Sonnet 4.6 --- rsql/helpers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsql/helpers_test.go b/rsql/helpers_test.go index 8dd549d..c79fbce 100644 --- a/rsql/helpers_test.go +++ b/rsql/helpers_test.go @@ -46,7 +46,7 @@ func i2s(i int) string { } func assertEqualI2S(t *testing.T, expected int, actual string, msgAndArgs ...any) { - assert.Equal(t, i2s(expected), actual, msgAndArgs) + assert.Equal(t, i2s(expected), actual, msgAndArgs...) } func waitFor(t *testing.T, d time.Duration, f func() bool) {