Skip to content

Commit

Permalink
readme
Browse files Browse the repository at this point in the history
  • Loading branch information
谢小军 authored and 谢小军 committed Sep 15, 2019
1 parent f44adb4 commit fa12ea9
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 54 deletions.
104 changes: 102 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,102 @@
# gowp
golang worker pool ,线程池 , 工作池
## golang worker pool ,线程池 , 工作池

- 并发限制goroutine池。
- 限制任务执行的并发性,而不是排队的任务数。
- 无论排队多少任务,都不会阻止提交任务。
- 通过队列支持

- golang 工作池公共库

### 支持最大任务数, 放到工作池里面 并等待全部完成
```
wp := workerpool.New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
//time.Sleep(1 * time.Second)
return nil
})
}
wp.Wait()
fmt.Println("down")
```

### 支持错误返回
```
wp := workerpool.New(10) //设置最大线程数
for i := 0; i < 20; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
if ii == 1 {
return errors.Cause(errors.New("my test err")) //有err 立即返回
}
time.Sleep(1 * time.Second)
}
return nil
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
```

### 支持判断是否完成 (非阻塞)

```
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
// ii := i
wp.Do(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})
fmt.Println(wp.IsDone())
}
wp.Wait()
fmt.Println(wp.IsDone())
fmt.Println("down")
```

### 支持同步等待结果

```
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
ii := i
wp.DoWait(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
// if ii == 1 {
// return errors.New("my test err")
// }
time.Sleep(1 * time.Second)
}
return nil
//time.Sleep(1 * time.Second)
//return errors.New("my test err")
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module gowp

go 1.13

require github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c
require github.com/xxjwxc/public v0.0.0-20190915122658-9831b23af2e1
10 changes: 5 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/xxjwxc/public v0.0.0-20190911032541-5d814c6ef57d h1:JGx6eOr2X16pd/h8yZvXTL8qEsu6jPpMNjOOTlvBo1Y=
github.com/xxjwxc/public v0.0.0-20190911032541-5d814c6ef57d/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c h1:FQ+LOgzb3F30rn6zZwwZ3fAJGjaCtR7lymFySbjfazk=
github.com/xxjwxc/public v0.0.0-20190914140823-729bb11f966c/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
github.com/xxjwxc/public v0.0.0-20190915113632-d369d5187200 h1:Hi8KKUGp+xh+x6WA/3KNmvvEvjcHFKZ6NJOduAL92k4=
github.com/xxjwxc/public v0.0.0-20190915113632-d369d5187200/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
github.com/xxjwxc/public v0.0.0-20190915122658-9831b23af2e1 h1:nCgYwrp2ZbumdJvgkyJ3+KXBaLWipRSDUujE0GdNKAU=
github.com/xxjwxc/public v0.0.0-20190915122658-9831b23af2e1/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
github.com/xxjwxc/public v1.0.4 h1:C16i87ODAsabmsun+7MEoK7goPjWNGTQOYg+53PzzCw=
github.com/xxjwxc/public v1.0.4/go.mod h1:rgW5aGDrLzmJM8NtrtskcxerGFyvn1OK3+Ra52YeZS0=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/eapache/queue.v1 v1.1.0 h1:EldqoJEGtXYiVCMRo2C9mePO2UUGnYn2+qLmlQSqPdc=
gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
)

func main() {
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 10; i++ { //开启20个请求
// ii := i
wp := workerpool.New(5) //设置最大线程数
for i := 0; i < 100; i++ { //开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 5; j++ { //每次打印0-10的值
//fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Microsecond)
for j := 0; j < 50; j++ { //每次打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})
Expand Down
13 changes: 8 additions & 5 deletions workerpool/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package workerpool
import (
"sync"
"time"

"github.com/xxjwxc/public/myqueue"
)

// TaskHandler process .定义函数回调体
Expand All @@ -17,9 +19,10 @@ type WorkerPool struct {
//sync.Mutex
//maxWorkersCount int //最大的工作协程数
//start sync.Once
closed int32
errChan chan error //错误chan
timeout time.Duration //最大超时时间
wg sync.WaitGroup
task chan TaskHandler
closed int32
errChan chan error //错误chan
timeout time.Duration //最大超时时间
wg sync.WaitGroup
task chan TaskHandler
waitingQueue *myqueue.MyQueue
}
21 changes: 21 additions & 0 deletions workerpool/err/err_2019-09-15_20.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
=========================2019-09-15 20:47:01 =========================
my test err
gowp/workerpool.TestWorkerPoolError.func1
/Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool_test.go:37
gowp/workerpool.(*WorkerPool).loop.func1
/Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool.go:133
runtime.goexit
/usr/local/go/src/runtime/asm_amd64.s:1357
goroutine 51 [running]:
runtime/debug.Stack(0x0, 0x0, 0x0)
/usr/local/go/src/runtime/debug/stack.go:24 +0xa1
github.com/xxjwxc/public/mylog.SaveError(0xc0001883c0, 0x133, 0x11b92cb, 0x3)
/Users/xxj/work/path/pkg/mod/github.com/xxjwxc/[email protected]/mylog/mylog.go:100 +0x498
github.com/xxjwxc/public/mylog.Error(0x11d7480, 0xc000172020)
/Users/xxj/work/path/pkg/mod/github.com/xxjwxc/[email protected]/mylog/mylog.go:62 +0x3d5
gowp/workerpool.(*WorkerPool).loop.func1(0xc0000da780)
/Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool.go:139 +0x34f
created by gowp/workerpool.(*WorkerPool).loop
/Users/xxj/work/workspace/github/xxjwxc/gowp/workerpool/workerpool.go:108 +0xa1

=========================end=========================
90 changes: 55 additions & 35 deletions workerpool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync/atomic"
"time"

"github.com/xxjwxc/public/myqueue"

"github.com/xxjwxc/public/mylog"
)

Expand All @@ -16,12 +18,12 @@ func New(max int) *WorkerPool {
}

p := &WorkerPool{
task: make(chan TaskHandler, 2*max),
errChan: make(chan error, 1),
task: make(chan TaskHandler, 2*max),
errChan: make(chan error, 1),
waitingQueue: myqueue.New(),
}

go p.loop(max)

return p
}

Expand All @@ -35,7 +37,8 @@ func (p *WorkerPool) Do(fn TaskHandler) {
if p.IsClosed() { // 已关闭
return
}
p.task <- fn
p.waitingQueue.Push(fn)
//p.task <- fn
}

//DoWait 添加到工作池,并等待执行完成之后再返回
Expand All @@ -45,14 +48,60 @@ func (p *WorkerPool) DoWait(task TaskHandler) {
}

doneChan := make(chan struct{})
p.task <- func() error {
p.waitingQueue.Push(TaskHandler(func() error {
defer close(doneChan)
return task()
}
}))
<-doneChan
}

//Wait 等待工作线程执行结束
func (p *WorkerPool) Wait() error {
p.waitingQueue.Wait() //等待队列结束
close(p.task)
p.wg.Wait() //等待结束
select {
case err := <-p.errChan:
return err
default:
return nil
}
}

//IsDone 判断是否完成 (非阻塞)
func (p *WorkerPool) IsDone() bool {
if p == nil || p.task == nil {
return true
}

return len(p.task) == 0
}

//IsClosed 是否已经关闭
func (p *WorkerPool) IsClosed() bool {
if atomic.LoadInt32(&p.closed) == 1 { // 已关闭
return true
}
return false
}

func (p *WorkerPool) startQueue() {
for {
fn := p.waitingQueue.Pop().(TaskHandler)
if p.IsClosed() { // 已关闭
p.waitingQueue.Close()
break
}

if fn != nil {
p.task <- fn
}
}
}

func (p *WorkerPool) loop(maxWorkersCount int) {
go p.startQueue() //启动队列

p.wg.Add(maxWorkersCount) // 最大的工作协程数
// 启动max个worker
for i := 0; i < maxWorkersCount; i++ {
Expand Down Expand Up @@ -96,32 +145,3 @@ func (p *WorkerPool) loop(maxWorkersCount int) {
}()
}
}

//Wait 等待工作线程执行结束
func (p *WorkerPool) Wait() error {
close(p.task)
p.wg.Wait() //等待结束
select {
case err := <-p.errChan:
return err
default:
return nil
}
}

//IsDone 判断是否完成 (非阻塞)
func (p *WorkerPool) IsDone() bool {
if p == nil || p.task == nil {
return true
}

return len(p.task) == 0
}

//IsClosed 是否已经关闭
func (p *WorkerPool) IsClosed() bool {
if atomic.LoadInt32(&p.closed) == 1 { // 已关闭
return true
}
return false
}

0 comments on commit fa12ea9

Please sign in to comment.