Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
run: go mod download

- name: "Run tests"
run: go test -v ./...
run: go test -count 10 -v ./...

- name: "Build"
run: go build -o ./bin/krun .
Expand Down
82 changes: 48 additions & 34 deletions krun.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package krun

import (
"context"
"errors"
"sync"
"time"
)

// ErrPoolClosed it is closed (hahah)
var ErrPoolClosed = errors.New("pool's closed")

type Result struct {
Data interface{}
Error error
Expand All @@ -16,13 +20,17 @@ type Krun interface {
Run(ctx context.Context, f Job) <-chan *Result
Wait(ctx context.Context)
Size() int
Close() error
}

type krun struct {
n int
waitSleep time.Duration
workers chan *worker
mu sync.RWMutex
poolSize int
closed bool

workers chan *worker
mu sync.RWMutex

wg sync.WaitGroup
}
type worker struct {
job Job
Expand All @@ -36,9 +44,12 @@ type Config struct {

func New(cfg *Config) Krun {
k := &krun{
n: cfg.Size,
workers: make(chan *worker, cfg.Size),
waitSleep: cfg.WaitSleep,
poolSize: cfg.Size,
closed: false,

workers: make(chan *worker, cfg.Size),
wg: sync.WaitGroup{},
mu: sync.RWMutex{},
}

for i := 0; i < cfg.Size; i++ {
Expand All @@ -50,17 +61,18 @@ func New(cfg *Config) Krun {

func (k *krun) Size() int {
k.mu.RLock()
s := k.n
s := k.poolSize
k.mu.RUnlock()
return s
}

func (k *krun) Run(ctx context.Context, f Job) <-chan *Result {
// get worker from the channel
w := k.pop()
k.wg.Add(1)

// assign Job to the worker and Run it
cr := make(chan *Result)
cr := make(chan *Result, 1)
w.job = f
w.result = cr
go k.work(ctx, w)
Expand All @@ -70,39 +82,49 @@ func (k *krun) Run(ctx context.Context, f Job) <-chan *Result {
}

func (k *krun) Wait(ctx context.Context) {
k.mu.RLock()
n := k.n
k.mu.RUnlock()
done := make(chan struct{})

go func() {
k.wg.Wait()
close(done)
}()

if k.len() == n {
select {
case <-ctx.Done():
return
case <-done:
return
}
}

for {
select {
case <-ctx.Done():
return
case <-time.After(k.waitSleep):
// "wait" until all workers are back
if k.len() < n {
continue
}

return
}
func (k *krun) Close() error {
k.mu.Lock()
if k.closed {
k.mu.Unlock()
return ErrPoolClosed
}
k.closed = true
k.mu.Unlock()

// Wait for all work to complete
k.wg.Wait()

// Close worker channel
close(k.workers)

return nil
}

func (k *krun) work(ctx context.Context, w *worker) {
// run the job
d, err := w.job(ctx)

// send Result into the caller channel
// this will block until is read
w.result <- &Result{d, err}

// return worker to Krun
k.push(w)
k.wg.Done()
}
func (k *krun) push(w *worker) {
k.workers <- w
Expand All @@ -111,11 +133,3 @@ func (k *krun) push(w *worker) {
func (k *krun) pop() *worker {
return <-k.workers
}

func (k *krun) len() int {
k.mu.RLock()
l := len(k.workers)
k.mu.RUnlock()

return l
}
Loading
Loading