diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7f6765e9..8a6f45ea 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.22.x, 1.23.x] + go-version: [1.25.x, 1.26.x, stable] steps: - name: Install Go@v${{ matrix.go-version }} uses: actions/setup-go@v4 diff --git a/DEVELOPING.md b/DEVELOPING.md index 712be063..1c989a57 100644 --- a/DEVELOPING.md +++ b/DEVELOPING.md @@ -23,7 +23,7 @@ All NPM commands can be found in `package.json`. To embed bundled js, do ``` -go get -u github.com/go-bindata/go-bindata/... +go install github.com/kevinburke/go-bindata/v4/go-bindata@latest cd webui/internal/assets go generate ``` diff --git a/client.go b/client.go index 2375d47b..772d414f 100644 --- a/client.go +++ b/client.go @@ -399,7 +399,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error { script := redis.NewScript(len(jobNames)+1, redisLuaRequeueSingleDeadCmd) - args := make([]interface{}, 0, len(jobNames)+1+3) + args := make([]any, 0, len(jobNames)+1+3) args = append(args, redisKeyDead(c.namespace)) // KEY[1] for _, jobName := range jobNames { args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...] @@ -442,7 +442,7 @@ func (c *Client) RetryAllDeadJobs() error { script := redis.NewScript(len(jobNames)+1, redisLuaRequeueAllDeadCmd) - args := make([]interface{}, 0, len(jobNames)+1+3) + args := make([]any, 0, len(jobNames)+1+3) args = append(args, redisKeyDead(c.namespace)) // KEY[1] for _, jobName := range jobNames { args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...] @@ -456,7 +456,7 @@ func (c *Client) RetryAllDeadJobs() error { // Cap iterations for safety (which could reprocess 1k*1k jobs). // This is conceptually an infinite loop but let's be careful. - for i := 0; i < 1000; i++ { + for range 1000 { res, err := redis.Int64(script.Do(conn, args...)) if err != nil { logError("client.retry_all_dead_jobs.do", err) @@ -538,7 +538,7 @@ func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error { func (c *Client) deleteZsetJob(zsetKey string, zscore int64, jobID string) (bool, []byte, error) { script := redis.NewScript(1, redisLuaDeleteSingleCmd) - args := make([]interface{}, 0, 1+2) + args := make([]any, 0, 1+2) args = append(args, zsetKey) // KEY[1] args = append(args, zscore) // ARGV[1] args = append(args, jobID) // ARGV[2] diff --git a/client_test.go b/client_test.go index c936c4aa..29c809c3 100644 --- a/client_test.go +++ b/client_test.go @@ -236,8 +236,8 @@ func TestClientScheduledJobs(t *testing.T) { assert.EqualValues(t, 1425263409, jobs[1].EnqueuedAt) assert.EqualValues(t, 1425263409, jobs[2].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) - assert.EqualValues(t, interface{}(2), jobs[0].Args["b"]) + assert.EqualValues(t, any(1), jobs[0].Args["a"]) + assert.EqualValues(t, any(2), jobs[0].Args["b"]) assert.EqualValues(t, 0, jobs[0].Fails) assert.EqualValues(t, 0, jobs[1].Fails) @@ -285,7 +285,7 @@ func TestClientRetryJobs(t *testing.T) { assert.EqualValues(t, 1425263429, jobs[0].FailedAt) assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) + assert.EqualValues(t, any(1), jobs[0].Args["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) assert.Equal(t, "ohno", jobs[0].LastErr) @@ -325,7 +325,7 @@ func TestClientDeadJobs(t *testing.T) { assert.EqualValues(t, 1425263429, jobs[0].FailedAt) assert.Equal(t, "wat", jobs[0].Name) assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt) - assert.EqualValues(t, interface{}(1), jobs[0].Args["a"]) + assert.EqualValues(t, any(1), jobs[0].Args["a"]) assert.EqualValues(t, 1, jobs[0].Fails) assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt) assert.Equal(t, "ohno", jobs[0].LastErr) @@ -445,7 +445,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) { Name: name, ID: makeIdentifier(), EnqueuedAt: encAt, - Args: map[string]interface{}{"a": "wat"}, + Args: map[string]any{"a": "wat"}, Fails: 3, LastErr: "sorry", FailedAt: failAt, @@ -571,7 +571,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) { // Ok, we need to efficiently add 10k jobs to the dead queue. // I tried using insertDeadJob but it was too slow (increased test time by 1 second) dead := redisKeyDead(ns) - for i := 0; i < 10000; i++ { + for range 10000 { job := &Job{ Name: "wat1", ID: makeIdentifier(), diff --git a/cmd/workenqueue/main.go b/cmd/workenqueue/main.go index 836a93ab..6bb1c3c8 100644 --- a/cmd/workenqueue/main.go +++ b/cmd/workenqueue/main.go @@ -11,7 +11,7 @@ import ( "github.com/gomodule/redigo/redis" ) -var redisHostPort = flag.String("redis", ":6379", "redis hostport") +var redisHostPort = flag.String("redis", "redis://:6379", "redis hostport") var redisNamespace = flag.String("ns", "work", "redis namespace") var jobName = flag.String("job", "", "job name") var jobArgs = flag.String("args", "{}", "job arguments") @@ -26,7 +26,7 @@ func main() { pool := newPool(*redisHostPort) - var args map[string]interface{} + var args map[string]any err := json.Unmarshal([]byte(*jobArgs), &args) if err != nil { fmt.Println("invalid args:", err) diff --git a/cmd/workfakedata/main.go b/cmd/workfakedata/main.go index 71fd6b11..075c18c3 100644 --- a/cmd/workfakedata/main.go +++ b/cmd/workfakedata/main.go @@ -3,21 +3,21 @@ package main import ( "flag" "fmt" - "math/rand" + "math/rand/v2" "time" "github.com/gojek/work" "github.com/gomodule/redigo/redis" ) -var redisHostPort = flag.String("redis", ":6379", "redis hostport") +var redisHostPort = flag.String("redis", "redis://:6379", "redis hostport") var redisNamespace = flag.String("ns", "work", "redis namespace") func epsilonHandler(job *work.Job) error { fmt.Println("epsilon") time.Sleep(time.Second) - if rand.Intn(2) == 0 { + if rand.IntN(2) == 0 { return fmt.Errorf("random error") } return nil @@ -42,7 +42,7 @@ func main() { go func() { for { en := work.NewEnqueuer(*redisNamespace, pool) - for i := 0; i < 20; i++ { + for i := range 20 { en.Enqueue("foobar", work.Q{"i": i}) } diff --git a/cmd/workwebui/main.go b/cmd/workwebui/main.go index 1407b226..4aebca31 100644 --- a/cmd/workwebui/main.go +++ b/cmd/workwebui/main.go @@ -13,7 +13,7 @@ import ( ) var ( - redisHostPort = flag.String("redis", ":6379", "redis hostport") + redisHostPort = flag.String("redis", "redis://:6379", "redis hostport") redisDatabase = flag.String("database", "0", "redis database") redisNamespace = flag.String("ns", "work", "redis namespace") webHostPort = flag.String("listen", ":5040", "hostport to listen for HTTP JSON API") diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index e930521e..17ff00ff 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -2,7 +2,7 @@ package work import ( "fmt" - "math/rand" + "math/rand/v2" "strings" "time" @@ -60,7 +60,7 @@ func (r *deadPoolReaper) loop() { return case <-timer.C: // Schedule next occurrence periodically with jitter - timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second) + timer.Reset(r.reapPeriod + time.Duration(rand.IntN(reapJitterSecs))*time.Second) // Reap if err := r.reap(); err != nil { @@ -111,7 +111,7 @@ func (r *deadPoolReaper) reap() error { func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error { numKeys := len(jobTypes) * 2 redisReapLocksScript := redis.NewScript(numKeys, redisLuaReapStaleLocks) - var scriptArgs = make([]interface{}, 0, numKeys+1) // +1 for argv[1] + var scriptArgs = make([]any, 0, numKeys+1) // +1 for argv[1] for _, jobType := range jobTypes { scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) @@ -130,7 +130,7 @@ func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) er func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error { numKeys := len(jobTypes) * requeueKeysPerJob redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob) - var scriptArgs = make([]interface{}, 0, numKeys+1) + var scriptArgs = make([]any, 0, numKeys+1) for _, jobType := range jobTypes { // pops from in progress, push into job queue and decrement the queue lock diff --git a/enqueue.go b/enqueue.go index abf4756c..6c1631bf 100644 --- a/enqueue.go +++ b/enqueue.go @@ -53,7 +53,7 @@ func NewEnqueuerWithOptions(namespace string, pool *redis.Pool, opt EnqueuerOpti // Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed. // Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"}) -func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) { +func (e *Enqueuer) Enqueue(jobName string, args map[string]any) (*Job, error) { job := &Job{ Name: jobName, ID: makeIdentifier(), @@ -81,11 +81,11 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e } // EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds. -func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]any) (*ScheduledJob, error) { return e.EnqueueAt(jobName, epochAfterSeconds(secondsFromNow), args) } -func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]any) (*ScheduledJob, error) { job := &Job{ Name: jobName, ID: makeIdentifier(), @@ -123,12 +123,12 @@ func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string // Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. // In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. // EnqueueUnique returns the job if it was enqueued and nil if it wasn't -func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) { +func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]any) (*Job, error) { return e.EnqueueUniqueByKey(jobName, args, nil) } // EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. -func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]any) (*ScheduledJob, error) { return e.EnqueueUniqueInByKey(jobName, secondsFromNow, args, nil) } @@ -138,7 +138,7 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma // Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. // In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. // EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't -func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) { +func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]any, keyMap map[string]any) (*Job, error) { enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) if err != nil { return nil, err @@ -154,17 +154,17 @@ func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{ // EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. // Subsequent calls with same key will update arguments -func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]any, keyMap map[string]any) (*ScheduledJob, error) { return e.EnqueueUniqueAtByKey(jobName, epochAfterSeconds(secondsFromNow), args, keyMap) } // EnqueueUniqueAt enqueues a unique job at the specified absolute epoch time in seconds. See EnqueueUnique for semantics of unique jobs. -func (e *Enqueuer) EnqueueUniqueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueUniqueAt(jobName string, epochSeconds int64, args map[string]any) (*ScheduledJob, error) { return e.EnqueueUniqueAtByKey(jobName, epochSeconds, args, nil) } // EnqueueUniqueAtByKey enqueues a job unique on specified key at the specified absolute epoch time in seconds, updating arguments. See EnqueueUnique for semantics of unique jobs. -func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) { +func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args map[string]any, keyMap map[string]any) (*ScheduledJob, error) { enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) if err != nil { return nil, err @@ -210,7 +210,7 @@ func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error { type enqueueFnType func(*int64) (string, error) -func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (enqueueFnType, *Job, error) { +func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]any, keyMap map[string]any) (enqueueFnType, *Job, error) { useDefaultKeys := false if keyMap == nil { useDefaultKeys = true @@ -244,7 +244,7 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, return "", err } - scriptArgs := []interface{}{} + scriptArgs := []any{} script := e.enqueueUniqueScript scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1] @@ -286,7 +286,7 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, return enqueueFn, job, nil } -func (e *Enqueuer) redisDoHelper(c redis.Conn, cmdName string, args ...interface{}) (reply interface{}, err error) { +func (e *Enqueuer) redisDoHelper(c redis.Conn, cmdName string, args ...any) (reply any, err error) { if err = c.Send(cmdName, args...); err != nil { return } diff --git a/enqueue_test.go b/enqueue_test.go index 02d247ac..bd423750 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/rafaeljusto/redigomock" + "github.com/rafaeljusto/redigomock/v3" "github.com/stretchr/testify/assert" ) @@ -61,7 +61,7 @@ func TestEnqueue(t *testing.T) { func TestEnqueue_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} + jobArgs := map[string]any{"arg": "value"} var cases = []struct { name string enqueuerOption EnqueuerOption @@ -200,7 +200,7 @@ func TestEnqueueIn(t *testing.T) { func TestEnqueueIn_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} + jobArgs := map[string]any{"arg": "value"} secondsFromNow := int64(100) now := time.Now().Unix() setNowEpochSecondsMock(now) @@ -332,7 +332,7 @@ func TestEnqueueAt(t *testing.T) { func TestEnqueueAt_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} + jobArgs := map[string]any{"arg": "value"} now := time.Now().Unix() runAt := now + 100 setNowEpochSecondsMock(now) @@ -503,7 +503,7 @@ func TestEnqueueUnique(t *testing.T) { func TestEnqueueUnique_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} + jobArgs := map[string]any{"arg": "value"} ok := "ok" dup := "ok" @@ -660,7 +660,7 @@ func TestEnqueueUniqueIn(t *testing.T) { func TestEnqueueUniqueIn_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} + jobArgs := map[string]any{"arg": "value"} secondsFromNow := int64(100) now := time.Now().Unix() setNowEpochSecondsMock(now) @@ -847,8 +847,8 @@ func TestEnqueueUniqueByKey(t *testing.T) { func TestEnqueueUniqueByKey_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} - jobKeyMap := map[string]interface{}{"key": "value"} + jobArgs := map[string]any{"arg": "value"} + jobKeyMap := map[string]any{"key": "value"} ok := "ok" dup := "ok" @@ -982,7 +982,7 @@ func TestEnqueueUniqueAt(t *testing.T) { func TestEnqueueUniqueAt_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} + jobArgs := map[string]any{"arg": "value"} runAt := time.Now().Unix() + 100 @@ -1105,8 +1105,8 @@ func TestEnqueueUniqueInByKey(t *testing.T) { func TestEnqueueUniqueInByKey_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} - jobKeyMap := map[string]interface{}{"key": "value"} + jobArgs := map[string]any{"arg": "value"} + jobKeyMap := map[string]any{"key": "value"} secondsFromNow := int64(100) now := time.Now().Unix() setNowEpochSecondsMock(now) @@ -1240,8 +1240,8 @@ func TestEnqueueUniqueAtByKey(t *testing.T) { func TestEnqueueUniqueAtByKey_WithMock(t *testing.T) { ns := "work" jobName := "test" - jobArgs := map[string]interface{}{"arg": "value"} - jobKeyMap := map[string]interface{}{"key": "value"} + jobArgs := map[string]any{"arg": "value"} + jobKeyMap := map[string]any{"key": "value"} runAt := time.Now().Unix() + 100 ok := "ok" diff --git a/go.mod b/go.mod index be39c9ab..5845247e 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,18 @@ module github.com/gojek/work -go 1.22 +go 1.25 require ( - github.com/alicebob/miniredis/v2 v2.14.3 - github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd - github.com/gomodule/redigo v1.8.9 - github.com/rafaeljusto/redigomock v2.4.0+incompatible + github.com/alicebob/miniredis/v2 v2.37.0 + github.com/gomodule/redigo v1.9.3 + github.com/rafaeljusto/redigomock/v3 v3.1.3 github.com/robfig/cron/v3 v3.0.1 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.11.1 ) require ( - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect - gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b5b350b0..ccd7b341 100644 --- a/go.sum +++ b/go.sum @@ -1,29 +1,25 @@ -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.14.3 h1:QWoo2wchYmLgOB6ctlTt2dewQ1Vu6phl+iQbwT8SYGo= -github.com/alicebob/miniredis/v2 v2.14.3/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I= -github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd h1:ePesaBzdTmoMQjwqRCLP2jY+jjWMBpwws/LEQdt1fMM= -github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd/go.mod h1:TNehV1AhBwtT7Bd+rh8G6MoGDbBLNs/sKdk3nvr4Yzg= -github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= -github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68= +github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= -github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= +github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8= +github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rafaeljusto/redigomock v2.4.0+incompatible h1:d7uo5MVINMxnRr20MxbgDkmZ8QRfevjOVgEa4n0OZyY= -github.com/rafaeljusto/redigomock v2.4.0+incompatible/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= +github.com/rafaeljusto/redigomock/v3 v3.1.3 h1:8lwZU+uAg0LQutZ/dFTSJjQR/WudR7ogXaTW8Y+4sz4= +github.com/rafaeljusto/redigomock/v3 v3.1.3/go.mod h1:F9zPqz8rMriScZkPtUiLJoLruYcpGo/XXREpeyasREM= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= -github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= -golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/identifier.go b/identifier.go index a4c652c9..7c4a6df1 100644 --- a/identifier.go +++ b/identifier.go @@ -2,15 +2,14 @@ package work import ( "crypto/rand" - "fmt" - "io" + "encoding/hex" ) func makeIdentifier() string { b := make([]byte, 12) - _, err := io.ReadFull(rand.Reader, b) + _, err := rand.Read(b) if err != nil { return "" } - return fmt.Sprintf("%x", b) + return hex.EncodeToString(b) } diff --git a/job.go b/job.go index 7c6f5d79..425d754c 100644 --- a/job.go +++ b/job.go @@ -10,12 +10,12 @@ import ( // Job represents a job. type Job struct { // Inputs when making a new job - Name string `json:"name,omitempty"` - ID string `json:"id"` - EnqueuedAt int64 `json:"t"` - Args map[string]interface{} `json:"args"` - Unique bool `json:"unique,omitempty"` - UniqueKey string `json:"unique_key,omitempty"` + Name string `json:"name,omitempty"` + ID string `json:"id"` + EnqueuedAt int64 `json:"t"` + Args map[string]any `json:"args"` + Unique bool `json:"unique,omitempty"` + UniqueKey string `json:"unique_key,omitempty"` // Inputs when retrying Fails int64 `json:"fails,omitempty"` // number of times this job has failed @@ -31,7 +31,7 @@ type Job struct { // Q is a shortcut to easily specify arguments for jobs when enqueueing them. // Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com", "track": true}) -type Q map[string]interface{} +type Q map[string]any func newJob(rawJSON, dequeuedFrom, inProgQueue []byte) (*Job, error) { var job Job @@ -50,9 +50,9 @@ func (j *Job) serialize() ([]byte, error) { } // setArg sets a single named argument on the job. -func (j *Job) setArg(key string, val interface{}) { +func (j *Job) setArg(key string, val any) { if j.Args == nil { - j.Args = make(map[string]interface{}) + j.Args = make(map[string]any) } j.Args[key] = val } @@ -177,7 +177,7 @@ func missingKeyError(jsonType, key string) error { return fmt.Errorf("looking for a %s in job.Arg[%s] but key wasn't found", jsonType, key) } -func typecastError(jsonType, key string, v interface{}) error { +func typecastError(jsonType, key string, v any) error { actualType := reflect.TypeOf(v) return fmt.Errorf("looking for a %s in job.Arg[%s] but value wasn't right type: %v(%v)", jsonType, key, actualType, v) } diff --git a/job_test.go b/job_test.go index aa6222f2..5584b11f 100644 --- a/job_test.go +++ b/job_test.go @@ -90,7 +90,7 @@ func TestJobArgumentExtraction(t *testing.T) { func TestJobArgumentExtractionBadString(t *testing.T) { var testCases = []struct { key string - val interface{} + val any good bool }{ {"a", 1, false}, @@ -129,7 +129,7 @@ func TestJobArgumentExtractionBadString(t *testing.T) { func TestJobArgumentExtractionBadBool(t *testing.T) { var testCases = []struct { key string - val interface{} + val any good bool }{ {"a", 1, false}, @@ -169,7 +169,7 @@ func TestJobArgumentExtractionBadBool(t *testing.T) { func TestJobArgumentExtractionBadInt(t *testing.T) { var testCases = []struct { key string - val interface{} + val any good bool }{ {"a", "boo", false}, @@ -215,7 +215,7 @@ func TestJobArgumentExtractionBadInt(t *testing.T) { func TestJobArgumentExtractionBadFloat(t *testing.T) { var testCases = []struct { key string - val interface{} + val any good bool }{ {"a", "boo", false}, diff --git a/observer.go b/observer.go index 5c96ac39..9317dd12 100644 --- a/observer.go +++ b/observer.go @@ -50,7 +50,7 @@ type observation struct { // These need to be set when starting a job startedAt int64 - arguments map[string]interface{} + arguments map[string]any // If we're done w/ the job, err will indicate the success/failure of it err error // nil: success. not nil: the error we got when running the job @@ -91,7 +91,7 @@ func (o *observer) drain() { <-o.doneDrainingChan } -func (o *observer) observeStarted(jobName, jobID string, arguments map[string]interface{}) { +func (o *observer) observeStarted(jobName, jobID string, arguments map[string]any) { o.observationsChan <- &observation{ kind: observationKindStarted, jobName: jobName, @@ -212,7 +212,7 @@ func (o *observer) writeStatus(obv *observation) error { } } - args := make([]interface{}, 0, 13) + args := make([]any, 0, 13) args = append(args, key, "job_name", obv.jobName, diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index ae957e7f..d95507ca 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -2,7 +2,7 @@ package work import ( "fmt" - "math/rand" + "math/rand/v2" "time" "github.com/gomodule/redigo/redis" @@ -15,12 +15,11 @@ const ( ) type periodicEnqueuer struct { - namespace string - pool *redis.Pool - periodicJobs []*periodicJob - scheduledPeriodicJobs []*scheduledPeriodicJob - stopChan chan struct{} - doneStoppingChan chan struct{} + namespace string + pool *redis.Pool + periodicJobs []*periodicJob + stopChan chan struct{} + doneStoppingChan chan struct{} } type periodicJob struct { @@ -29,12 +28,6 @@ type periodicJob struct { schedule cron.Schedule } -type scheduledPeriodicJob struct { - scheduledAt time.Time - scheduledAtEpoch int64 - *periodicJob -} - func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*periodicJob) *periodicEnqueuer { return &periodicEnqueuer{ namespace: namespace, @@ -56,7 +49,7 @@ func (pe *periodicEnqueuer) stop() { func (pe *periodicEnqueuer) loop() { // Begin reaping periodically - timer := time.NewTimer(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second) + timer := time.NewTimer(periodicEnqueuerSleep + time.Duration(rand.IntN(30))*time.Second) defer timer.Stop() if pe.shouldEnqueue() { @@ -72,7 +65,7 @@ func (pe *periodicEnqueuer) loop() { pe.doneStoppingChan <- struct{}{} return case <-timer.C: - timer.Reset(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second) + timer.Reset(periodicEnqueuerSleep + time.Duration(rand.IntN(30))*time.Second) if pe.shouldEnqueue() { err := pe.enqueue() if err != nil { diff --git a/priority_sampler.go b/priority_sampler.go index 57a0f4aa..f94823c0 100644 --- a/priority_sampler.go +++ b/priority_sampler.go @@ -1,7 +1,7 @@ package work import ( - "math/rand" + "math/rand/v2" ) type prioritySampler struct { @@ -57,7 +57,7 @@ func (s *prioritySampler) sample() []sampleItem { // If we find where it fits, sort the item to the next slot towards the front of the slice. for remaining > 1 { // rn from [0 to sumRemaining) - rn := uint(rand.Uint32()) % sumRemaining + rn := rand.UintN(sumRemaining) prevSum := uint(0) for i := lenSamples - 1; i >= lastValidIdx; i-- { diff --git a/priority_sampler_test.go b/priority_sampler_test.go index 2d6ecc36..590bda08 100644 --- a/priority_sampler_test.go +++ b/priority_sampler_test.go @@ -2,14 +2,12 @@ package work import ( "fmt" - "math/rand" "testing" "github.com/stretchr/testify/assert" ) func TestPrioritySampler(t *testing.T) { - rand.Seed(1) ps := prioritySampler{} ps.add(5, "jobs.5", "jobsinprog.5", "jobspaused.5", "jobslock.5", "jobslockinfo.5", "jobsconcurrency.5") @@ -20,8 +18,8 @@ func TestPrioritySampler(t *testing.T) { var c2 = 0 var c1 = 0 var c1end = 0 - var total = 200 - for i := 0; i < total; i++ { + var total = 1000 + for range total { ret := ps.sample() if ret[0].priority == 5 { c5++ @@ -36,15 +34,15 @@ func TestPrioritySampler(t *testing.T) { } // make sure these numbers are roughly correct. note that probability is a thing. - assert.True(t, c5 > (2*c2)) - assert.True(t, float64(c2) > (1.5*float64(c1))) - assert.True(t, c1 >= (total/13), fmt.Sprintf("c1 = %d total = %d total/13=%d", c1, total, total/13)) - assert.True(t, float64(c1end) > (float64(total)*0.50)) + assert.Greater(t, c5, (2 * c2)) + assert.Greater(t, float64(c2), (1.5 * float64(c1))) + assert.GreaterOrEqualf(t, c1, total/13, "c1 = %d total = %d total/13=%d", c1, total, total/13) + assert.Greater(t, float64(c1end), (float64(total) * 0.50)) } func BenchmarkPrioritySampler(b *testing.B) { ps := prioritySampler{} - for i := 0; i < 200; i++ { + for i := range 200 { ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i), diff --git a/redis.go b/redis.go index 417eb481..d392676c 100644 --- a/redis.go +++ b/redis.go @@ -72,7 +72,7 @@ func redisKeyJobsConcurrency(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":max_concurrency" } -func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { +func redisKeyUniqueJob(namespace, jobName string, args map[string]any) (string, error) { var buf bytes.Buffer buf.WriteString(redisNamespacePrefix(namespace)) diff --git a/requeuer.go b/requeuer.go index 55fa4513..1724c087 100644 --- a/requeuer.go +++ b/requeuer.go @@ -12,7 +12,7 @@ type requeuer struct { pool *redis.Pool redisRequeueScript *redis.Script - redisRequeueArgs []interface{} + redisRequeueArgs []any stopChan chan struct{} doneStoppingChan chan struct{} @@ -22,7 +22,7 @@ type requeuer struct { } func newRequeuer(namespace string, pool *redis.Pool, requeueKey string, jobNames []string) *requeuer { - args := make([]interface{}, 0, len(jobNames)+2+2) + args := make([]any, 0, len(jobNames)+2+2) args = append(args, requeueKey) // KEY[1] args = append(args, redisKeyDead(namespace)) // KEY[2] for _, jobName := range jobNames { diff --git a/run_test.go b/run_test.go index 5bf0d2c9..9a4218c3 100644 --- a/run_test.go +++ b/run_test.go @@ -45,7 +45,7 @@ func TestRunBasicMiddleware(t *testing.T) { job := &Job{ Name: "foo", - Args: map[string]interface{}{"a": "foo"}, + Args: map[string]any{"a": "foo"}, } v, err := runJob(job, tstCtxType, middleware, jt) diff --git a/webui/handler.go b/webui/handler.go index e5640fef..2f10fc36 100644 --- a/webui/handler.go +++ b/webui/handler.go @@ -28,7 +28,7 @@ func NewHandler(client *work.Client) *http.ServeMux { mux.HandleFunc("POST /retry_dead_job/{died_at}/{job_id}", ctx.retryDeadJob) mux.HandleFunc("POST /delete_all_dead_jobs", ctx.deleteAllDeadJobs) mux.HandleFunc("POST /retry_all_dead_jobs", ctx.retryAllDeadJobs) - mux.HandleFunc("GET /", ctx.indexPage) + mux.HandleFunc("GET /{$}", ctx.indexPage) mux.HandleFunc("GET /work.js", ctx.workJS) return mux diff --git a/webui/handler_test.go b/webui/handler_test.go index da264e03..ab2232fb 100644 --- a/webui/handler_test.go +++ b/webui/handler_test.go @@ -100,13 +100,13 @@ func (s *TestWebUIHandlerSuite) TestQueues() { s.NoError(err) s.Equal(200, resp.StatusCode) - var res []interface{} + var res []any err = json.NewDecoder(resp.Body).Decode(&res) s.NoError(err) s.Equal(3, len(res)) - foomap, ok := res[0].(map[string]interface{}) + foomap, ok := res[0].(map[string]any) s.True(ok) s.Equal("foo", foomap["job_name"]) s.EqualValues(2, foomap["count"]) @@ -135,13 +135,13 @@ func (s *TestWebUIHandlerSuite) TestWorkerPools() { s.NoError(err) s.Equal(200, resp.StatusCode) - var res []interface{} + var res []any err = json.NewDecoder(resp.Body).Decode(&res) s.NoError(err) s.Equal(2, len(res)) - w1stat, ok := res[0].(map[string]interface{}) + w1stat, ok := res[0].(map[string]any) s.True(ok) s.True(w1stat["worker_pool_id"] != "") // NOTE: WorkerPoolStatus is tested elsewhere. @@ -175,7 +175,7 @@ func (s *TestWebUIHandlerSuite) TestBusyWorkers() { s.NoError(err) s.Equal(200, resp.StatusCode) - var res []interface{} + var res []any err = json.NewDecoder(resp.Body).Decode(&res) s.NoError(err) s.Equal(0, len(res)) @@ -199,7 +199,7 @@ func (s *TestWebUIHandlerSuite) TestBusyWorkers() { s.Equal(1, len(res)) if len(res) == 1 { - hash, ok := res[0].(map[string]interface{}) + hash, ok := res[0].(map[string]any) s.True(ok) s.Equal("wat", hash["job_name"]) s.Equal(true, hash["is_busy"]) @@ -475,6 +475,6 @@ func (s *TestWebUIHandlerSuite) TestAssets() { req, err = http.NewRequest(http.MethodGet, s.pathPrefix()+"/work.js", nil) s.NoError(err) - resp, err = s.server.Client().Do(req) + _, err = s.server.Client().Do(req) s.NoError(err) } diff --git a/webui/internal/assets/assets.go b/webui/internal/assets/assets.go index e77ce0d5..90f84a49 100644 --- a/webui/internal/assets/assets.go +++ b/webui/internal/assets/assets.go @@ -1,15 +1,16 @@ -// Code generated for package assets by go-bindata DO NOT EDIT. (@generated) +// Code generated by go-bindata. DO NOT EDIT. // sources: -// build/index.html -// build/work.js +// index.html (217B) +// work.js (428.189kB) + package assets import ( "bytes" "compress/gzip" + "crypto/sha256" "fmt" "io" - "io/ioutil" "os" "path/filepath" "strings" @@ -19,7 +20,7 @@ import ( func bindataRead(data []byte, name string) ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { - return nil, fmt.Errorf("Read %q: %v", name, err) + return nil, fmt.Errorf("read %q: %w", name, err) } var buf bytes.Buffer @@ -27,7 +28,7 @@ func bindataRead(data []byte, name string) ([]byte, error) { clErr := gz.Close() if err != nil { - return nil, fmt.Errorf("Read %q: %v", name, err) + return nil, fmt.Errorf("read %q: %w", name, err) } if clErr != nil { return nil, err @@ -37,8 +38,9 @@ func bindataRead(data []byte, name string) ([]byte, error) { } type asset struct { - bytes []byte - info os.FileInfo + bytes []byte + info os.FileInfo + digest [sha256.Size]byte } type bindataFileInfo struct { @@ -48,32 +50,21 @@ type bindataFileInfo struct { modTime time.Time } -// Name return file name func (fi bindataFileInfo) Name() string { return fi.name } - -// Size return file size func (fi bindataFileInfo) Size() int64 { return fi.size } - -// Mode return file mode func (fi bindataFileInfo) Mode() os.FileMode { return fi.mode } - -// Mode return file modify time func (fi bindataFileInfo) ModTime() time.Time { return fi.modTime } - -// IsDir return file whether a directory func (fi bindataFileInfo) IsDir() bool { - return fi.mode&os.ModeDir != 0 + return false } - -// Sys return file is sys mode func (fi bindataFileInfo) Sys() interface{} { return nil } @@ -93,8 +84,8 @@ func indexHtml() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "index.html", size: 217, mode: os.FileMode(420), modTime: time.Unix(1728022205, 0)} - a := &asset{bytes: bytes, info: info} + info := bindataFileInfo{name: "index.html", size: 217, mode: os.FileMode(0644), modTime: time.Unix(1760696323, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4a, 0x4b, 0x3d, 0xeb, 0xee, 0x60, 0x65, 0x88, 0x4f, 0x80, 0xa1, 0xdd, 0xd4, 0x67, 0x7f, 0x9e, 0x5e, 0x20, 0x58, 0x51, 0xea, 0xf, 0x90, 0x8c, 0xb8, 0x22, 0x22, 0x4b, 0xed, 0x82, 0xc, 0xb8}} return a, nil } @@ -113,8 +104,8 @@ func workJs() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "work.js", size: 428189, mode: os.FileMode(420), modTime: time.Unix(1728026848, 0)} - a := &asset{bytes: bytes, info: info} + info := bindataFileInfo{name: "work.js", size: 428189, mode: os.FileMode(0644), modTime: time.Unix(1760696323, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x17, 0x35, 0x1d, 0x42, 0x82, 0x65, 0x7c, 0xbc, 0x83, 0x24, 0x38, 0x2b, 0x66, 0x11, 0x56, 0xaa, 0xd8, 0x6a, 0x33, 0xac, 0x58, 0x1c, 0xf1, 0x42, 0x74, 0x36, 0xb4, 0xba, 0xe5, 0x3b, 0xfd, 0x7e}} return a, nil } @@ -122,8 +113,8 @@ func workJs() (*asset, error) { // It returns an error if the asset could not be found or // could not be loaded. func Asset(name string) ([]byte, error) { - cannonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[cannonicalName]; ok { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { a, err := f() if err != nil { return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err) @@ -133,6 +124,12 @@ func Asset(name string) ([]byte, error) { return nil, fmt.Errorf("Asset %s not found", name) } +// AssetString returns the asset contents as a string (instead of a []byte). +func AssetString(name string) (string, error) { + data, err := Asset(name) + return string(data), err +} + // MustAsset is like Asset but panics when Asset would return an error. // It simplifies safe initialization of global variables. func MustAsset(name string) []byte { @@ -144,12 +141,18 @@ func MustAsset(name string) []byte { return a } +// MustAssetString is like AssetString but panics when Asset would return an +// error. It simplifies safe initialization of global variables. +func MustAssetString(name string) string { + return string(MustAsset(name)) +} + // AssetInfo loads and returns the asset info for the given name. // It returns an error if the asset could not be found or // could not be loaded. func AssetInfo(name string) (os.FileInfo, error) { - cannonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[cannonicalName]; ok { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { a, err := f() if err != nil { return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err) @@ -159,6 +162,33 @@ func AssetInfo(name string) (os.FileInfo, error) { return nil, fmt.Errorf("AssetInfo %s not found", name) } +// AssetDigest returns the digest of the file with the given name. It returns an +// error if the asset could not be found or the digest could not be loaded. +func AssetDigest(name string) ([sha256.Size]byte, error) { + canonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[canonicalName]; ok { + a, err := f() + if err != nil { + return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err) + } + return a.digest, nil + } + return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name) +} + +// Digests returns a map of all known files and their checksums. +func Digests() (map[string][sha256.Size]byte, error) { + mp := make(map[string][sha256.Size]byte, len(_bindata)) + for name := range _bindata { + a, err := _bindata[name]() + if err != nil { + return nil, err + } + mp[name] = a.digest + } + return mp, nil +} + // AssetNames returns the names of the assets. func AssetNames() []string { names := make([]string, 0, len(_bindata)) @@ -174,24 +204,29 @@ var _bindata = map[string]func() (*asset, error){ "work.js": workJs, } +// AssetDebug is true if the assets were built with the debug flag enabled. +const AssetDebug = false + // AssetDir returns the file names below a certain // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the // following hierarchy: -// data/ -// foo.txt -// img/ -// a.png -// b.png -// then AssetDir("data") would return []string{"foo.txt", "img"} -// AssetDir("data/img") would return []string{"a.png", "b.png"} -// AssetDir("foo.txt") and AssetDir("notexist") would return an error +// +// data/ +// foo.txt +// img/ +// a.png +// b.png +// +// then AssetDir("data") would return []string{"foo.txt", "img"}, +// AssetDir("data/img") would return []string{"a.png", "b.png"}, +// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and // AssetDir("") will return []string{"data"}. func AssetDir(name string) ([]string, error) { node := _bintree if len(name) != 0 { - cannonicalName := strings.Replace(name, "\\", "/", -1) - pathList := strings.Split(cannonicalName, "/") + canonicalName := strings.Replace(name, "\\", "/", -1) + pathList := strings.Split(canonicalName, "/") for _, p := range pathList { node = node.Children[p] if node == nil { @@ -215,11 +250,11 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "index.html": &bintree{indexHtml, map[string]*bintree{}}, - "work.js": &bintree{workJs, map[string]*bintree{}}, + "index.html": {indexHtml, map[string]*bintree{}}, + "work.js": {workJs, map[string]*bintree{}}, }} -// RestoreAsset restores an asset under the given directory +// RestoreAsset restores an asset under the given directory. func RestoreAsset(dir, name string) error { data, err := Asset(name) if err != nil { @@ -233,18 +268,14 @@ func RestoreAsset(dir, name string) error { if err != nil { return err } - err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) + err = os.WriteFile(_filePath(dir, name), data, info.Mode()) if err != nil { return err } - err = os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) - if err != nil { - return err - } - return nil + return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) } -// RestoreAssets restores an asset under the given directory recursively +// RestoreAssets restores an asset under the given directory recursively. func RestoreAssets(dir, name string) error { children, err := AssetDir(name) // File @@ -262,6 +293,6 @@ func RestoreAssets(dir, name string) error { } func _filePath(dir, name string) string { - cannonicalName := strings.Replace(name, "\\", "/", -1) - return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) + canonicalName := strings.Replace(name, "\\", "/", -1) + return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...) } diff --git a/webui/webui.go b/webui/webui.go index 9b65c3ea..a43a4cb7 100644 --- a/webui/webui.go +++ b/webui/webui.go @@ -1,14 +1,13 @@ package webui import ( + gocontext "context" "encoding/json" "fmt" "net/http" "strconv" - "sync" "time" - "github.com/braintree/manners" "github.com/gojek/work" "github.com/gojek/work/webui/internal/assets" "github.com/gomodule/redigo/redis" @@ -16,8 +15,7 @@ import ( // Server implements an HTTP server which exposes a JSON API to view and manage gojek/work items. type Server struct { - server *manners.GracefulServer - wg sync.WaitGroup + server *http.Server } type context struct { @@ -28,7 +26,7 @@ type context struct { func NewServer(namespace string, pool *redis.Pool, hostPort string) *Server { client := work.NewClient(namespace, pool) return &Server{ - server: manners.NewWithServer(&http.Server{Addr: hostPort, Handler: NewHandler(client)}), + server: &http.Server{Addr: hostPort, Handler: NewHandler(client)}, } } @@ -42,18 +40,14 @@ func mustAsset(name string) []byte { // Start starts the server listening for requests on the hostPort specified in NewServer. func (w *Server) Start() { - w.wg.Add(1) - go func(w *Server) { + go func() { _ = w.server.ListenAndServe() - - w.wg.Done() - }(w) + }() } // Stop stops the server and blocks until it has finished. func (w *Server) Stop() { - w.server.Close() - w.wg.Wait() + _ = w.server.Shutdown(gocontext.Background()) } func (c *context) ping(rw http.ResponseWriter, _ *http.Request) { @@ -194,7 +188,7 @@ func (c *context) workJS(rw http.ResponseWriter, _ *http.Request) { _, _ = rw.Write(mustAsset("work.js")) } -func render(rw http.ResponseWriter, jsonable interface{}, err error) { +func render(rw http.ResponseWriter, jsonable any, err error) { if err != nil { renderError(rw, err) return diff --git a/webui/webui_test.go b/webui/webui_test.go index 41ddcbce..f67aba27 100644 --- a/webui/webui_test.go +++ b/webui/webui_test.go @@ -99,13 +99,13 @@ func (s *TestWebUIServerSuite) TestQueues() { s.NoError(err) s.Equal(200, resp.StatusCode) - var res []interface{} + var res []any err = json.NewDecoder(resp.Body).Decode(&res) s.NoError(err) s.Equal(3, len(res)) - foomap, ok := res[0].(map[string]interface{}) + foomap, ok := res[0].(map[string]any) s.True(ok) s.Equal("foo", foomap["job_name"]) s.EqualValues(2, foomap["count"]) @@ -134,13 +134,13 @@ func (s *TestWebUIServerSuite) TestWorkerPools() { s.NoError(err) s.Equal(200, resp.StatusCode) - var res []interface{} + var res []any err = json.NewDecoder(resp.Body).Decode(&res) s.NoError(err) s.Equal(2, len(res)) - w1stat, ok := res[0].(map[string]interface{}) + w1stat, ok := res[0].(map[string]any) s.True(ok) s.True(w1stat["worker_pool_id"] != "") // NOTE: WorkerPoolStatus is tested elsewhere. @@ -174,7 +174,7 @@ func (s *TestWebUIServerSuite) TestBusyWorkers() { s.NoError(err) s.Equal(200, resp.StatusCode) - var res []interface{} + var res []any err = json.NewDecoder(resp.Body).Decode(&res) s.NoError(err) s.Equal(0, len(res)) @@ -198,7 +198,7 @@ func (s *TestWebUIServerSuite) TestBusyWorkers() { s.Equal(1, len(res)) if len(res) == 1 { - hash, ok := res[0].(map[string]interface{}) + hash, ok := res[0].(map[string]any) s.True(ok) s.Equal("wat", hash["job_name"]) s.Equal(true, hash["is_busy"]) @@ -474,7 +474,7 @@ func (s *TestWebUIServerSuite) TestAssets() { req, err = http.NewRequest(http.MethodGet, "http://127.0.0.1:6666/work.js", nil) s.NoError(err) - resp, err = http.DefaultClient.Do(req) + _, err = http.DefaultClient.Do(req) s.NoError(err) } diff --git a/worker.go b/worker.go index d7578334..7899c21f 100644 --- a/worker.go +++ b/worker.go @@ -2,7 +2,7 @@ package work import ( "fmt" - "math/rand" + "math/rand/v2" "reflect" "time" @@ -146,7 +146,7 @@ func (w *worker) fetchJob() (*Job, error) { // NOTE: we could optimize this to only resort every second, or something. w.sampler.sample() numKeys := len(w.sampler.samples) * fetchKeysPerJobType - var scriptArgs = make([]interface{}, 0, numKeys+1) + var scriptArgs = make([]any, 0, numKeys+1) for _, s := range w.sampler.samples { scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N] @@ -280,7 +280,7 @@ func (w *worker) removeJobFromInProgress(job *Job, fate terminateOp) { type terminateOp func(conn redis.Conn) -func terminateOnly(_ redis.Conn) { return } +func terminateOnly(_ redis.Conn) {} func terminateAndRetry(w *worker, jt *jobType, job *Job) terminateOp { rawJSON, err := job.serialize() if err != nil { @@ -323,5 +323,5 @@ func (w *worker) jobFate(jt *jobType, job *Job) terminateOp { // Default algorithm returns an fastly increasing backoff counter which grows in an unbounded fashion func defaultBackoffCalculator(job *Job) int64 { fails := job.Fails - return (fails * fails * fails * fails) + 15 + (rand.Int63n(30) * (fails + 1)) + return (fails * fails * fails * fails) + 15 + (rand.Int64N(30) * (fails + 1)) } diff --git a/worker_pool.go b/worker_pool.go index cd90751e..d527f1f1 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -85,13 +85,13 @@ type middlewareHandler struct { // NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. // concurrency specifies how many workers to spin up - each worker can process jobs concurrently. -func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool { +func NewWorkerPool(ctx any, concurrency uint, namespace string, pool *redis.Pool) *WorkerPool { return NewWorkerPoolWithOptions(ctx, concurrency, namespace, pool, WorkerPoolOptions{}) } // NewWorkerPoolWithOptions creates a new worker pool as per the NewWorkerPool function, but permits you to specify // additional options such as sleep backoffs. -func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace string, pool *redis.Pool, workerPoolOpts WorkerPoolOptions) *WorkerPool { +func NewWorkerPoolWithOptions(ctx any, concurrency uint, namespace string, pool *redis.Pool, workerPoolOpts WorkerPoolOptions) *WorkerPool { if pool == nil { panic("NewWorkerPool needs a non-nil *redis.Pool") } @@ -119,7 +119,7 @@ func NewWorkerPoolWithOptions(ctx interface{}, concurrency uint, namespace strin // Middleware appends the specified function to the middleware chain. The fn can take one of these forms: // (*ContextType).func(*Job, NextMiddlewareFunc) error, (ContextType matches the type of ctx specified when creating a pool) // func(*Job, NextMiddlewareFunc) error, for the generic middleware format. -func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool { +func (wp *WorkerPool) Middleware(fn any) *WorkerPool { vfn := reflect.ValueOf(fn) validateMiddlewareType(wp.contextType, vfn) @@ -145,13 +145,13 @@ func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool { // fn can take one of these forms: // (*ContextType).func(*Job) error, (ContextType matches the type of ctx specified when creating a pool) // func(*Job) error, for the generic handler format. -func (wp *WorkerPool) Job(name string, fn interface{}) *WorkerPool { +func (wp *WorkerPool) Job(name string, fn any) *WorkerPool { return wp.JobWithOptions(name, JobOptions{}, fn) } // JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you specify additional options // such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them. -func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool { +func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn any) *WorkerPool { jobOpts = applyDefaultsAndValidate(jobOpts) vfn := reflect.ValueOf(fn) @@ -299,7 +299,7 @@ func (wp *WorkerPool) writeKnownJobsToRedis() { conn := wp.pool.Get() defer conn.Close() key := redisKeyKnownJobs(wp.namespace) - jobNames := make([]interface{}, 0, len(wp.jobTypes)+1) + jobNames := make([]any, 0, len(wp.jobTypes)+1) jobNames = append(jobNames, key) for k := range wp.jobTypes { jobNames = append(jobNames, k) @@ -395,22 +395,20 @@ func isValidHandlerType(ctxType reflect.Type, vfn reflect.Value) bool { } outType := fnType.Out(0) - var e *error - if outType != reflect.TypeOf(e).Elem() { + if outType != reflect.TypeFor[error]() { return false } - var j *Job if numIn == 1 { - if fnType.In(0) != reflect.TypeOf(j) { + if fnType.In(0) != reflect.TypeFor[*Job]() { return false } } else if numIn == 2 { - if fnType.In(0) != reflect.PtrTo(ctxType) { + if fnType.In(0) != reflect.PointerTo(ctxType) { return false } - if fnType.In(1) != reflect.TypeOf(j) { + if fnType.In(1) != reflect.TypeFor[*Job]() { return false } } else { @@ -435,29 +433,26 @@ func isValidMiddlewareType(ctxType reflect.Type, vfn reflect.Value) bool { } outType := fnType.Out(0) - var e *error - if outType != reflect.TypeOf(e).Elem() { + if outType != reflect.TypeFor[error]() { return false } - var j *Job - var nfn NextMiddlewareFunc if numIn == 2 { - if fnType.In(0) != reflect.TypeOf(j) { + if fnType.In(0) != reflect.TypeFor[*Job]() { return false } - if fnType.In(1) != reflect.TypeOf(nfn) { + if fnType.In(1) != reflect.TypeFor[NextMiddlewareFunc]() { return false } } else if numIn == 3 { - if fnType.In(0) != reflect.PtrTo(ctxType) { + if fnType.In(0) != reflect.PointerTo(ctxType) { return false } - if fnType.In(1) != reflect.TypeOf(j) { + if fnType.In(1) != reflect.TypeFor[*Job]() { return false } - if fnType.In(2) != reflect.TypeOf(nfn) { + if fnType.In(2) != reflect.TypeFor[NextMiddlewareFunc]() { return false } } else { diff --git a/worker_pool_test.go b/worker_pool_test.go index de1c7bbc..ea1f98ba 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -12,7 +12,6 @@ import ( ) type tstCtx struct { - a int bytes.Buffer } @@ -20,11 +19,11 @@ func (c *tstCtx) record(s string) { _, _ = c.WriteString(s) } -var tstCtxType = reflect.TypeOf(tstCtx{}) +var tstCtxType = reflect.TypeFor[tstCtx]() func TestWorkerPoolHandlerValidations(t *testing.T) { var cases = []struct { - fn interface{} + fn any good bool }{ {func(j *Job) error { return nil }, true}, @@ -48,7 +47,7 @@ func TestWorkerPoolHandlerValidations(t *testing.T) { func TestWorkerPoolMiddlewareValidations(t *testing.T) { var cases = []struct { - fn interface{} + fn any good bool }{ {func(j *Job, n NextMiddlewareFunc) error { return nil }, true}, @@ -135,7 +134,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { wp.Start() // enqueue some jobs enqueuer := NewEnqueuer(ns, pool) - for i := 0; i < numJobs; i++ { + for range numJobs { _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) assert.Nil(t, err) } @@ -177,7 +176,7 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { wp.Start() // enqueue some jobs enqueuer := NewEnqueuer(ns, pool) - for i := 0; i < numJobs; i++ { + for range numJobs { _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) assert.Nil(t, err) } diff --git a/worker_test.go b/worker_test.go index b20ee396..9ec79a8a 100644 --- a/worker_test.go +++ b/worker_test.go @@ -9,7 +9,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/gomodule/redigo/redis" - "github.com/rafaeljusto/redigomock" + "github.com/rafaeljusto/redigomock/v3" "github.com/stretchr/testify/assert" ) @@ -335,7 +335,7 @@ func TestWorkersPaused(t *testing.T) { w.start() // make sure the jobs stay in the still in the run queue and not moved to in progress - for i := 0; i < 2; i++ { + for range 2 { time.Sleep(10 * time.Millisecond) assert.EqualValues(t, 1, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1))) @@ -587,7 +587,7 @@ func jobOnZset(pool *redis.Pool, key string) (int64, *Job) { panic("ZRANGE error: " + err.Error()) } - vv := v.([]interface{}) + vv := v.([]any) job, err := newJob(vv[0].([]byte), nil, nil) if err != nil {