Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug : panic: send on closed channel #3

Merged
merged 1 commit into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions workpool/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type TaskHandler func() error
// WorkPool serves incoming connections via a pool of workers
type WorkPool struct {
closed int32
isQueTask bool // Mark whether queue retrieval is task.标记是否队列取出任务
errChan chan error // error chan
timeout time.Duration // max timeout
wg sync.WaitGroup
Expand Down
14 changes: 14 additions & 0 deletions workpool/workpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workpool

import (
"context"
"runtime"
"sync/atomic"
"time"

Expand Down Expand Up @@ -55,6 +56,7 @@ func (p *WorkPool) DoWait(task TaskHandler) { // 添加到工作池,并等待
// Wait Waiting for the worker thread to finish executing
func (p *WorkPool) Wait() error { // 等待工作线程执行结束
p.waitingQueue.Wait() // 等待队列结束
p.waitQueTask() // wait que down
close(p.task)
p.wg.Wait() // 等待结束
select {
Expand Down Expand Up @@ -85,14 +87,26 @@ func (p *WorkPool) IsClosed() bool { // 是否已经关闭
func (p *WorkPool) startQueue() {
for {
fn := p.waitingQueue.Pop().(TaskHandler)
p.isQueTask = true
if p.IsClosed() { // closed
p.waitingQueue.Close()
p.isQueTask = false
break
}

if fn != nil {
p.task <- fn
}
p.isQueTask = false
}
}

func (p *WorkPool) waitQueTask() {
for {
runtime.Gosched() // 出让时间片
if !p.isQueTask {
break
}
}
}

Expand Down