From 68ca549f56f9f7b6ab07baa92c74f5911b124940 Mon Sep 17 00:00:00 2001 From: Pantelis Sampaziotis Date: Mon, 21 Mar 2022 12:01:15 +0100 Subject: [PATCH] apply jittered duration everywhere --- queue.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/queue.go b/queue.go index abd6c20..e01883c 100644 --- a/queue.go +++ b/queue.go @@ -178,9 +178,8 @@ func (queue *redisQueue) consume() { case queue.errChan <- &ConsumeError{RedisErr: err, Count: errorCount}: default: } - - time.Sleep(jitteredDuration(queue.pollDuration)) } + time.Sleep(jitteredDuration(queue.pollDuration)) } } @@ -199,8 +198,6 @@ func (queue *redisQueue) consumeBatch() error { batchSize := queue.prefetchLimit - unackedCount if batchSize <= 0 { - // already at prefetch limit, wait for consumers to finish - time.Sleep(queue.pollDuration) // sleep before retry return nil } @@ -213,8 +210,6 @@ func (queue *redisQueue) consumeBatch() error { payload, err := queue.redisClient.RPopLPush(queue.readyKey, queue.unackedKey) if err == ErrorNotFound { - // ready list currently empty, wait for new deliveries - time.Sleep(queue.pollDuration) return nil } @@ -540,6 +535,6 @@ func (queue *redisQueue) ensureConsuming() error { // jitteredDuration calculates and returns a value that is +/-10% the input duration func jitteredDuration(duration time.Duration) time.Duration { - factor := 0.9 + randSrc.Float64() * 0.2 // a jitter factor between 0.9 and 1.1 (+-10%) + factor := 0.9 + randSrc.Float64()*0.2 // a jitter factor between 0.9 and 1.1 (+-10%) return time.Duration(float64(duration) * factor) }