go-worker provides a simple way to manage and execute prioritized tasks concurrently, backed by a TaskManager with a worker pool and a priority queue.
- Stop() removed. Use StopGraceful(ctx) or StopNow().
- Local result streaming uses SubscribeResults(buffer); GetResults() is now a compatibility shim and the legacy local StreamResults() is removed (gRPC StreamResults remains).
- RegisterTasks now returns an error. Task.Execute replaces Fn in examples. NewGRPCServer requires a handler map.
- Rate limiting is deterministic: burst is min(maxWorkers, maxTasks) and ExecuteTask uses the shared limiter.
- gRPC durable tasks use RegisterDurableTasks and the new DurableTask message.
- When a durable backend is configured, use RegisterDurableTask(s) instead of RegisterTask(s). DurableBackend now requires Extend (lease renewal support for custom backends).
- Task prioritization: tasks are scheduled by priority.
- Concurrent execution: tasks run in a worker pool with strict rate limiting.
- Middleware: wrap the TaskManager for logging/metrics, etc.
- Results: fan-out subscriptions via SubscribeResults.
- Cancellation: cancel tasks before or during execution.
- Retries: exponential backoff with capped delays.
- Durability: optional Redis-backed durable task queue (at-least-once, lease-based).
flowchart LR
Client[Client code] -->|register tasks| TaskManager
TaskManager --> Queue[Priority Queue]
Queue -->|dispatch| Worker1[Worker]
Queue -->|dispatch| WorkerN[Worker]
Worker1 --> Results[Result Broadcaster]
WorkerN --> Results
go-worker exposes its functionality over gRPC through the WorkerService.
The service allows clients to register tasks, stream results, cancel running
tasks and query their status.
The server registers handlers keyed by name. Each handler consists of a Make function that constructs the expected payload type, and a Fn function that executes the task logic using the unpacked payload.
Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function. For durable tasks, use RegisterDurableTasks with the DurableTask message (the payload is still an Any).
handlers := map[string]worker.HandlerSpec{
"create_user": {
Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
p := payload.(*workerpb.CreateUserPayload)
_ = p // use the typed payload (e.g. p.Username) to create the user
return &workerpb.CreateUserResponse{UserId: "1234"}, nil
},
},
}
srv := worker.NewGRPCServer(tm, handlers)
For production, configure TLS credentials and interceptors (logging/auth) on the gRPC server; see __examples/grpc for a complete setup.
For a Redis-backed durable gRPC example, see __examples/grpc_durable.
Queue selection for gRPC tasks is done via metadata (worker.MetadataQueueKey, worker.MetadataWeightKey):
- queue: named queue (empty means default)
- weight: integer weight (as string)
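As a sketch, routing a gRPC task to a named queue looks like this (it reuses the client and ctx from the client example further below; the queue name and weight are illustrative):
payload, _ := anypb.New(&workerpb.CreateUserPayload{Username: "newuser"})
_, err := client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
	Tasks: []*workerpb.Task{{
		Name:    "create_user",
		Payload: payload,
		Metadata: map[string]string{
			worker.MetadataQueueKey:  "critical", // named queue (empty means default)
			worker.MetadataWeightKey: "2",        // integer weight, sent as a string
		},
	}},
})
if err != nil {
	log.Fatal(err)
}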
Security defaults to follow in production:
- Use TLS (prefer mTLS) for gRPC; the durable gRPC example uses insecure credentials for local demos only.
- Scrub payloads and auth metadata from logs; log task IDs or correlation IDs instead of PII.
- Implement auth via WithGRPCAuth and redact/validate tokens inside interceptors.
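For the TLS point, a minimal sketch using grpc-go's credentials package (the certificate paths are placeholders; mTLS would additionally configure client CAs and client-auth policy on the tls.Config):
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
	log.Fatal(err)
}
gs := grpc.NewServer(grpc.Creds(creds))
workerpb.RegisterWorkerServiceServer(gs, worker.NewGRPCServer(tm, handlers))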
For compile-time payload checks in handlers, use the typed registry. It removes the need for payload type assertions inside your handler functions.
registry := worker.NewTypedHandlerRegistry()
_ = worker.AddTypedHandler(registry, "create_user", worker.TypedHandlerSpec[*workerpb.CreateUserPayload]{
Make: func() *workerpb.CreateUserPayload { return &workerpb.CreateUserPayload{} },
Fn: func(ctx context.Context, payload *workerpb.CreateUserPayload) (any, error) {
return &workerpb.CreateUserResponse{UserId: "1234"}, nil
},
})
srv := worker.NewGRPCServer(tm, registry.Handlers())
Use RegisterDurableTasks for persisted tasks (payload is still Any). Results stream is shared with non-durable tasks.
payload, _ := anypb.New(&workerpb.SendEmailPayload{
To: "ops@example.com",
Subject: "Hello durable gRPC",
Body: "Persisted task",
})
_, err := client.RegisterDurableTasks(ctx, &workerpb.RegisterDurableTasksRequest{
Tasks: []*workerpb.DurableTask{
{
Name: "send_email",
Payload: payload,
IdempotencyKey: "durable:send_email:ops@example.com",
},
},
})
if err != nil {
log.Fatal(err)
}
You can enforce authentication/authorization at the gRPC boundary with WithGRPCAuth.
Return a gRPC status error to control the response code (e.g., Unauthenticated or PermissionDenied).
auth := func(ctx context.Context, method string, _ any) error {
md, _ := metadata.FromIncomingContext(ctx)
values := md.Get("authorization")
if len(values) == 0 {
return status.Error(codes.Unauthenticated, "missing token")
}
token := strings.TrimSpace(strings.TrimPrefix(values[0], "Bearer "))
if token != "expected-token" {
return status.Error(codes.Unauthenticated, "missing or invalid token")
}
return nil
}
srv := worker.NewGRPCServer(tm, handlers, worker.WithGRPCAuth(auth))
Note on deadlines: when the client uses a stream context with a deadline, exceeding the deadline will terminate the stream but does not cancel the tasks running on the server. To properly handle cancellation, use separate contexts for task execution or cancel tasks explicitly.
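One way to apply that advice on the client side, sketched below: task is assumed to be a prepared *workerpb.Task, the task ID placeholder stands in for the real ID, and the 30-second deadline is illustrative.
// Register the task with a long-lived context so a client-side deadline
// does not end the work on the server.
_, err := client.RegisterTasks(context.Background(), &workerpb.RegisterTasksRequest{
	Tasks: []*workerpb.Task{task},
})
if err != nil {
	log.Fatal(err)
}

// Apply your own deadline separately, and cancel the task explicitly when it fires.
deadlineCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
<-deadlineCtx.Done()
_, _ = client.CancelTask(context.Background(), &workerpb.CancelTaskRequest{Id: "<task-id>"})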
tm := worker.NewTaskManagerWithDefaults(context.Background())
handlers := map[string]worker.HandlerSpec{
"create_user": {
Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
p := payload.(*workerpb.CreateUserPayload)
_ = p // use the typed payload (e.g. p.Username) to create the user
return &workerpb.CreateUserResponse{UserId: "1234"}, nil
},
},
}
srv := worker.NewGRPCServer(tm, handlers)
gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)
// listen and serve ...
client := workerpb.NewWorkerServiceClient(conn)
// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
Username: "newuser",
Email: "newuser@example.com",
})
if err != nil {
log.Fatal(err)
}
_, _ = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
Tasks: []*workerpb.Task{
{
Name: "create_user",
Payload: payload,
CorrelationId: uuid.NewString(),
IdempotencyKey: "create_user:newuser@example.com",
Metadata: map[string]string{"source": "api_example", "role": "admin"},
},
},
})
// cancel by id
_, _ = client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"})
// get task information
res, _ := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
fmt.Println(res.Status)
tm := worker.NewTaskManager(context.Background(), 2, 10, 5, 30*time.Second, time.Second, 3)
task := &worker.Task{
ID: uuid.New(),
Priority: 1,
Ctx: context.Background(),
Execute: func(ctx context.Context, _ ...any) (any, error) { return "hello", nil },
}
if err := tm.RegisterTask(context.Background(), task); err != nil {
log.Fatal(err)
}
results, cancel := tm.SubscribeResults(1)
res := <-results
cancel()
fmt.Println(res.Result)
By default, full subscriber buffers drop new results. You can change the policy:
tm.SetResultsDropPolicy(worker.DropOldest)
GetResults() remains as a compatibility shim and returns a channel with a default buffer.
Prefer SubscribeResults(buffer) so you can control buffering and explicitly unsubscribe.
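A quick migration sketch (assuming the shim takes no arguments, as described above):
// Legacy: shared channel with a default buffer, no explicit unsubscribe.
legacy := tm.GetResults()
_ = legacy

// Preferred: dedicated per-subscriber channel; you choose the buffer and unsubscribe explicitly.
results, cancel := tm.SubscribeResults(16)
defer cancel()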
Create a new TaskManager by calling the NewTaskManager() function with the following parameters:
- ctx is the base context for the task manager (used for shutdown and derived task contexts)
- maxWorkers is the number of workers to start. If <= 0, it defaults to the number of available CPUs
- maxTasks is the maximum number of queued tasks; defaults to 10
- tasksPerSecond is the rate limit of tasks that can be executed per second. If <= 0, rate limiting is disabled (otherwise the limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling)
- timeout is the default timeout for tasks; defaults to 5 minutes
- retryDelay is the default delay between retries; defaults to 1 second
- maxRetries is the default maximum number of retries; defaults to 3 (0 disables retries)
tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 30*time.Second, 1*time.Second, 3)
Durable tasks use a separate DurableTask type and a handler registry keyed by name.
The default encoding is protobuf via ProtoDurableCodec. When a durable backend is enabled,
RegisterTask/RegisterTasks are disabled in favor of RegisterDurableTask(s).
See __examples/durable_redis for a runnable example.
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
backend, err := worker.NewRedisDurableBackend(client)
if err != nil {
log.Fatal(err)
}
handlers := map[string]worker.DurableHandlerSpec{
"send_email": {
Make: func() proto.Message { return &workerpb.SendEmailRequest{} },
Fn: func(ctx context.Context, payload proto.Message) (any, error) {
req := payload.(*workerpb.SendEmailRequest)
_ = req // process the request using its typed fields (e.g. req.To)
return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
},
},
}
tm := worker.NewTaskManagerWithOptions(
context.Background(),
worker.WithDurableBackend(backend),
worker.WithDurableHandlers(handlers),
)
err = tm.RegisterDurableTask(context.Background(), worker.DurableTask{
Handler: "send_email",
Message: &workerpb.SendEmailRequest{To: "ops@example.com"},
Retries: 5,
Queue: "email",
Weight: 2,
})
if err != nil {
log.Fatal(err)
}
Or use the typed durable registry for compile-time checks:
durableRegistry := worker.NewTypedDurableRegistry()
_ = worker.AddTypedDurableHandler(durableRegistry, "send_email", worker.TypedDurableHandlerSpec[*workerpb.SendEmailRequest]{
Make: func() *workerpb.SendEmailRequest { return &workerpb.SendEmailRequest{} },
Fn: func(ctx context.Context, payload *workerpb.SendEmailRequest) (any, error) {
// process request
return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
},
})
tm := worker.NewTaskManagerWithOptions(
context.Background(),
worker.WithDurableBackend(backend),
worker.WithDurableHandlers(durableRegistry.Handlers()),
)
Defaults: lease is 30s, poll interval is 200ms, Redis dequeue batch is 50, and lease renewal is disabled (configurable via options).
Queue weights for durable tasks can be configured with WithRedisDurableQueueWeights, and the default queue via WithRedisDurableDefaultQueue.
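A sketch of overriding those defaults. The option names appear above; their exact signatures and placement (lease/poll/renewal options on NewTaskManagerWithOptions, queue options on NewRedisDurableBackend) are assumptions to verify against the package docs:
backend, err := worker.NewRedisDurableBackend(
	client,
	worker.WithRedisDurableDefaultQueue("email"),
	worker.WithRedisDurableQueueWeights(map[string]int{"email": 3, "default": 1}),
)
if err != nil {
	log.Fatal(err)
}
tm := worker.NewTaskManagerWithOptions(
	context.Background(),
	worker.WithDurableBackend(backend),
	worker.WithDurableHandlers(handlers),
	worker.WithDurableLease(2*time.Minute),                 // above worst-case task duration
	worker.WithDurablePollInterval(500*time.Millisecond),   // responsiveness vs. Redis load
	worker.WithDurableLeaseRenewalInterval(30*time.Second), // renew leases while tasks run
)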
Operational notes (durable Redis):
- Key hashing: Redis Lua scripts touch multiple keys; for clustered Redis, all keys must share the same hash slot. The backend auto-wraps the prefix in {} to enforce this (e.g., {go-worker}:ready).
- DLQ: Failed tasks are pushed to a dead-letter list ({prefix}:dead).
- DLQ replay: Use the workerctl durable dlq replay command or the __examples/durable_dlq_replay utility (dry-run by default; use --apply/-apply to replay).
- Multi-node workers: Multiple workers can safely dequeue from the same backend. Lease timeouts handle worker crashes, but tune WithDurableLease for your workload.
- Lease renewal: enable WithDurableLeaseRenewalInterval for long-running tasks to extend leases while a task executes.
- Visibility: Ready/processing queues live in per-queue sorted sets: {prefix}:ready:<queue> and {prefix}:processing:<queue>. Known queues are tracked in {prefix}:queues.
- Inspect utility: workerctl durable inspect (or __examples/durable_queue_inspect) prints ready/processing/dead counts; use --show-ids --queue=<name> (or -show-ids -queue=<name>) to display IDs.
Build the CLI:
go build -o workerctl ./cmd/workerctl
Inspect queues:
./workerctl durable inspect --redis-addr localhost:6380 --redis-password supersecret --redis-prefix go-worker --queue default --show-ids --peek 10
List queues:
./workerctl durable queues --with-counts
Requeue specific tasks by ID:
./workerctl durable retry --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --apply
Requeue tasks from a source set (DLQ/ready/processing):
./workerctl durable retry --source dlq --limit 100 --apply
./workerctl durable retry --source ready --from-queue default --limit 50 --apply
Requeue a queue directly (shortcut):
./workerctl durable requeue --queue default --limit 50 --apply
Fetch a task by ID:
./workerctl durable get --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12
Delete a task (and optionally its hash):
./workerctl durable delete --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --apply
./workerctl durable delete --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --delete-hash --apply
Show stats in JSON:
./workerctl durable stats --json
./workerctl durable stats --watch 2s
Pause/resume durable dequeue:
./workerctl durable pause --apply
./workerctl durable resume --apply
./workerctl durable paused
Purge queues (use with care):
./workerctl durable purge --ready --processing --queue default --apply
Dump task metadata (JSON lines, no payloads):
./workerctl durable dump --queue default --ready --limit 100 > dump.jsonl
Export/import queue snapshots (JSONL):
./workerctl durable snapshot export --out snapshot.jsonl --ready --processing --dlq
./workerctl durable snapshot import --in snapshot.jsonl --apply
Replay DLQ items (dry-run by default):
./workerctl durable dlq replay --batch 100 --apply
Use --tls (and --tls-insecure if needed) for secure Redis connections.
Generate shell completion:
./workerctl completion zsh > "${fpath[1]}/_workerctl"
Durable processing is at-least-once. When multiple nodes consume from the same Redis backend:
- Lease sizing: set WithDurableLease longer than your worst-case task duration (plus buffer). If a task exceeds its lease, it can be requeued and run again on another node.
- Lease renewal (optional): set WithDurableLeaseRenewalInterval (less than the lease duration) to extend leases while tasks run.
- Idempotency: enforce idempotency at the task level (idempotency key + handler-side dedupe) because duplicates are possible on retries and lease expiry; a dedupe sketch follows this list.
- Throughput control: worker count and polling interval are per node. If you need a global rate limit across nodes, enforce it externally or in the handler.
- Clock skew: Redis uses server time for scores; keep node clocks in sync to avoid uneven dequeue/lease timing.
- Isolation: use distinct prefixes per environment/region/tenant to avoid cross-talk.
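For the idempotency point, a minimal in-process dedupe sketch (illustrative only: a sync.Map protects a single process, so multi-node deployments would back the same check with a shared store such as Redis; the key derivation is an assumption):
var seen sync.Map // idempotency key -> struct{}

handlers := map[string]worker.DurableHandlerSpec{
	"send_email": {
		Make: func() proto.Message { return &workerpb.SendEmailRequest{} },
		Fn: func(ctx context.Context, payload proto.Message) (any, error) {
			req := payload.(*workerpb.SendEmailRequest)
			key := "send_email:" + req.To // derive a stable idempotency key from the payload
			if _, dup := seen.LoadOrStore(key, struct{}{}); dup {
				return nil, nil // duplicate delivery: skip the side effect
			}
			// perform the side effect exactly once per key (per process)
			return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
		},
	},
}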
Checklist:
- Set WithDurableLease above p99 task duration (plus buffer).
- Enable WithDurableLeaseRenewalInterval for tasks that can exceed the lease duration.
- Keep task handlers idempotent; always use idempotency keys for external side effects.
- Tune WithDurablePollInterval based on desired responsiveness vs. Redis load.
- Scale WithMaxWorkers per node based on CPU and downstream throughput.
Example:
go run __examples/durable_queue_inspect/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker
go run __examples/durable_queue_inspect/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -queue=default -show-ids -peek=5
Sample output:
queue=default ready=3 processing=1
ready=3 processing=1 dead=0
ready IDs: 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12, 9b18d5f2-3b7f-4d7a-9dd1-1bb1a3a56c55
DLQ replay example (dry-run by default):
go run __examples/durable_dlq_replay/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -batch=100
go run __examples/durable_dlq_replay/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -batch=100 -apply
Optional retention can be configured to prevent unbounded task registry growth:
tm.SetRetentionPolicy(worker.RetentionPolicy{
TTL: 24 * time.Hour,
MaxEntries: 100000,
})
Retention applies only to terminal tasks (completed/failed/cancelled/etc.). Running or queued tasks are never evicted.
Cleanup is best-effort: it runs on task completion and periodically when TTL > 0.
If CleanupInterval is unset, the default interval is clamp(TTL/2, 1s, 1m).
If MaxEntries is lower than the number of active tasks, the registry may exceed the limit until tasks finish.
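For example, to opt out of the default interval (a sketch assuming CleanupInterval is a field of RetentionPolicy, as the note above implies):
tm.SetRetentionPolicy(worker.RetentionPolicy{
	TTL:             24 * time.Hour,
	MaxEntries:      100000,
	CleanupInterval: 5 * time.Minute, // overrides the clamp(TTL/2, 1s, 1m) default
})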
Task lifecycle hooks can be configured for structured logging or tracing:
tm.SetHooks(worker.TaskHooks{
OnQueued: func(task *worker.Task) {
// log enqueue
},
OnStart: func(task *worker.Task) {
// log start
},
OnFinish: func(task *worker.Task, status worker.TaskStatus, _ any, err error) {
// log completion
_ = err
_ = status
},
})
Tracing hooks can be configured with a tracer implementation:
tm.SetTracer(myTracer)
See __examples/tracing for a minimal logger-based tracer.
See __examples/otel_tracing for OpenTelemetry tracing with a stdout exporter.
To export metrics with OpenTelemetry, configure a meter provider and pass it to the task manager:
exporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint())
if err != nil {
log.Fatal(err)
}
reader := sdkmetric.NewPeriodicReader(exporter)
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
defer func() {
_ = mp.Shutdown(context.Background())
}()
if err := tm.SetMeterProvider(mp); err != nil {
log.Fatal(err)
}
See __examples/otel_metrics for a complete runnable example.
See __examples/otel_metrics_otlp for an OTLP/HTTP exporter example.
Emitted metrics:
- tasks_scheduled_total
- tasks_running
- tasks_completed_total
- tasks_failed_total
- tasks_cancelled_total
- tasks_retried_total
- results_dropped_total
- queue_depth
- task_latency_seconds
Register new tasks by calling the RegisterTasks() method of the TaskManager struct and passing in a variadic number of tasks.
id := uuid.New()
task := &worker.Task{
ID: id,
Name: "Some task",
Description: "Here goes the description of the task",
Priority: 10,
Queue: "critical",
Weight: 2,
Ctx: context.Background(),
Execute: func(ctx context.Context, _ ...any) (any, error) {
time.Sleep(time.Second)
return fmt.Sprintf("task %s executed", id), nil
},
Retries: 3,
RetryDelay: 2 * time.Second,
}
task2 := &worker.Task{
ID: uuid.New(),
Priority: 10,
Queue: "default",
Weight: 1,
Ctx: context.Background(),
Execute: func(ctx context.Context, _ ...any) (any, error) { return "Hello, World!", nil },
}
if err := tm.RegisterTasks(context.Background(), task, task2); err != nil {
log.Fatal(err)
}
Queues and weights:
- Queue groups tasks for scheduling. Empty means default.
- Weight is a per-task scheduling hint within a queue (higher weight runs earlier among equal priorities).
- Queue weights control inter-queue share via WithQueueWeights; change the default queue via WithDefaultQueue.
For gRPC, set metadata["queue"] and metadata["weight"] (string) on Task/DurableTask.
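A sketch of configuring inter-queue share locally, assuming these options are accepted by NewTaskManagerWithOptions and that WithQueueWeights takes a queue-to-weight map (verify against the package docs):
tm := worker.NewTaskManagerWithOptions(
	context.Background(),
	worker.WithQueueWeights(map[string]int{
		"critical": 3, // gets roughly 3x the share of "default"
		"default":  1,
	}),
	worker.WithDefaultQueue("default"), // queue used when Task.Queue is empty
)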
Schedule tasks for later execution with RunAt, RegisterTaskAt, or RegisterTaskAfter.
task, _ := worker.NewTask(context.Background(), func(ctx context.Context, _ ...any) (any, error) {
return "delayed", nil
})
_ = tm.RegisterTaskAt(context.Background(), task, time.Now().Add(30*time.Second))
// or
_ = tm.RegisterTaskAfter(context.Background(), task, 30*time.Second)
Durable tasks can also be delayed by setting RunAt before RegisterDurableTask.
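For example (a sketch assuming RunAt is a time.Time field on DurableTask, per the note above):
err = tm.RegisterDurableTask(context.Background(), worker.DurableTask{
	Handler: "send_email",
	Message: &workerpb.SendEmailRequest{To: "ops@example.com"},
	RunAt:   time.Now().Add(10 * time.Minute), // run no earlier than 10 minutes from now
})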
Use StopGraceful to stop accepting new tasks and wait for completion, or StopNow to cancel tasks immediately.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = tm.StopGraceful(ctx)
// or
// tm.StopNow()
Subscribe to results with a dedicated channel per subscriber.
results, cancel := tm.SubscribeResults(10)
ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()
_ = tm.Wait(ctx)
cancel()
for res := range results {
fmt.Println(res)
}
You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.
_ = tm.CancelTask(task.ID)
You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.
tm.CancelAll()
You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.
srv := worker.RegisterMiddleware[worker.Service](tm,
func(next worker.Service) worker.Service {
return middleware.NewLoggerMiddleware(next, logger)
},
)
package main
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
worker "github.com/hyp3rd/go-worker"
"github.com/hyp3rd/go-worker/middleware"
)
func main() {
tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 30*time.Second, 3*time.Second, 3)
var srv worker.Service = worker.RegisterMiddleware[worker.Service](tm,
func(next worker.Service) worker.Service {
return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
},
)
task := &worker.Task{
ID: uuid.New(),
Priority: 1,
Ctx: context.Background(),
Execute: func(ctx context.Context, _ ...any) (any, error) {
return 2 + 5, nil
},
}
_ = srv.RegisterTasks(context.Background(), task)
results, cancel := srv.SubscribeResults(10)
defer cancel()
ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()
_ = srv.Wait(ctx)
for res := range results {
fmt.Println(res)
}
}
This project follows Semantic Versioning.
We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.
To propose new ideas, open an issue using the Feature request template.
Issues labeled good first issue or help wanted are ideal starting points for new contributors.
See CHANGELOG for the history of released versions.
This project is licensed under the MIT License - see the LICENSE file for details.