Skip to content

Commit

Permalink
Merge remote-tracking branch 'kisielk/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
rcrowley committed Oct 5, 2014
2 parents d4f1d62 + a1ae257 commit a1f30dd
Showing 1 changed file with 69 additions and 35 deletions.
104 changes: 69 additions & 35 deletions sample.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package metrics

import (
"container/heap"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -41,7 +40,7 @@ type ExpDecaySample struct {
mutex sync.Mutex
reservoirSize int
t0, t1 time.Time
values expDecaySampleHeap
values *expDecaySampleHeap
}

// NewExpDecaySample constructs a new exponentially-decaying sample with the
Expand All @@ -54,7 +53,7 @@ func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
alpha: alpha,
reservoirSize: reservoirSize,
t0: time.Now(),
values: make(expDecaySampleHeap, 0, reservoirSize),
values: newExpDecaySampleHeap(reservoirSize),
}
s.t1 = time.Now().Add(rescaleThreshold)
return s
Expand All @@ -67,7 +66,7 @@ func (s *ExpDecaySample) Clear() {
s.count = 0
s.t0 = time.Now()
s.t1 = s.t0.Add(rescaleThreshold)
s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
s.values = newExpDecaySampleHeap(s.reservoirSize)
}

// Count returns the number of samples recorded, which may exceed the
Expand Down Expand Up @@ -110,15 +109,16 @@ func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
func (s *ExpDecaySample) Size() int {
s.mutex.Lock()
defer s.mutex.Unlock()
return len(s.values)
return s.values.Size()
}

// Snapshot returns a read-only copy of the sample.
func (s *ExpDecaySample) Snapshot() Sample {
s.mutex.Lock()
defer s.mutex.Unlock()
values := make([]int64, len(s.values))
for i, v := range s.values {
vals := s.values.Values()
values := make([]int64, len(vals))
for i, v := range vals {
values[i] = v.v
}
return &SampleSnapshot{
Expand Down Expand Up @@ -146,8 +146,9 @@ func (s *ExpDecaySample) Update(v int64) {
func (s *ExpDecaySample) Values() []int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
values := make([]int64, len(s.values))
for i, v := range s.values {
vals := s.values.Values()
values := make([]int64, len(vals))
for i, v := range vals {
values[i] = v.v
}
return values
Expand All @@ -164,22 +165,22 @@ func (s *ExpDecaySample) update(t time.Time, v int64) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.count++
if len(s.values) == s.reservoirSize {
heap.Pop(&s.values)
if s.values.Size() == s.reservoirSize {
s.values.Pop()
}
heap.Push(&s.values, expDecaySample{
s.values.Push(expDecaySample{
k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
v: v,
})
if t.After(s.t1) {
values := s.values
values := s.values.Values()
t0 := s.t0
s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
s.values = newExpDecaySampleHeap(s.reservoirSize)
s.t0 = t
s.t1 = s.t0.Add(rescaleThreshold)
for _, v := range values {
v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
heap.Push(&s.values, v)
s.values.Push(v)
}
}
}
Expand Down Expand Up @@ -529,36 +530,69 @@ type expDecaySample struct {
v int64
}

func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
return &expDecaySampleHeap{make([]expDecaySample, 0, reservoirSize)}
}

// expDecaySampleHeap is a min-heap of expDecaySamples.
type expDecaySampleHeap []expDecaySample
// The internal implementation is copied from the standard library's container/heap
type expDecaySampleHeap struct {
s []expDecaySample
}

func (h *expDecaySampleHeap) Push(s expDecaySample) {
n := len(h.s)
h.s = h.s[0 : n+1]
h.s[n] = s
h.up(n)
}

func (q expDecaySampleHeap) Len() int {
return len(q)
func (h *expDecaySampleHeap) Pop() expDecaySample {
n := len(h.s) - 1
h.s[0], h.s[n] = h.s[n], h.s[0]
h.down(0, n)

n = len(h.s)
s := h.s[n-1]
h.s = h.s[0 : n-1]
return s
}

func (q expDecaySampleHeap) Less(i, j int) bool {
return q[i].k < q[j].k
func (h *expDecaySampleHeap) Size() int {
return len(h.s)
}

func (q *expDecaySampleHeap) Pop() interface{} {
q_ := *q
n := len(q_)
i := q_[n-1]
q_ = q_[0 : n-1]
*q = q_
return i
func (h *expDecaySampleHeap) Values() []expDecaySample {
return h.s
}

func (q *expDecaySampleHeap) Push(x interface{}) {
q_ := *q
n := len(q_)
q_ = q_[0 : n+1]
q_[n] = x.(expDecaySample)
*q = q_
func (h *expDecaySampleHeap) up(j int) {
for {
i := (j - 1) / 2 // parent
if i == j || !(h.s[j].k < h.s[i].k) {
break
}
h.s[i], h.s[j] = h.s[j], h.s[i]
j = i
}
}

func (q expDecaySampleHeap) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
func (h *expDecaySampleHeap) down(i, n int) {
for {
j1 := 2*i + 1
if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
break
}
j := j1 // left child
if j2 := j1 + 1; j2 < n && !(h.s[j1].k < h.s[j2].k) {
j = j2 // = 2*i + 2 // right child
}
if !(h.s[j].k < h.s[i].k) {
break
}
h.s[i], h.s[j] = h.s[j], h.s[i]
i = j
}
}

type int64Slice []int64
Expand Down

0 comments on commit a1f30dd

Please sign in to comment.