From d24c73119f04aac7eee0b3d7b547935dd9002869 Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Thu, 28 Apr 2016 12:43:06 -0700 Subject: [PATCH 1/3] added functions to get number of pending async jobs --- tunny.go | 24 ++++++++++++++++++++---- tunny_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/tunny.go b/tunny.go index f43f7bc..394c501 100644 --- a/tunny.go +++ b/tunny.go @@ -102,10 +102,11 @@ You may open and close a pool as many times as you wish, calling close is a bloc guarantees all goroutines are stopped. */ type WorkPool struct { - workers []*workerWrapper - selects []reflect.SelectCase - statusMutex sync.RWMutex - running uint32 + workers []*workerWrapper + selects []reflect.SelectCase + statusMutex sync.RWMutex + running uint32 + pendingAsyncJobs int32 } func (pool *WorkPool) isRunning() bool { @@ -284,7 +285,9 @@ func (pool *WorkPool) SendWorkTimedAsync( jobData interface{}, after func(interface{}, error), ) { + atomic.AddInt32(&pool.pendingAsyncJobs, 1) go func() { + defer atomic.AddInt32(&pool.pendingAsyncJobs, -1) result, err := pool.SendWorkTimed(milliTimeout, jobData) if after != nil { after(result, err) @@ -320,10 +323,23 @@ result to a receiving closure. You may set the closure to nil if no further acti are required. */ func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) { + atomic.AddInt32(&pool.pendingAsyncJobs, 1) go func() { + defer atomic.AddInt32(&pool.pendingAsyncJobs, -1) result, err := pool.SendWork(jobData) if after != nil { after(result, err) } }() } + +/* +NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker +*/ +func (pool *WorkPool) NumPendingAsyncJobs() int32 { + return atomic.LoadInt32(&pool.pendingAsyncJobs) +} + +func (pool *WorkPool) NumWorkers() int { + return len(pool.workers) +} diff --git a/tunny_test.go b/tunny_test.go index 026628b..8406376 100644 --- a/tunny_test.go +++ b/tunny_test.go @@ -25,6 +25,7 @@ package tunny import ( "sync" "testing" + "time" ) /*-------------------------------------------------------------------------------------------------- @@ -233,5 +234,53 @@ func TestDummyExtIntWorker(t *testing.T) { } } +func TestNumWorkers(t *testing.T) { + numWorkers := 10 + pool, err := CreatePoolGeneric(numWorkers).Open() + if err != nil { + t.Errorf("Failed to create pool: %v", err) + return + } + defer pool.Close() + actual := pool.NumWorkers() + if actual != numWorkers { + t.Errorf("Expected to get %d workers, but got %d", numWorkers, actual) + } +} + +var waitHalfSecond = func() { + time.Sleep(500 * time.Millisecond) +} + +func TestNumPendingReportsAllWorkersWithNoWork(t *testing.T) { + numWorkers := 10 + pool, err := CreatePoolGeneric(numWorkers).Open() + if err != nil { + t.Errorf("Failed to create pool: %v", err) + return + } + defer pool.Close() + actual := pool.NumPendingAsyncJobs() + if actual != 0 { + t.Errorf("Expected to get 0 pending jobs when pool is quiet, but got %d", numWorkers, actual) + } +} + +func TestNumPendingReportsNotAllWorkersWhenSomeBusy(t *testing.T) { + numWorkers := 10 + pool, err := CreatePoolGeneric(numWorkers).Open() + if err != nil { + t.Errorf("Failed to create pool: %v", err) + return + } + defer pool.Close() + pool.SendWorkAsync(waitHalfSecond, nil) + actual := pool.NumPendingAsyncJobs() + expected := int32(1) + if actual != expected { + t.Errorf("Expected to get %d pending jobs when pool has work, but got %d", expected, actual) + } +} + /*-------------------------------------------------------------------------------------------------- */ From 61f22cd3ff59aa351650c51f179080f1178354e5 Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Thu, 28 Apr 2016 13:29:33 -0700 Subject: [PATCH 2/3] publish metrics to expvar --- tunny.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tunny.go b/tunny.go index 394c501..70cc032 100644 --- a/tunny.go +++ b/tunny.go @@ -25,7 +25,9 @@ package tunny import ( "errors" + "expvar" "reflect" + "strconv" "sync" "sync/atomic" "time" @@ -340,6 +342,30 @@ func (pool *WorkPool) NumPendingAsyncJobs() int32 { return atomic.LoadInt32(&pool.pendingAsyncJobs) } +/* +NumWorkers - Number of workers in the pool +*/ func (pool *WorkPool) NumWorkers() int { return len(pool.workers) } + +type liveVarAccessor func() string + +func (a liveVarAccessor) String() string { + return a() +} + +/* +Publishes the NumWorkers and NumPendingAsyncJobs to expvars +*/ +func (pool *WorkPool) PublishExpvarMetrics(poolName string) { + ret := expvar.NewMap(poolName) + asyncJobsFn := func() string { + return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10) + } + numWorkersFn := func() string { + return strconv.FormatInt(int64(pool.NumWorkers()), 10) + } + ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn)) + ret.Set("numWorkers", liveVarAccessor(numWorkersFn)) +} From fbd2b8fdd077b846e1078073f791ca540febbd1b Mon Sep 17 00:00:00 2001 From: jeffail Date: Tue, 14 Jun 2016 13:37:04 +0100 Subject: [PATCH 3/3] Minor lint and vet fixes --- tunny.go | 2 +- tunny_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tunny.go b/tunny.go index 70cc032..9f00599 100644 --- a/tunny.go +++ b/tunny.go @@ -356,7 +356,7 @@ func (a liveVarAccessor) String() string { } /* -Publishes the NumWorkers and NumPendingAsyncJobs to expvars +PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars */ func (pool *WorkPool) PublishExpvarMetrics(poolName string) { ret := expvar.NewMap(poolName) diff --git a/tunny_test.go b/tunny_test.go index 8406376..f241cbe 100644 --- a/tunny_test.go +++ b/tunny_test.go @@ -262,7 +262,7 @@ func TestNumPendingReportsAllWorkersWithNoWork(t *testing.T) { defer pool.Close() actual := pool.NumPendingAsyncJobs() if actual != 0 { - t.Errorf("Expected to get 0 pending jobs when pool is quiet, but got %d", numWorkers, actual) + t.Errorf("Expected to get 0 pending jobs when pool is quiet, but got %d", actual) } }