Skip to content

Commit

Permalink
Use a single goroutine for meters.
Browse files Browse the repository at this point in the history
Update meters to create a single goroutine responsible for ticking all
of the meters.
  • Loading branch information
pkwarren committed Mar 30, 2014
1 parent d177436 commit 0c229a5
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 50 deletions.
142 changes: 97 additions & 45 deletions meter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package metrics

import "time"
import (
"sync"
"time"
)

// Meters count events to produce exponentially-weighted moving average rates
// at one-, five-, and fifteen-minutes and a mean rate.
Expand Down Expand Up @@ -28,12 +31,14 @@ func NewMeter() Meter {
if UseNilMetrics {
return NilMeter{}
}
m := &StandardMeter{
make(chan int64),
make(chan *MeterSnapshot),
time.NewTicker(5e9),
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters = append(arbiter.meters, m)
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
}
go m.arbiter()
return m
}

Expand All @@ -48,6 +53,17 @@ func NewRegisteredMeter(name string, r Registry) Meter {
return c
}

type meterArbiter struct {
sync.RWMutex
started bool
meters []*StandardMeter
ticker *time.Ticker
}

var arbiter = meterArbiter{
ticker: time.NewTicker(5e9),
}

// MeterSnapshot is a read-only copy of another Meter.
type MeterSnapshot struct {
count int64
Expand Down Expand Up @@ -105,79 +121,115 @@ func (NilMeter) RateMean() float64 { return 0.0 }
// Snapshot is a no-op.
func (NilMeter) Snapshot() Meter { return NilMeter{} }

// StandardMeter is the standard implementation of a Meter and uses a
// goroutine to synchronize its calculations and a time.Ticker to pass time.
// StandardMeter is the standard implementation of a Meter.
type StandardMeter struct {
in chan int64
out chan *MeterSnapshot
ticker *time.Ticker
lock sync.RWMutex
snapshot *MeterSnapshot
a1, a5, a15 EWMA
startTime time.Time
}

func newStandardMeter() *StandardMeter {
return &StandardMeter{
snapshot: &MeterSnapshot{},
a1: NewEWMA1(),
a5: NewEWMA5(),
a15: NewEWMA15(),
startTime: time.Now(),
}
}

// Count returns the number of events recorded.
func (m *StandardMeter) Count() int64 {
return (<-m.out).count
m.lock.RLock()
count := m.snapshot.count
m.lock.RUnlock()
return count
}

// Mark records the occurance of n events.
func (m *StandardMeter) Mark(n int64) {
m.in <- n
m.lock.Lock()
defer m.lock.Unlock()
m.snapshot.count += n
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
m.updateSnapshot()
}

// Rate1 returns the one-minute moving average rate of events per second.
func (m *StandardMeter) Rate1() float64 {
return (<-m.out).rate1
m.lock.RLock()
rate1 := m.snapshot.rate1
m.lock.RUnlock()
return rate1
}

// Rate5 returns the five-minute moving average rate of events per second.
func (m *StandardMeter) Rate5() float64 {
return (<-m.out).rate5
m.lock.RLock()
rate5 := m.snapshot.rate5
m.lock.RUnlock()
return rate5
}

// Rate15 returns the fifteen-minute moving average rate of events per second.
func (m *StandardMeter) Rate15() float64 {
return (<-m.out).rate15
m.lock.RLock()
rate15 := m.snapshot.rate15
m.lock.RUnlock()
return rate15
}

// RateMean returns the meter's mean rate of events per second.
func (m *StandardMeter) RateMean() float64 {
return (<-m.out).rateMean
m.lock.RLock()
rateMean := m.snapshot.rateMean
m.lock.RUnlock()
return rateMean
}

// Snapshot returns a read-only copy of the meter.
func (m *StandardMeter) Snapshot() Meter {
snapshot := *<-m.out
m.lock.RLock()
snapshot := *m.snapshot
m.lock.RUnlock()
return &snapshot
}

// arbiter receives inputs and sends outputs. It counts each input and updates
// the various moving averages and the mean rate of events. It sends a copy of
// the meterV as output.
func (m *StandardMeter) arbiter() {
snapshot := &MeterSnapshot{}
a1 := NewEWMA1()
a5 := NewEWMA5()
a15 := NewEWMA15()
t := time.Now()
func (m *StandardMeter) updateSnapshot() {
// should run with write lock held on m.lock
snapshot := m.snapshot
snapshot.rate1 = m.a1.Rate()
snapshot.rate5 = m.a5.Rate()
snapshot.rate15 = m.a15.Rate()
snapshot.rateMean = float64(1e9*snapshot.count) / float64(time.Since(m.startTime))
}

func (m *StandardMeter) tick() {
m.lock.Lock()
defer m.lock.Unlock()
m.a1.Tick()
m.a5.Tick()
m.a15.Tick()
m.updateSnapshot()
}

func (ma *meterArbiter) tickMeters() {
ma.RLock()
defer ma.RUnlock()
for _, meter := range ma.meters {
meter.tick()
}
}

// Ticks meters on the scheduled interval
func (ma *meterArbiter) tick() {
for {
select {
case n := <-m.in:
snapshot.count += n
a1.Update(n)
a5.Update(n)
a15.Update(n)
snapshot.rate1 = a1.Rate()
snapshot.rate5 = a5.Rate()
snapshot.rate15 = a15.Rate()
snapshot.rateMean = float64(1e9*snapshot.count) / float64(time.Since(t))
case m.out <- snapshot:
case <-m.ticker.C:
a1.Tick()
a5.Tick()
a15.Tick()
snapshot.rate1 = a1.Rate()
snapshot.rate5 = a5.Rate()
snapshot.rate15 = a15.Rate()
snapshot.rateMean = float64(1e9*snapshot.count) / float64(time.Since(t))
case <-ma.ticker.C:
ma.tickMeters()
}
}
}
10 changes: 5 additions & 5 deletions meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func TestGetOrRegisterMeter(t *testing.T) {
}

func TestMeterDecay(t *testing.T) {
m := &StandardMeter{
make(chan int64),
make(chan *MeterSnapshot),
time.NewTicker(1),
ma := meterArbiter{
ticker: time.NewTicker(1),
}
go m.arbiter()
m := newStandardMeter()
ma.meters = append(ma.meters, m)
go ma.tick()
m.Mark(1)
rateMean := m.RateMean()
time.Sleep(1)
Expand Down

0 comments on commit 0c229a5

Please sign in to comment.