Add separate goroutine that closes the output chan

This allows iterating over the output channel without having to start
another Goroutine outside of the worker pool. This also removes the need
for calling Wait().
This commit is contained in:
Alexander Neumann 2016-02-05 22:23:34 +01:00
parent ee422110c8
commit 4cb4a3ac7f
1 changed files with 29 additions and 16 deletions

View File

@ -1,10 +1,5 @@
package worker package worker
import (
"fmt"
"sync"
)
// Job is one unit of work. It is given to a Func, and the returned result and // Job is one unit of work. It is given to a Func, and the returned result and
// error are stored in Result and Error. // error are stored in Result and Error.
type Job struct { type Job struct {
@ -20,9 +15,12 @@ type Func func(job Job, done <-chan struct{}) (result interface{}, err error)
type Pool struct { type Pool struct {
f Func f Func
done chan struct{} done chan struct{}
wg *sync.WaitGroup
jobCh <-chan Job jobCh <-chan Job
resCh chan<- Job resCh chan<- Job
numWorkers int
workersExit chan struct{}
allWorkersDone chan struct{}
} }
// New returns a new worker pool with n goroutines, each running the function // New returns a new worker pool with n goroutines, each running the function
@ -31,22 +29,39 @@ func New(n int, f Func, jobChan <-chan Job, resultChan chan<- Job) *Pool {
p := &Pool{ p := &Pool{
f: f, f: f,
done: make(chan struct{}), done: make(chan struct{}),
wg: &sync.WaitGroup{}, workersExit: make(chan struct{}),
allWorkersDone: make(chan struct{}),
numWorkers: n,
jobCh: jobChan, jobCh: jobChan,
resCh: resultChan, resCh: resultChan,
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
p.wg.Add(1)
go p.runWorker(i) go p.runWorker(i)
} }
go p.waitForExit()
return p return p
} }
// waitForExit receives from p.workersExit until all worker functions have
// exited, then closes the result channel.
func (p *Pool) waitForExit() {
n := p.numWorkers
for n > 0 {
<-p.workersExit
n--
}
close(p.allWorkersDone)
close(p.resCh)
}
// runWorker runs a worker function. // runWorker runs a worker function.
func (p *Pool) runWorker(numWorker int) { func (p *Pool) runWorker(numWorker int) {
defer p.wg.Done() defer func() {
p.workersExit <- struct{}{}
}()
var ( var (
// enable the input channel when starting up a new goroutine // enable the input channel when starting up a new goroutine
@ -65,7 +80,6 @@ func (p *Pool) runWorker(numWorker int) {
case job, ok = <-inCh: case job, ok = <-inCh:
if !ok { if !ok {
fmt.Printf("in channel closed, worker exiting\n")
return return
} }
@ -88,6 +102,5 @@ func (p *Pool) Cancel() {
// Wait waits for all worker goroutines to terminate, afterwards the output // Wait waits for all worker goroutines to terminate, afterwards the output
// channel is closed. // channel is closed.
func (p *Pool) Wait() { func (p *Pool) Wait() {
p.wg.Wait() <-p.allWorkersDone
close(p.resCh)
} }