浏览代码

Merge pull request #206 from inooka-shiroyuki/stop-timer

Stop timer/meter
Mikhail P 8 年之前
父节点
当前提交
39aa482949
共有 7 个文件被更改,包括 122 次插入6 次删除
  1. 11 1
      README.md
  2. 35 4
      meter.go
  3. 14 1
      meter_test.go
  4. 15 0
      registry.go
  5. 17 0
      registry_test.go
  6. 18 0
      timer.go
  7. 12 0
      timer_test.go

+ 11 - 1
README.md

@@ -42,12 +42,22 @@ t.Update(47)
 Register() is not threadsafe. For threadsafe metric registration use
 GetOrRegister:
 
-```
+```go
 t := metrics.GetOrRegisterTimer("account.create.latency", nil)
 t.Time(func() {})
 t.Update(47)
 ```
 
+**NOTE:** Be sure to unregister short-lived meters and timers otherwise they will
+leak memory:
+
+```go
+// Will call Stop() on the Meter to allow for garbage collection
+metrics.Unregister("quux")
+// Or similarly for a Timer that embeds a Meter
+metrics.Unregister("bang")
+```
+
 Periodically log every metric in human-readable form to standard error:
 
 ```go

+ 35 - 4
meter.go

@@ -15,10 +15,13 @@ type Meter interface {
 	Rate15() float64
 	RateMean() float64
 	Snapshot() Meter
+	Stop()
 }
 
 // GetOrRegisterMeter returns an existing Meter or constructs and registers a
 // new StandardMeter.
+// Be sure to unregister the meter from the registry once it is of no use to
+// allow for garbage collection.
 func GetOrRegisterMeter(name string, r Registry) Meter {
 	if nil == r {
 		r = DefaultRegistry
@@ -27,6 +30,7 @@ func GetOrRegisterMeter(name string, r Registry) Meter {
 }
 
 // NewMeter constructs a new StandardMeter and launches a goroutine.
+// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
 func NewMeter() Meter {
 	if UseNilMetrics {
 		return NilMeter{}
@@ -34,7 +38,7 @@ func NewMeter() Meter {
 	m := newStandardMeter()
 	arbiter.Lock()
 	defer arbiter.Unlock()
-	arbiter.meters = append(arbiter.meters, m)
+	arbiter.meters[m] = struct{}{}
 	if !arbiter.started {
 		arbiter.started = true
 		go arbiter.tick()
@@ -44,6 +48,8 @@ func NewMeter() Meter {
 
 // NewMeter constructs and registers a new StandardMeter and launches a
 // goroutine.
+// Be sure to unregister the meter from the registry once it is of no use to
+// allow for garbage collection.
 func NewRegisteredMeter(name string, r Registry) Meter {
 	c := NewMeter()
 	if nil == r {
@@ -86,6 +92,9 @@ func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
 // Snapshot returns the snapshot.
 func (m *MeterSnapshot) Snapshot() Meter { return m }
 
+// Stop is a no-op.
+func (m *MeterSnapshot) Stop() {}
+
 // NilMeter is a no-op Meter.
 type NilMeter struct{}
 
@@ -110,12 +119,16 @@ func (NilMeter) RateMean() float64 { return 0.0 }
 // Snapshot is a no-op.
 func (NilMeter) Snapshot() Meter { return NilMeter{} }
 
+// Stop is a no-op.
+func (NilMeter) Stop() {}
+
 // StandardMeter is the standard implementation of a Meter.
 type StandardMeter struct {
 	lock        sync.RWMutex
 	snapshot    *MeterSnapshot
 	a1, a5, a15 EWMA
 	startTime   time.Time
+	stopped     bool
 }
 
 func newStandardMeter() *StandardMeter {
@@ -128,6 +141,19 @@ func newStandardMeter() *StandardMeter {
 	}
 }
 
+// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
+func (m *StandardMeter) Stop() {
+	m.lock.Lock()
+	stopped := m.stopped
+	m.stopped = true
+	m.lock.Unlock()
+	if !stopped {
+		arbiter.Lock()
+		delete(arbiter.meters, m)
+		arbiter.Unlock()
+	}
+}
+
 // Count returns the number of events recorded.
 func (m *StandardMeter) Count() int64 {
 	m.lock.RLock()
@@ -140,6 +166,9 @@ func (m *StandardMeter) Count() int64 {
 func (m *StandardMeter) Mark(n int64) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
+	if m.stopped {
+		return
+	}
 	m.snapshot.count += n
 	m.a1.Update(n)
 	m.a5.Update(n)
@@ -205,14 +234,16 @@ func (m *StandardMeter) tick() {
 	m.updateSnapshot()
 }
 
+// meterArbiter ticks meters every 5s from a single goroutine.
+// meters are references in a set for future stopping.
 type meterArbiter struct {
 	sync.RWMutex
 	started bool
-	meters  []*StandardMeter
+	meters  map[*StandardMeter]struct{}
 	ticker  *time.Ticker
 }
 
-var arbiter = meterArbiter{ticker: time.NewTicker(5e9)}
+var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
 
 // Ticks meters on the scheduled interval
 func (ma *meterArbiter) tick() {
@@ -227,7 +258,7 @@ func (ma *meterArbiter) tick() {
 func (ma *meterArbiter) tickMeters() {
 	ma.RLock()
 	defer ma.RUnlock()
-	for _, meter := range ma.meters {
+	for meter := range ma.meters {
 		meter.tick()
 	}
 }

+ 14 - 1
meter_test.go

@@ -24,9 +24,10 @@ func TestGetOrRegisterMeter(t *testing.T) {
 func TestMeterDecay(t *testing.T) {
 	ma := meterArbiter{
 		ticker: time.NewTicker(time.Millisecond),
+		meters: make(map[*StandardMeter]struct{}),
 	}
 	m := newStandardMeter()
-	ma.meters = append(ma.meters, m)
+	ma.meters[m] = struct{}{}
 	go ma.tick()
 	m.Mark(1)
 	rateMean := m.RateMean()
@@ -44,6 +45,18 @@ func TestMeterNonzero(t *testing.T) {
 	}
 }
 
+func TestMeterStop(t *testing.T) {
+	l := len(arbiter.meters)
+	m := NewMeter()
+	if len(arbiter.meters) != l+1 {
+		t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters))
+	}
+	m.Stop()
+	if len(arbiter.meters) != l {
+		t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters))
+	}
+}
+
 func TestMeterSnapshot(t *testing.T) {
 	m := NewMeter()
 	m.Mark(1)

+ 15 - 0
registry.go

@@ -113,6 +113,7 @@ func (r *StandardRegistry) RunHealthchecks() {
 func (r *StandardRegistry) Unregister(name string) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
+	r.stop(name)
 	delete(r.metrics, name)
 }
 
@@ -121,6 +122,7 @@ func (r *StandardRegistry) UnregisterAll() {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	for name, _ := range r.metrics {
+		r.stop(name)
 		delete(r.metrics, name)
 	}
 }
@@ -146,6 +148,19 @@ func (r *StandardRegistry) registered() map[string]interface{} {
 	return metrics
 }
 
+func (r *StandardRegistry) stop(name string) {
+	if i, ok := r.metrics[name]; ok {
+		if s, ok := i.(Stoppable); ok {
+			s.Stop()
+		}
+	}
+}
+
+// Stoppable defines the metrics which has to be stopped.
+type Stoppable interface {
+	Stop()
+}
+
 type PrefixedRegistry struct {
 	underlying Registry
 	prefix     string

+ 17 - 0
registry_test.go

@@ -119,6 +119,23 @@ func TestRegistryGetOrRegisterWithLazyInstantiation(t *testing.T) {
 	}
 }
 
+func TestRegistryUnregister(t *testing.T) {
+	l := len(arbiter.meters)
+	r := NewRegistry()
+	r.Register("foo", NewCounter())
+	r.Register("bar", NewMeter())
+	r.Register("baz", NewTimer())
+	if len(arbiter.meters) != l+2 {
+		t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters))
+	}
+	r.Unregister("foo")
+	r.Unregister("bar")
+	r.Unregister("baz")
+	if len(arbiter.meters) != l {
+		t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters))
+	}
+}
+
 func TestPrefixedChildRegistryGetOrRegister(t *testing.T) {
 	r := NewRegistry()
 	pr := NewPrefixedChildRegistry(r, "prefix.")

+ 18 - 0
timer.go

@@ -19,6 +19,7 @@ type Timer interface {
 	RateMean() float64
 	Snapshot() Timer
 	StdDev() float64
+	Stop()
 	Sum() int64
 	Time(func())
 	Update(time.Duration)
@@ -28,6 +29,8 @@ type Timer interface {
 
 // GetOrRegisterTimer returns an existing Timer or constructs and registers a
 // new StandardTimer.
+// Be sure to unregister the meter from the registry once it is of no use to
+// allow for garbage collection.
 func GetOrRegisterTimer(name string, r Registry) Timer {
 	if nil == r {
 		r = DefaultRegistry
@@ -36,6 +39,7 @@ func GetOrRegisterTimer(name string, r Registry) Timer {
 }
 
 // NewCustomTimer constructs a new StandardTimer from a Histogram and a Meter.
+// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
 func NewCustomTimer(h Histogram, m Meter) Timer {
 	if UseNilMetrics {
 		return NilTimer{}
@@ -47,6 +51,8 @@ func NewCustomTimer(h Histogram, m Meter) Timer {
 }
 
 // NewRegisteredTimer constructs and registers a new StandardTimer.
+// Be sure to unregister the meter from the registry once it is of no use to
+// allow for garbage collection.
 func NewRegisteredTimer(name string, r Registry) Timer {
 	c := NewTimer()
 	if nil == r {
@@ -58,6 +64,7 @@ func NewRegisteredTimer(name string, r Registry) Timer {
 
 // NewTimer constructs a new StandardTimer using an exponentially-decaying
 // sample with the same reservoir size and alpha as UNIX load averages.
+// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
 func NewTimer() Timer {
 	if UseNilMetrics {
 		return NilTimer{}
@@ -112,6 +119,9 @@ func (NilTimer) Snapshot() Timer { return NilTimer{} }
 // StdDev is a no-op.
 func (NilTimer) StdDev() float64 { return 0.0 }
 
+// Stop is a no-op.
+func (NilTimer) Stop() {}
+
 // Sum is a no-op.
 func (NilTimer) Sum() int64 { return 0 }
 
@@ -201,6 +211,11 @@ func (t *StandardTimer) StdDev() float64 {
 	return t.histogram.StdDev()
 }
 
+// Stop stops the meter.
+func (t *StandardTimer) Stop() {
+	t.meter.Stop()
+}
+
 // Sum returns the sum in the sample.
 func (t *StandardTimer) Sum() int64 {
 	return t.histogram.Sum()
@@ -288,6 +303,9 @@ func (t *TimerSnapshot) Snapshot() Timer { return t }
 // was taken.
 func (t *TimerSnapshot) StdDev() float64 { return t.histogram.StdDev() }
 
+// Stop is a no-op.
+func (t *TimerSnapshot) Stop() {}
+
 // Sum returns the sum at the time the snapshot was taken.
 func (t *TimerSnapshot) Sum() int64 { return t.histogram.Sum() }
 

+ 12 - 0
timer_test.go

@@ -32,6 +32,18 @@ func TestTimerExtremes(t *testing.T) {
 	}
 }
 
+func TestTimerStop(t *testing.T) {
+	l := len(arbiter.meters)
+	tm := NewTimer()
+	if len(arbiter.meters) != l+1 {
+		t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters))
+	}
+	tm.Stop()
+	if len(arbiter.meters) != l {
+		t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters))
+	}
+}
+
 func TestTimerFunc(t *testing.T) {
 	tm := NewTimer()
 	tm.Time(func() { time.Sleep(50e6) })