-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkerPool.go
More file actions
100 lines (75 loc) · 2.36 KB
/
workerPool.go
File metadata and controls
100 lines (75 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package workerPool
import (
"github.com/emillis/cacheMachine"
"time"
)
//===========[STRUCTS]====================================================================================================
//WorkerPool provides the main public API to worker pools
type WorkerPool[TWork any] struct {
requirements Requirements
//The channel that all workers will get the jobs from
incomingWork chan TWork
//pool of the actual workers
workers cacheMachine.Cache[int, *worker[TWork]]
//This will be passed to each worker to use for work processing
workHandler func(TWork)
}
//------PRIVATE------
//addWorkers add n number of workers to the pool
func (wp *WorkerPool[TWork]) addWorkers(n int, timeout time.Duration) {
for i := 0; i < n; i++ {
w := &worker[TWork]{
workBucket: wp.incomingWork,
workHandler: wp.workHandler,
timeout: timeout,
workerPool: wp,
id: issueNewWorkerId(),
}
wp.workers.Add(w.id, w)
w.spawnGoroutine()
}
}
//This is called for each WorkerPool, and it's responsible for automatic management of worker spawning/removal
func (wp *WorkerPool[TWork]) spawnGoroutine() {
go func() {
for {
if len(wp.incomingWork) <= wp.requirements.MinWorkers {
continue
}
remainingPoolCapacity := wp.requirements.MaxWorkers - wp.workers.Count()
n := wp.requirements.WorkerSpawnMultiplier
if n > remainingPoolCapacity {
n = remainingPoolCapacity
}
wp.addWorkers(n, wp.requirements.Timeout)
time.Sleep(time.Microsecond * 100)
}
}()
}
//------PUBLIC------
//AddWork sends work to workers
func (wp *WorkerPool[TWork]) AddWork(w TWork) {
wp.incomingWork <- w
}
//WorkerCount returns number of active workers in the worker pool
func (wp *WorkerPool[TWork]) WorkerCount() int {
return wp.workers.Count()
}
//===========[FUNCTIONS]================================================================================================
//New creates and returns a new WorkerPool
func New[TWork any](workHandler func(TWork), r *Requirements) *WorkerPool[TWork] {
if r == nil {
r = &defaultRequirements
} else {
makeRequirementsReasonable(r)
}
wp := &WorkerPool[TWork]{
requirements: *r,
incomingWork: make(chan TWork, r.WorkBucketSize),
workers: cacheMachine.New[int, *worker[TWork]](nil),
workHandler: workHandler,
}
wp.addWorkers(wp.requirements.MinWorkers, time.Hour*8760)
wp.spawnGoroutine()
return wp
}