Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestActivityGauge(t *testing.T) {

assertMetric := func(ch chan prometheus.Metric) {
require.Len(t, ch, 2)
for i := 0; i < 2; i++ {
for range 2 {
m := <-ch
dm := new(dto.Metric)
err := m.Write(dm)
Expand Down
4 changes: 2 additions & 2 deletions rblob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ func getNextKey(ctx context.Context, label string, bucket *blob.Bucket, prev str

// makeStartAfter returns a blob.BeforeList function that starts listing after
// the provided key for improved performance when scanning large buckets.
func makeStartAfter(key string) func(func(interface{}) bool) error {
return func(asFunc func(interface{}) bool) error {
func makeStartAfter(key string) func(func(any) bool) error {
return func(asFunc func(any) bool) error {
s3input := new(s3.ListObjectsV2Input)
if !asFunc(&s3input) {
// We always expect asFunc to return true.
Expand Down
2 changes: 1 addition & 1 deletion rblob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestWaitForMore(t *testing.T) {
sc, err := s.Stream(context.Background(), "")
require.NoError(t, err)

for i := 0; i < 7; i++ {
for range 7 {
_, err := sc.Recv()
jtest.Require(t, nil, err)
}
Expand Down
2 changes: 1 addition & 1 deletion rpatterns/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestBatchError(t *testing.T) {
jtest.Assert(t, expErr, err)
}()

for bNo := 0; bNo < 3; bNo++ {
for range 3 {
b, ok := <-batches
if !ok {
break
Expand Down
12 changes: 4 additions & 8 deletions rpatterns/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,9 @@ func (c *ConcurrentConsumer) Reset() error {
// Start the new background loop
c.inFlight = make(chan int64, c.maxInFlight)
c.doneEvents = make(chan eventReturn, c.maxInFlight)
c.bgLoop.Add(1)
go func() {
c.bgLoop.Go(func() {
c.updateCursorForever()
c.bgLoop.Done()
}()
})
return nil
}

Expand All @@ -113,17 +111,15 @@ func (c *ConcurrentConsumer) Consume(ctx context.Context, e *reflex.Event) error
}

c.inFlight <- e.IDInt()
c.inFlightWait.Add(1)
go func() {
defer c.inFlightWait.Done()
c.inFlightWait.Go(func() {
ret := eventReturn{EventID: e.IDInt()}
err := c.consumer.Consume(ctx, e)
if err != nil {
// NoReturnErr: Populate the returned error
ret.Err = err
}
c.doneEvents <- ret
}()
})
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion rpatterns/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestGapIsIgnored(t *testing.T) {

ctx := context.Background()

for i := 0; i < 100; i++ {
for i := range 100 {
if i == 70 {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions rpatterns/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ func TestConcurrentWrites(t *testing.T) {

writerCount := 100
writerReadyGroup.Add(writerCount)
for i := 0; i < writerCount; i++ {
for range writerCount {
writerCompletedGroup.Add(1)
go func(writerReadyGroup *sync.WaitGroup, writerCompletedGroup *sync.WaitGroup) {
writerReadyGroup.Done()
writerReadyGroup.Wait()
for i := 0; i < 100; i++ {
for i := range 100 {
// Write the thing
val := fmt.Sprintf("%v", i)
err := store.SetCursor(ctx, "single-key", val)
Expand Down
2 changes: 1 addition & 1 deletion rpatterns/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

func makeExpected(size int) []string {
var exp []string
for i := 0; i < size; i++ {
for range size {
exp = append(exp, consumerErr.Error())
}
return exp
Expand Down
6 changes: 3 additions & 3 deletions rpatterns/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func appendMofN(base string, m, n int) string {
func ConsumerShards(name string, n int, opts ...ParallelOption) []ConsumerShard {
conf := getParallelConfig(opts)
ret := make([]ConsumerShard, 0, n)
for m := 0; m < n; m++ {
for m := range n {
shardName := conf.fmtName(name, m, n)
pc := ConsumerShard{
Name: shardName,
Expand Down Expand Up @@ -191,7 +191,7 @@ func Parallel(getCtx getCtxFn, getConsumer getConsumerFn, n int, stream reflex.S
cstore reflex.CursorStore, opts ...ParallelOption,
) {
conf := getParallelConfig(opts)
for m := 0; m < n; m++ {
for m := range n {
m := m
consumerM := makeConsumer(conf, m, n, getConsumer(m))
gcf := func() context.Context {
Expand All @@ -212,7 +212,7 @@ func Parallel(getCtx getCtxFn, getConsumer getConsumerFn, n int, stream reflex.S
// cursors.
func ParallelAck(getCtx getCtxFn, getConsumer getAckConsumerFn, n int, stream reflex.StreamFunc, opts ...ParallelOption) {
conf := getParallelConfig(opts)
for m := 0; m < n; m++ {
for m := range n {
m := m
consumerM := makeAckConsumer(conf, m, n, getConsumer(m))
gcf := func() context.Context {
Expand Down
4 changes: 2 additions & 2 deletions rpatterns/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ func (h minHeap) Len() int { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *minHeap) Push(x interface{}) {
func (h *minHeap) Push(x any) {
*h = append(*h, x.(int64))
}

func (h *minHeap) Pop() interface{} {
func (h *minHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
Expand Down
2 changes: 1 addition & 1 deletion rsql/cursorstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type CursorType int

// Cast returns cursor casted to type.
func (t CursorType) Cast(cursor string) (interface{}, error) {
func (t CursorType) Cast(cursor string) (any, error) {
if t == cursorTypeString {
return cursor, nil
}
Expand Down
2 changes: 1 addition & 1 deletion rsql/cursorstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestAsyncSetCursor(t *testing.T) {
s.UnblockOnce()
waitForResult(t, 2, s.Count)

getCursor := func() interface{} {
getCursor := func() any {
c, err := ct.GetCursor(context.Background(), dbc, "test")
require.NoError(t, err)
return c
Expand Down
14 changes: 5 additions & 9 deletions rsql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"slices"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -51,7 +52,7 @@ func makeDefaultInserter(schema eTableSchema) inserter {
) error {
q := "insert into " + schema.name +
" set " + schema.foreignIDField + "=?, " + schema.timeField + "=now(6), " + schema.typeField + "=?"
args := []interface{}{foreignID, typ.ReflexType()}
args := []any{foreignID, typ.ReflexType()}

if schema.metadataField != "" {
q += ", " + schema.metadataField + "=?"
Expand All @@ -77,7 +78,7 @@ func makeDefaultInserter(schema eTableSchema) inserter {
}

type row interface {
Scan(dest ...interface{}) error
Scan(dest ...any) error
}

func scan(row row) (*reflex.Event, error) {
Expand Down Expand Up @@ -128,7 +129,7 @@ func getNextEvents(ctx context.Context, dbc DBC, schema eTableSchema,

var (
q string
args []interface{}
args []any
)

q += "select " + schema.idField + ", " + schema.foreignIDField + ", " + schema.timeField + ", " + schema.typeField
Expand Down Expand Up @@ -232,12 +233,7 @@ func isMySQLErr(err error, nums ...uint16) bool {
return false
}

for _, num := range nums {
if me.Number == num {
return true
}
}
return false
return slices.Contains(nums, me.Number)
}

func getCursor(ctx context.Context, dbc DBC, schema ctableSchema, id string) (string, time.Time, error) {
Expand Down
2 changes: 1 addition & 1 deletion rsql/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestGetLatestID(t *testing.T) {
assert.Equal(t, int64(0), id)

n := 10
for i := 0; i < n; i++ {
for range n {
err := insertTestEvent(dbc, rsql.NewEventsTable("events"), "strid", testEventType(0))
assert.NoError(t, err)
}
Expand Down
12 changes: 5 additions & 7 deletions rsql/eventstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,13 @@ func TestNoDeadlockGap(t *testing.T) {
assert.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Go(func() {
sc2, err := table.ToStream(dbc)(context.Background(), "1")
assert.NoError(t, err)

_, err = sc2.Recv()
require.NoError(t, err)
wg.Done()
}()
})

// This should block until delay, then return 3 (noop(2) is filtered out).
assertEvent(t, sc1, 3)
Expand Down Expand Up @@ -396,7 +394,7 @@ func TestRandomGaps(t *testing.T) {
// N concurrent transactions that sleep and commit or rollback.
const n = 8
var inserted int64
for i := 0; i < n; i++ {
for i := range n {
tx, err := dbc.Begin()
require.NoError(t, err)

Expand Down Expand Up @@ -493,7 +491,7 @@ func TestLimit(t *testing.T) {

table := rsql.NewEventsTable(eventsTable, rsql.WithEventLookupLimit(10))

for i := 0; i < 1_000; i++ {
for i := range 1_000 {
err := insertTestEvent(dbc, table, i2s(i), testEventType(1))
require.NoError(t, err)
}
Expand All @@ -511,7 +509,7 @@ func TestLimit(t *testing.T) {

table := rsql.NewEventsTable(eventsTable, rsql.WithEventLookupLimit(2_000))

for i := 0; i < 5_000; i++ {
for i := range 5_000 {
err := insertTestEvent(dbc, table, i2s(i), testEventType(1))
require.NoError(t, err)
}
Expand Down
4 changes: 2 additions & 2 deletions rsql/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func i2s(i int) string {
return strconv.Itoa(i)
}

func assertEqualI2S(t *testing.T, expected int, actual string, msgAndArgs ...interface{}) {
assert.Equal(t, i2s(expected), actual, msgAndArgs)
func assertEqualI2S(t *testing.T, expected int, actual string, msgAndArgs ...any) {
assert.Equal(t, i2s(expected), actual, msgAndArgs...)
}

func waitFor(t *testing.T, d time.Duration, f func() bool) {
Expand Down
20 changes: 9 additions & 11 deletions rsql/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestStreamClientErrors(t *testing.T) {
require.Len(t, calls, 3)
require.Nil(t, ctx.Err()) // parent context not cancelled

for i := 0; i < 3; i++ {
for i := range 3 {
res := calls[i]
require.Equal(t, i+1, res.Type.ReflexType())
require.Equal(t, int64(i+1), res.ForeignIDInt())
Expand Down Expand Up @@ -270,12 +270,10 @@ func TestConsumeStreamLag(t *testing.T) {
reflex.WithStreamLag(2*time.Second+100*time.Millisecond), // Add 100ms just to be sure.
)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
err := reflex.Run(context.Background(), spec)
jtest.Require(t, errDone, err)
}()
})

i := 1
for ev := range feed {
Expand Down Expand Up @@ -423,7 +421,7 @@ func TestStreamMetadata(t *testing.T) {
assert.NoError(t, err)

var results []*reflex.Event
for i := 0; i < prefill; i++ {
for range prefill {
e, err := sc.Recv()
assert.NoError(t, err)
results = append(results, e)
Expand All @@ -438,7 +436,7 @@ func TestStreamMetadata(t *testing.T) {
assert.Len(t, e.MetaData, i)

var meta []byte
for l := 0; l < i; l++ {
for l := range i {
meta = append(meta, byte(l))
}
assert.EqualValues(t, meta, e.MetaData)
Expand Down Expand Up @@ -483,7 +481,7 @@ func TestStreamLag(t *testing.T) {

// First read all events into the cache.
sc1 := s.eTable.Stream(ctx, s.dbc, "")
for i := 0; i < total; i++ {
for range total {
_, err := sc1.Recv()
require.NoError(t, err)
}
Expand All @@ -495,7 +493,7 @@ func TestStreamLag(t *testing.T) {
lag := reflex.WithStreamLag(time.Second * (60*5 + 30))
sc2 := s.eTable.Stream(ctx, s.dbc, "", lag)

for i := 0; i < 4; i++ {
for range 4 {
_, err := sc2.Recv()
require.NoError(t, err)
}
Expand Down Expand Up @@ -561,7 +559,7 @@ func TestStreamLagNoCache(t *testing.T) {

// First read all 10 events.
sc1 := s.eTable.Stream(ctx, s.dbc, "")
for i := 0; i < total; i++ {
for range total {
_, err := sc1.Recv()
require.NoError(t, err)
}
Expand All @@ -573,7 +571,7 @@ func TestStreamLagNoCache(t *testing.T) {
lag := reflex.WithStreamLag(time.Second * (60*5 + 30))
sc2 := s.eTable.Stream(ctx, s.dbc, "", lag)

for i := 0; i < 4; i++ {
for range 4 {
_, err := sc2.Recv()
require.NoError(t, err)
}
Expand Down
Loading