Skip to content

Commit

Permalink
Added TunnyInterupt call for custom workers
Browse files Browse the repository at this point in the history
TunnyInterupt is an optional func that workers may implement which is
called when a job has timed out. It indicates that there is no longer a
client on the other end of this job, and it can be discarded (return
nil).

The call is made on an outside goroutine and you need to be careful how
this is implemented.
  • Loading branch information
Ashley Jeffs authored and Jeffail committed Mar 23, 2015
1 parent dc72deb commit ee32733
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 36 deletions.
52 changes: 31 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,41 +161,28 @@ func (worker *customWorker) Job(data interface{}) interface{} {
func TestCustomWorkers (t *testing.T) {
outChan := make(chan int, 10)

wg := new(sync.WaitGroup)
wg.Add(10)

workers := make([]tunny.TunnyWorker, 4)
for i, _ := range workers {
workers[i] = &(customWorker{})
}

pool, errPool := tunny.CreateCustomPool(workers).Open()

if errPool != nil {
t.Errorf("Error starting pool: ", errPool)
return
}
pool, _ := tunny.CreateCustomPool(workers).Open()

defer pool.Close()

for i := 0; i < 10; i++ {
go func() {
if value, err := pool.SendWork("hello world"); err != nil {

t.Errorf("Error returned: ", err)

} else {

str, _ := value.(string)
if "custom job done: hello world" != str {
t.Errorf("Unexpected output from custom worker")
}
value, _ := pool.SendWork("hello world")
fmt.Println(value.(string))

}
outChan <- 1
wg.Done()
}()
}

for i := 0; i < 10; i++ {
<-outChan
}
wg.Wait()
}

...
Expand Down Expand Up @@ -231,6 +218,29 @@ func (worker *customWorker) Terminate() {
...
```

##Can a worker detect when a timeout occurs during a job?

Yes, you can optionally implement the following method on your worker:

```go
...

func (worker *interuptableWorker) TunnyInterupt() {

/* This is called from a separate goroutine, so only use thread safe
* methods to communicate to your worker.
* Something like this can be used to indicate midway through a job
* that it should be abandoned, in your Job call you can simply
* return nil.
*/
worker.stopChan<-1

}

...
```

##Can SendWork be called asynchronously?

There are the helper functions SendWorkAsync and SendWorkTimedAsync, that are the same as their respective sync calls with an optional second argument func(interface{}, error), this is the call made when a result is returned and can be nil if there is no need for the closure.
Expand Down
21 changes: 12 additions & 9 deletions tunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ type TunnyWorker interface {
}

type TunnyExtendedWorker interface {
Job(interface{}) (interface{})
Ready() bool
Initialize()
Terminate()
}

type TunnyInteruptable interface {
TunnyInterupt()
}

// Default implementation of worker

type tunnyDefaultWorker struct {
Expand Down Expand Up @@ -89,11 +91,11 @@ func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interfac
// Wait for workers, or time out
if chosen, _, ok := reflect.Select(selectCases); ok {
if chosen < ( len(selectCases) - 1 ) {
(*pool.workers[chosen]).jobChan <- jobData
pool.workers[chosen].jobChan <- jobData

// Wait for response, or time out
select {
case data, open := <-(*pool.workers[chosen]).outputChan:
case data, open := <-pool.workers[chosen].outputChan:
if !open {
return nil, errors.New("Worker was closed before reaching a result")
}
Expand All @@ -104,7 +106,8 @@ func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interfac
* waiting process into a new goroutine.
*/
go func() {
<-(*pool.workers[chosen]).outputChan
pool.workers[chosen].Interupt()
<-pool.workers[chosen].outputChan
}()
return nil, errors.New("Request timed out whilst waiting for job to complete")
}
Expand Down Expand Up @@ -151,8 +154,8 @@ func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
if pool.running {

if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
(*pool.workers[chosen]).jobChan <- jobData
result, open := <-(*pool.workers[chosen]).outputChan
pool.workers[chosen].jobChan <- jobData
result, open := <-pool.workers[chosen].outputChan

if !open {
return nil, errors.New("Worker was closed before reaching a result")
Expand Down Expand Up @@ -193,11 +196,11 @@ func (pool *WorkPool) Open() (*WorkPool, error) {
pool.selects = make( []reflect.SelectCase, len(pool.workers) )

for i, workerWrapper := range pool.workers {
(*workerWrapper).Open()
workerWrapper.Open()

pool.selects[i] = reflect.SelectCase {
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf((*workerWrapper).readyChan),
Chan: reflect.ValueOf(workerWrapper.readyChan),
}
}

Expand Down
59 changes: 53 additions & 6 deletions tunny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestTimeout(t *testing.T) {
routines := runtime.NumGoroutine()

pool, errPool := CreatePool(1, func(object interface{}) interface{} {
time.Sleep(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
return nil
}).Open()

Expand All @@ -54,11 +54,11 @@ func TestTimeout(t *testing.T) {
before := time.Now()

go func() {
if _, err := pool.SendWorkTimed(200, nil); err == nil {
if _, err := pool.SendWorkTimed(20, nil); err == nil {
t.Errorf("No timeout triggered thread one")
} else {
taken := ( time.Since(before) / time.Millisecond )
if taken > 210 {
if taken > 21 {
t.Errorf("Time taken at thread one: ", taken, ", with error: ", err)
}
}
Expand All @@ -74,11 +74,11 @@ func TestTimeout(t *testing.T) {
}()

go func() {
if _, err := pool.SendWorkTimed(200, nil); err == nil {
if _, err := pool.SendWorkTimed(20, nil); err == nil {
t.Errorf("No timeout triggered thread two")
} else {
taken := ( time.Since(before) / time.Millisecond )
if taken > 210 {
if taken > 21 {
t.Errorf("Time taken at thread two: ", taken, ", with error: ", err)
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func validateReturnInt(t *testing.T, expecting int, object interface{}) {
}

func TestBasic(t *testing.T) {
sizePool, repeats, sleepFor, margin := 16, 2, 250, 100
sizePool, repeats, sleepFor, margin := 16, 2, 50, 5
outChan := make(chan int, sizePool)
routines := runtime.NumGoroutine()

Expand Down Expand Up @@ -525,6 +525,53 @@ func TestAsyncCalls(t *testing.T) {
}
}

type interuptableWorker struct {
stopChan chan int
}

func (worker *interuptableWorker) Job(data interface{}) interface{} {
select {
case <-time.After(100 * time.Millisecond):
return 25
case <- worker.stopChan:
return 50
}
return 25
}

func (worker *interuptableWorker) Ready() bool {
return true
}

func (worker *interuptableWorker) TunnyInterupt() {
worker.stopChan<-1
}

func Test(t *testing.T) {
routines := runtime.NumGoroutine()

workers := make([]TunnyWorker, 1)
workers[0] = &interuptableWorker{ make(chan int, 1) }

pool, poolErr := CreateCustomPool(workers).Open()

if poolErr != nil {
t.Errorf("Error starting pool: ", poolErr)
return
}

res, err := pool.SendWorkTimed(25, 50)
if err == nil || res == 25 {
t.Errorf("Interupt not activated!")
}

pool.Close()

if routines != runtime.NumGoroutine() {
t.Errorf("Excess goroutines: %v", runtime.NumGoroutine() - routines)
}
}

/*
Test template
Expand Down
8 changes: 8 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (wrapper *workerWrapper) Open() {
// Follow this with Join(), otherwise terminate isn't called on the worker
func (wrapper *workerWrapper) Close() {
close(wrapper.jobChan)

// Breaks the worker out of a Ready() -> false loop
atomic.SwapUint32(&wrapper.poolOpen, uint32(0))
}

Expand All @@ -97,3 +99,9 @@ func (wrapper *workerWrapper) Join() {
extWorker.Terminate()
}
}

func (wrapper *workerWrapper) Interupt() {
if extWorker, ok := wrapper.worker.(TunnyInteruptable); ok {
extWorker.TunnyInterupt()
}
}

0 comments on commit ee32733

Please sign in to comment.