Skip to content

Commit

Permalink
Merge branch 'launchdarkly-pk/pool-metrics'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jun 14, 2016
2 parents 863db27 + fbd2b8f commit 18dedac
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 4 deletions.
50 changes: 46 additions & 4 deletions tunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package tunny

import (
"errors"
"expvar"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -102,10 +104,11 @@ You may open and close a pool as many times as you wish, calling close is a bloc
guarantees all goroutines are stopped.
*/
type WorkPool struct {
workers []*workerWrapper
selects []reflect.SelectCase
statusMutex sync.RWMutex
running uint32
workers []*workerWrapper
selects []reflect.SelectCase
statusMutex sync.RWMutex
running uint32
pendingAsyncJobs int32
}

func (pool *WorkPool) isRunning() bool {
Expand Down Expand Up @@ -284,7 +287,9 @@ func (pool *WorkPool) SendWorkTimedAsync(
jobData interface{},
after func(interface{}, error),
) {
atomic.AddInt32(&pool.pendingAsyncJobs, 1)
go func() {
defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
result, err := pool.SendWorkTimed(milliTimeout, jobData)
if after != nil {
after(result, err)
Expand Down Expand Up @@ -320,10 +325,47 @@ result to a receiving closure. You may set the closure to nil if no further acti
are required.
*/
func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) {
atomic.AddInt32(&pool.pendingAsyncJobs, 1)
go func() {
defer atomic.AddInt32(&pool.pendingAsyncJobs, -1)
result, err := pool.SendWork(jobData)
if after != nil {
after(result, err)
}
}()
}

/*
NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker
*/
func (pool *WorkPool) NumPendingAsyncJobs() int32 {
return atomic.LoadInt32(&pool.pendingAsyncJobs)
}

/*
NumWorkers - Number of workers in the pool
*/
func (pool *WorkPool) NumWorkers() int {
return len(pool.workers)
}

type liveVarAccessor func() string

func (a liveVarAccessor) String() string {
return a()
}

/*
PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars
*/
func (pool *WorkPool) PublishExpvarMetrics(poolName string) {
ret := expvar.NewMap(poolName)
asyncJobsFn := func() string {
return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10)
}
numWorkersFn := func() string {
return strconv.FormatInt(int64(pool.NumWorkers()), 10)
}
ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn))
ret.Set("numWorkers", liveVarAccessor(numWorkersFn))
}
49 changes: 49 additions & 0 deletions tunny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package tunny
import (
"sync"
"testing"
"time"
)

/*--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -233,5 +234,53 @@ func TestDummyExtIntWorker(t *testing.T) {
}
}

func TestNumWorkers(t *testing.T) {
numWorkers := 10
pool, err := CreatePoolGeneric(numWorkers).Open()
if err != nil {
t.Errorf("Failed to create pool: %v", err)
return
}
defer pool.Close()
actual := pool.NumWorkers()
if actual != numWorkers {
t.Errorf("Expected to get %d workers, but got %d", numWorkers, actual)
}
}

var waitHalfSecond = func() {
time.Sleep(500 * time.Millisecond)
}

func TestNumPendingReportsAllWorkersWithNoWork(t *testing.T) {
numWorkers := 10
pool, err := CreatePoolGeneric(numWorkers).Open()
if err != nil {
t.Errorf("Failed to create pool: %v", err)
return
}
defer pool.Close()
actual := pool.NumPendingAsyncJobs()
if actual != 0 {
t.Errorf("Expected to get 0 pending jobs when pool is quiet, but got %d", actual)
}
}

func TestNumPendingReportsNotAllWorkersWhenSomeBusy(t *testing.T) {
numWorkers := 10
pool, err := CreatePoolGeneric(numWorkers).Open()
if err != nil {
t.Errorf("Failed to create pool: %v", err)
return
}
defer pool.Close()
pool.SendWorkAsync(waitHalfSecond, nil)
actual := pool.NumPendingAsyncJobs()
expected := int32(1)
if actual != expected {
t.Errorf("Expected to get %d pending jobs when pool has work, but got %d", expected, actual)
}
}

/*--------------------------------------------------------------------------------------------------
*/

0 comments on commit 18dedac

Please sign in to comment.