Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.22.x, 1.23.x]
go-version: [1.25.x, 1.26.x, stable]
steps:
- name: Install Go@v${{ matrix.go-version }}
uses: actions/setup-go@v4
Expand Down
2 changes: 1 addition & 1 deletion DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ All NPM commands can be found in `package.json`.
To embed bundled js, do

```
go get -u github.com/go-bindata/go-bindata/...
go install github.com/kevinburke/go-bindata/v4/go-bindata@latest
cd webui/internal/assets
go generate
```
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (c *Client) RetryDeadJob(diedAt int64, jobID string) error {

script := redis.NewScript(len(jobNames)+1, redisLuaRequeueSingleDeadCmd)

args := make([]interface{}, 0, len(jobNames)+1+3)
args := make([]any, 0, len(jobNames)+1+3)
args = append(args, redisKeyDead(c.namespace)) // KEY[1]
for _, jobName := range jobNames {
args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...]
Expand Down Expand Up @@ -442,7 +442,7 @@ func (c *Client) RetryAllDeadJobs() error {

script := redis.NewScript(len(jobNames)+1, redisLuaRequeueAllDeadCmd)

args := make([]interface{}, 0, len(jobNames)+1+3)
args := make([]any, 0, len(jobNames)+1+3)
args = append(args, redisKeyDead(c.namespace)) // KEY[1]
for _, jobName := range jobNames {
args = append(args, redisKeyJobs(c.namespace, jobName)) // KEY[2, 3, ...]
Expand All @@ -456,7 +456,7 @@ func (c *Client) RetryAllDeadJobs() error {

// Cap iterations for safety (which could reprocess 1k*1k jobs).
// This is conceptually an infinite loop but let's be careful.
for i := 0; i < 1000; i++ {
for range 1000 {
res, err := redis.Int64(script.Do(conn, args...))
if err != nil {
logError("client.retry_all_dead_jobs.do", err)
Expand Down Expand Up @@ -538,7 +538,7 @@ func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error {
func (c *Client) deleteZsetJob(zsetKey string, zscore int64, jobID string) (bool, []byte, error) {
script := redis.NewScript(1, redisLuaDeleteSingleCmd)

args := make([]interface{}, 0, 1+2)
args := make([]any, 0, 1+2)
args = append(args, zsetKey) // KEY[1]
args = append(args, zscore) // ARGV[1]
args = append(args, jobID) // ARGV[2]
Expand Down
12 changes: 6 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func TestClientScheduledJobs(t *testing.T) {
assert.EqualValues(t, 1425263409, jobs[1].EnqueuedAt)
assert.EqualValues(t, 1425263409, jobs[2].EnqueuedAt)

assert.EqualValues(t, interface{}(1), jobs[0].Args["a"])
assert.EqualValues(t, interface{}(2), jobs[0].Args["b"])
assert.EqualValues(t, any(1), jobs[0].Args["a"])
assert.EqualValues(t, any(2), jobs[0].Args["b"])

assert.EqualValues(t, 0, jobs[0].Fails)
assert.EqualValues(t, 0, jobs[1].Fails)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestClientRetryJobs(t *testing.T) {
assert.EqualValues(t, 1425263429, jobs[0].FailedAt)
assert.Equal(t, "wat", jobs[0].Name)
assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt)
assert.EqualValues(t, interface{}(1), jobs[0].Args["a"])
assert.EqualValues(t, any(1), jobs[0].Args["a"])
assert.EqualValues(t, 1, jobs[0].Fails)
assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt)
assert.Equal(t, "ohno", jobs[0].LastErr)
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestClientDeadJobs(t *testing.T) {
assert.EqualValues(t, 1425263429, jobs[0].FailedAt)
assert.Equal(t, "wat", jobs[0].Name)
assert.EqualValues(t, 1425263409, jobs[0].EnqueuedAt)
assert.EqualValues(t, interface{}(1), jobs[0].Args["a"])
assert.EqualValues(t, any(1), jobs[0].Args["a"])
assert.EqualValues(t, 1, jobs[0].Fails)
assert.EqualValues(t, 1425263429, jobs[0].Job.FailedAt)
assert.Equal(t, "ohno", jobs[0].LastErr)
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestClientRetryDeadJobWithArgs(t *testing.T) {
Name: name,
ID: makeIdentifier(),
EnqueuedAt: encAt,
Args: map[string]interface{}{"a": "wat"},
Args: map[string]any{"a": "wat"},
Fails: 3,
LastErr: "sorry",
FailedAt: failAt,
Expand Down Expand Up @@ -571,7 +571,7 @@ func TestClientRetryAllDeadJobsBig(t *testing.T) {
// Ok, we need to efficiently add 10k jobs to the dead queue.
// I tried using insertDeadJob but it was too slow (increased test time by 1 second)
dead := redisKeyDead(ns)
for i := 0; i < 10000; i++ {
for range 10000 {
job := &Job{
Name: "wat1",
ID: makeIdentifier(),
Expand Down
4 changes: 2 additions & 2 deletions cmd/workenqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/gomodule/redigo/redis"
)

var redisHostPort = flag.String("redis", ":6379", "redis hostport")
var redisHostPort = flag.String("redis", "redis://:6379", "redis hostport")
var redisNamespace = flag.String("ns", "work", "redis namespace")
var jobName = flag.String("job", "", "job name")
var jobArgs = flag.String("args", "{}", "job arguments")
Expand All @@ -26,7 +26,7 @@ func main() {

pool := newPool(*redisHostPort)

var args map[string]interface{}
var args map[string]any
err := json.Unmarshal([]byte(*jobArgs), &args)
if err != nil {
fmt.Println("invalid args:", err)
Expand Down
8 changes: 4 additions & 4 deletions cmd/workfakedata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package main
import (
"flag"
"fmt"
"math/rand"
"math/rand/v2"
"time"

"github.com/gojek/work"
"github.com/gomodule/redigo/redis"
)

var redisHostPort = flag.String("redis", ":6379", "redis hostport")
var redisHostPort = flag.String("redis", "redis://:6379", "redis hostport")
var redisNamespace = flag.String("ns", "work", "redis namespace")

func epsilonHandler(job *work.Job) error {
fmt.Println("epsilon")
time.Sleep(time.Second)

if rand.Intn(2) == 0 {
if rand.IntN(2) == 0 {
return fmt.Errorf("random error")
}
return nil
Expand All @@ -42,7 +42,7 @@ func main() {
go func() {
for {
en := work.NewEnqueuer(*redisNamespace, pool)
for i := 0; i < 20; i++ {
for i := range 20 {
en.Enqueue("foobar", work.Q{"i": i})
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/workwebui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

var (
redisHostPort = flag.String("redis", ":6379", "redis hostport")
redisHostPort = flag.String("redis", "redis://:6379", "redis hostport")
redisDatabase = flag.String("database", "0", "redis database")
redisNamespace = flag.String("ns", "work", "redis namespace")
webHostPort = flag.String("listen", ":5040", "hostport to listen for HTTP JSON API")
Expand Down
8 changes: 4 additions & 4 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package work

import (
"fmt"
"math/rand"
"math/rand/v2"
"strings"
"time"

Expand Down Expand Up @@ -60,7 +60,7 @@ func (r *deadPoolReaper) loop() {
return
case <-timer.C:
// Schedule next occurrence periodically with jitter
timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)
timer.Reset(r.reapPeriod + time.Duration(rand.IntN(reapJitterSecs))*time.Second)

// Reap
if err := r.reap(); err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (r *deadPoolReaper) reap() error {
func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error {
numKeys := len(jobTypes) * 2
redisReapLocksScript := redis.NewScript(numKeys, redisLuaReapStaleLocks)
var scriptArgs = make([]interface{}, 0, numKeys+1) // +1 for argv[1]
var scriptArgs = make([]any, 0, numKeys+1) // +1 for argv[1]

for _, jobType := range jobTypes {
scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType))
Expand All @@ -130,7 +130,7 @@ func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) er
func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
numKeys := len(jobTypes) * requeueKeysPerJob
redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob)
var scriptArgs = make([]interface{}, 0, numKeys+1)
var scriptArgs = make([]any, 0, numKeys+1)

for _, jobType := range jobTypes {
// pops from in progress, push into job queue and decrement the queue lock
Expand Down
24 changes: 12 additions & 12 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewEnqueuerWithOptions(namespace string, pool *redis.Pool, opt EnqueuerOpti

// Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args ar needed.
// Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})
func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, error) {
func (e *Enqueuer) Enqueue(jobName string, args map[string]any) (*Job, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
Expand Down Expand Up @@ -81,11 +81,11 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e
}

// EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]any) (*ScheduledJob, error) {
return e.EnqueueAt(jobName, epochAfterSeconds(secondsFromNow), args)
}

func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]any) (*ScheduledJob, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
Expand Down Expand Up @@ -123,12 +123,12 @@ func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUnique returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]interface{}) (*Job, error) {
func (e *Enqueuer) EnqueueUnique(jobName string, args map[string]any) (*Job, error) {
return e.EnqueueUniqueByKey(jobName, args, nil)
}

// EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args map[string]any) (*ScheduledJob, error) {
return e.EnqueueUniqueInByKey(jobName, secondsFromNow, args, nil)
}

Expand All @@ -138,7 +138,7 @@ func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, args ma
// Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once.
// In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs.
// EnqueueUniqueByKey returns the job if it was enqueued and nil if it wasn't
func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (*Job, error) {
func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]any, keyMap map[string]any) (*Job, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
Expand All @@ -154,17 +154,17 @@ func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{

// EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
// Subsequent calls with same key will update arguments
func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]any, keyMap map[string]any) (*ScheduledJob, error) {
return e.EnqueueUniqueAtByKey(jobName, epochAfterSeconds(secondsFromNow), args, keyMap)
}

// EnqueueUniqueAt enqueues a unique job at the specified absolute epoch time in seconds. See EnqueueUnique for semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueUniqueAt(jobName string, epochSeconds int64, args map[string]any) (*ScheduledJob, error) {
return e.EnqueueUniqueAtByKey(jobName, epochSeconds, args, nil)
}

// EnqueueUniqueAtByKey enqueues a job unique on specified key at the specified absolute epoch time in seconds, updating arguments. See EnqueueUnique for semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) {
func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args map[string]any, keyMap map[string]any) (*ScheduledJob, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
Expand Down Expand Up @@ -210,7 +210,7 @@ func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error {

type enqueueFnType func(*int64) (string, error)

func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{}, keyMap map[string]interface{}) (enqueueFnType, *Job, error) {
func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]any, keyMap map[string]any) (enqueueFnType, *Job, error) {
useDefaultKeys := false
if keyMap == nil {
useDefaultKeys = true
Expand Down Expand Up @@ -244,7 +244,7 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{},
return "", err
}

scriptArgs := []interface{}{}
scriptArgs := []any{}
script := e.enqueueUniqueScript

scriptArgs = append(scriptArgs, e.queuePrefix+jobName) // KEY[1]
Expand Down Expand Up @@ -286,7 +286,7 @@ func (e *Enqueuer) uniqueJobHelper(jobName string, args map[string]interface{},
return enqueueFn, job, nil
}

func (e *Enqueuer) redisDoHelper(c redis.Conn, cmdName string, args ...interface{}) (reply interface{}, err error) {
func (e *Enqueuer) redisDoHelper(c redis.Conn, cmdName string, args ...any) (reply any, err error) {
if err = c.Send(cmdName, args...); err != nil {
return
}
Expand Down
26 changes: 13 additions & 13 deletions enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

"github.com/rafaeljusto/redigomock"
"github.com/rafaeljusto/redigomock/v3"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -61,7 +61,7 @@ func TestEnqueue(t *testing.T) {
func TestEnqueue_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobArgs := map[string]any{"arg": "value"}
var cases = []struct {
name string
enqueuerOption EnqueuerOption
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestEnqueueIn(t *testing.T) {
func TestEnqueueIn_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobArgs := map[string]any{"arg": "value"}
secondsFromNow := int64(100)
now := time.Now().Unix()
setNowEpochSecondsMock(now)
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestEnqueueAt(t *testing.T) {
func TestEnqueueAt_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobArgs := map[string]any{"arg": "value"}
now := time.Now().Unix()
runAt := now + 100
setNowEpochSecondsMock(now)
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestEnqueueUnique(t *testing.T) {
func TestEnqueueUnique_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobArgs := map[string]any{"arg": "value"}

ok := "ok"
dup := "ok"
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestEnqueueUniqueIn(t *testing.T) {
func TestEnqueueUniqueIn_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobArgs := map[string]any{"arg": "value"}
secondsFromNow := int64(100)
now := time.Now().Unix()
setNowEpochSecondsMock(now)
Expand Down Expand Up @@ -847,8 +847,8 @@ func TestEnqueueUniqueByKey(t *testing.T) {
func TestEnqueueUniqueByKey_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobKeyMap := map[string]interface{}{"key": "value"}
jobArgs := map[string]any{"arg": "value"}
jobKeyMap := map[string]any{"key": "value"}

ok := "ok"
dup := "ok"
Expand Down Expand Up @@ -982,7 +982,7 @@ func TestEnqueueUniqueAt(t *testing.T) {
func TestEnqueueUniqueAt_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobArgs := map[string]any{"arg": "value"}

runAt := time.Now().Unix() + 100

Expand Down Expand Up @@ -1105,8 +1105,8 @@ func TestEnqueueUniqueInByKey(t *testing.T) {
func TestEnqueueUniqueInByKey_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobKeyMap := map[string]interface{}{"key": "value"}
jobArgs := map[string]any{"arg": "value"}
jobKeyMap := map[string]any{"key": "value"}
secondsFromNow := int64(100)
now := time.Now().Unix()
setNowEpochSecondsMock(now)
Expand Down Expand Up @@ -1240,8 +1240,8 @@ func TestEnqueueUniqueAtByKey(t *testing.T) {
func TestEnqueueUniqueAtByKey_WithMock(t *testing.T) {
ns := "work"
jobName := "test"
jobArgs := map[string]interface{}{"arg": "value"}
jobKeyMap := map[string]interface{}{"key": "value"}
jobArgs := map[string]any{"arg": "value"}
jobKeyMap := map[string]any{"key": "value"}
runAt := time.Now().Unix() + 100

ok := "ok"
Expand Down
Loading
Loading