Procházet zdrojové kódy

Merge branch 'uncontended'

Richard Crowley před 12 roky
rodič
revize
08e5fc0f36
11 změnil soubory, kde provedl 436 přidání a 382 odebrání
  1. 28 7
      debug.go
  2. 20 31
      ewma.go
  3. 8 0
      ewma_test.go
  4. 63 82
      histogram.go
  5. 7 0
      histogram_test.go
  6. 2 3
      meter.go
  7. 13 8
      registry.go
  8. 27 0
      registry_test.go
  9. 132 80
      runtime.go
  10. 102 133
      sample.go
  11. 34 38
      sample_test.go

+ 28 - 7
debug.go

@@ -5,7 +5,19 @@ import (
 	"time"
 )
 
-var gcStats debug.GCStats
+var (
+	debugMetrics struct {
+		GCStats struct {
+			LastGC Gauge
+			NumGC  Gauge
+			Pause  Histogram
+			//PauseQuantiles Histogram
+			PauseTotal Gauge
+		}
+		ReadGCStats Timer
+	}
+	gcStats debug.GCStats
+)
 
 // Capture new values for the Go garbage collector statistics exported in
 // debug.GCStats.  This is designed to be called as a goroutine.
@@ -22,23 +34,32 @@ func CaptureDebugGCStats(r Registry, d time.Duration) {
 // panic.
 func CaptureDebugGCStatsOnce(r Registry) {
 	lastGC := gcStats.LastGC
+	t := time.Now()
 	debug.ReadGCStats(&gcStats)
-	r.Get("debug.GCStats.LastGC").(Gauge).Update(int64(gcStats.LastGC.UnixNano()))
-	r.Get("debug.GCStats.NumGC").(Gauge).Update(int64(gcStats.NumGC))
-	r.Get("debug.GCStats.PauseTotal").(Gauge).Update(int64(gcStats.PauseTotal))
+	debugMetrics.ReadGCStats.UpdateSince(t)
+
+	debugMetrics.GCStats.LastGC.Update(int64(gcStats.LastGC.UnixNano()))
+	debugMetrics.GCStats.NumGC.Update(int64(gcStats.NumGC))
 	if lastGC != gcStats.LastGC && 0 < len(gcStats.Pause) {
-		r.Get("debug.GCStats.Pause").(Histogram).Update(int64(gcStats.Pause[0]))
+		debugMetrics.GCStats.Pause.Update(int64(gcStats.Pause[0]))
 	}
-	//r.Get("debug.GCStats.PauseQuantiles").(Histogram).Update(gcStats.PauseQuantiles)
+	//debugMetrics.GCStats.PauseQuantiles.Update(gcStats.PauseQuantiles)
+	debugMetrics.GCStats.PauseTotal.Update(int64(gcStats.PauseTotal))
 }
 
 // Register metrics for the Go garbage collector statistics exported in
 // debug.GCStats.  The metrics are named by their fully-qualified Go symbols,
 // i.e. debug.GCStats.PauseTotal.
 func RegisterDebugGCStats(r Registry) {
+	debugMetrics.GCStats.LastGC = NewGauge()
+	debugMetrics.GCStats.NumGC = NewGauge()
+	debugMetrics.GCStats.Pause = NewHistogram(NewExpDecaySample(1028, 0.015))
+	//debugMetrics.GCStats.PauseQuantiles = NewHistogram(NewExpDecaySample(1028, 0.015))
+	debugMetrics.GCStats.PauseTotal = NewGauge()
+
 	r.Register("debug.GCStats.LastGC", NewGauge())
 	r.Register("debug.GCStats.NumGC", NewGauge())
-	r.Register("debug.GCStats.PauseTotal", NewGauge())
 	r.Register("debug.GCStats.Pause", NewHistogram(NewExpDecaySample(1028, 0.015)))
 	//r.Register("debug.GCStats.PauseQuantiles", NewHistogram(NewExpDecaySample(1028, 0.015)))
+	r.Register("debug.GCStats.PauseTotal", NewGauge())
 }

+ 20 - 31
ewma.go

@@ -2,6 +2,7 @@ package metrics
 
 import (
 	"math"
+	"sync"
 	"sync/atomic"
 )
 
@@ -21,20 +22,18 @@ type EWMA interface {
 // to manage uncounted events.
 type StandardEWMA struct {
 	alpha     float64
+	init      bool
+	mutex     sync.Mutex
+	rate      float64
 	uncounted int64
-	in        chan bool
-	out       chan float64
 }
 
 // Force the compiler to check that StandardEWMA implements EWMA.
 var _ EWMA = &StandardEWMA{}
 
-// Create a new EWMA with the given alpha.  Create the clock channel and
-// start the ticker goroutine.
+// Create a new EWMA with the given alpha.
 func NewEWMA(alpha float64) *StandardEWMA {
-	a := &StandardEWMA{alpha, 0, make(chan bool), make(chan float64)}
-	go a.arbiter()
-	return a
+	return &StandardEWMA{alpha: alpha}
 }
 
 // Create a new EWMA with alpha set for a one-minute moving average.
@@ -54,37 +53,27 @@ func NewEWMA15() *StandardEWMA {
 
 // Return the moving average rate of events per second.
 func (a *StandardEWMA) Rate() float64 {
-	return <-a.out * float64(1e9)
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	return a.rate * float64(1e9)
 }
 
 // Tick the clock to update the moving average.
 func (a *StandardEWMA) Tick() {
-	a.in <- true
+	count := atomic.LoadInt64(&a.uncounted)
+	atomic.AddInt64(&a.uncounted, -count)
+	instantRate := float64(count) / float64(5e9)
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	if a.init {
+		a.rate += a.alpha * (instantRate - a.rate)
+	} else {
+		a.init = true
+		a.rate = instantRate
+	}
 }
 
 // Add n uncounted events.
 func (a *StandardEWMA) Update(n int64) {
 	atomic.AddInt64(&a.uncounted, n)
 }
-
-// On each clock tick, update the moving average to reflect the number of
-// events seen since the last tick.
-func (a *StandardEWMA) arbiter() {
-	var initialized bool
-	var rate float64
-	for {
-		select {
-		case <-a.in:
-			count := atomic.LoadInt64(&a.uncounted)
-			atomic.AddInt64(&a.uncounted, -count)
-			instantRate := float64(count) / float64(5e9)
-			if initialized {
-				rate += a.alpha * (instantRate - rate)
-			} else {
-				initialized = true
-				rate = instantRate
-			}
-		case a.out <- rate:
-		}
-	}
-}

+ 8 - 0
ewma_test.go

@@ -2,6 +2,14 @@ package metrics
 
 import "testing"
 
+func BenchmarkEWMA(b *testing.B) {
+	a := NewEWMA1()
+	for i := 0; i < b.N; i++ {
+		a.Update(1)
+		a.Tick()
+	}
+}
+
 func TestEWMA1(t *testing.T) {
 	a := NewEWMA1()
 	a.Update(3)

+ 63 - 82
histogram.go

@@ -3,6 +3,8 @@ package metrics
 import (
 	"math"
 	"sort"
+	"sync"
+	"sync/atomic"
 )
 
 // Histograms calculate distribution statistics from an int64 value.
@@ -25,80 +27,72 @@ type Histogram interface {
 // The standard implementation of a Histogram uses a Sample and a goroutine
 // to synchronize its calculations.
 type StandardHistogram struct {
-	s     Sample
-	in    chan int64
-	out   chan histogramV
-	reset chan bool
+	count, sum, min, max int64
+	mutex                sync.Mutex
+	s                    Sample
+	variance             [2]float64
 }
 
 // Force the compiler to check that StandardHistogram implements Histogram.
 var _ Histogram = &StandardHistogram{}
 
-// A histogramV contains all the values that would need to be passed back
-// from the synchronizing goroutine.
-type histogramV struct {
-	count, sum, min, max int64
-	variance             [2]float64
-}
-
-// Create a new histogram with the given Sample.  Create the communication
-// channels and start the synchronizing goroutine.
+// Create a new histogram with the given Sample.  The initial values compare
+// so that the first value will be both min and max and the variance is flagged
+// for special treatment on its first iteration.
 func NewHistogram(s Sample) *StandardHistogram {
-	h := &StandardHistogram{
-		s,
-		make(chan int64),
-		make(chan histogramV),
-		make(chan bool),
-	}
-	go h.arbiter()
-	return h
-}
-
-// Create a new histogramV.  The initial values compare so that the first
-// value will be both min and max and the variance is flagged for special
-// treatment on its first iteration.
-func newHistogramV() histogramV {
-	return histogramV{
-		0, 0, math.MaxInt64, math.MinInt64,
-		[2]float64{-1.0, 0.0},
+	return &StandardHistogram{
+		max:      math.MinInt64,
+		min:      math.MaxInt64,
+		s:        s,
+		variance: [2]float64{-1.0, 0.0},
 	}
 }
 
 // Clear the histogram.
 func (h *StandardHistogram) Clear() {
-	h.reset <- true
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	h.count = 0
+	h.max = math.MinInt64
+	h.min = math.MaxInt64
+	h.s.Clear()
+	h.sum = 0
+	h.variance = [...]float64{-1.0, 0.0}
 }
 
 // Return the count of inputs since the histogram was last cleared.
 func (h *StandardHistogram) Count() int64 {
-	return (<-h.out).count
+	return atomic.LoadInt64(&h.count)
 }
 
 // Return the maximal value seen since the histogram was last cleared.
 func (h *StandardHistogram) Max() int64 {
-	hv := <-h.out
-	if 0 < hv.count {
-		return hv.max
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if 0 == h.count {
+		return 0
 	}
-	return 0
+	return h.max
 }
 
 // Return the mean of all values seen since the histogram was last cleared.
 func (h *StandardHistogram) Mean() float64 {
-	hv := <-h.out
-	if 0 < hv.count {
-		return float64(hv.sum) / float64(hv.count)
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if 0 == h.count {
+		return 0
 	}
-	return 0
+	return float64(h.sum) / float64(h.count)
 }
 
 // Return the minimal value seen since the histogram was last cleared.
 func (h *StandardHistogram) Min() int64 {
-	hv := <-h.out
-	if 0 < hv.count {
-		return hv.min
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if 0 == h.count {
+		return 0
 	}
-	return 0
+	return h.min
 }
 
 // Return an arbitrary percentile of all values seen since the histogram was
@@ -139,50 +133,37 @@ func (h *StandardHistogram) StdDev() float64 {
 
 // Update the histogram with a new value.
 func (h *StandardHistogram) Update(v int64) {
-	h.in <- v
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	h.s.Update(v)
+	h.count++
+	if v < h.min {
+		h.min = v
+	}
+	if v > h.max {
+		h.max = v
+	}
+	h.sum += v
+	fv := float64(v)
+	if -1.0 == h.variance[0] {
+		h.variance[0] = fv
+		h.variance[1] = 0.0
+	} else {
+		m := h.variance[0]
+		s := h.variance[1]
+		h.variance[0] = m + (fv-m)/float64(h.count)
+		h.variance[1] = s + (fv-m)*(fv-h.variance[0])
+	}
 }
 
 // Return the variance of all values seen since the histogram was last cleared.
 func (h *StandardHistogram) Variance() float64 {
-	hv := <-h.out
-	if 1 >= hv.count {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if 1 >= h.count {
 		return 0.0
 	}
-	return hv.variance[1] / float64(hv.count-1)
-}
-
-// Receive inputs and send outputs.  Sample each input and update values in
-// the histogramV.  Send a copy of the histogramV as output.
-func (h *StandardHistogram) arbiter() {
-	hv := newHistogramV()
-	for {
-		select {
-		case v := <-h.in:
-			h.s.Update(v)
-			hv.count++
-			if v < hv.min {
-				hv.min = v
-			}
-			if v > hv.max {
-				hv.max = v
-			}
-			hv.sum += v
-			fv := float64(v)
-			if -1.0 == hv.variance[0] {
-				hv.variance[0] = fv
-				hv.variance[1] = 0.0
-			} else {
-				m := hv.variance[0]
-				s := hv.variance[1]
-				hv.variance[0] = m + (fv-m)/float64(hv.count)
-				hv.variance[1] = s + (fv-m)*(fv-hv.variance[0])
-			}
-		case h.out <- hv:
-		case <-h.reset:
-			h.s.Clear()
-			hv = newHistogramV()
-		}
-	}
+	return h.variance[1] / float64(h.count-1)
 }
 
 // Cribbed from the standard library's `sort` package.

+ 7 - 0
histogram_test.go

@@ -2,6 +2,13 @@ package metrics
 
 import "testing"
 
+func BenchmarkHistogram(b *testing.B) {
+	h := NewHistogram(NewUniformSample(100))
+	for i := 0; i < b.N; i++ {
+		h.Update(1)
+	}
+}
+
 func TestEmptyHistogram(t *testing.T) {
 	h := NewHistogram(NewUniformSample(100))
 	if count := h.Count(); 0 != count {

+ 2 - 3
meter.go

@@ -85,7 +85,7 @@ func (m *StandardMeter) arbiter() {
 	a1 := NewEWMA1()
 	a5 := NewEWMA5()
 	a15 := NewEWMA15()
-	tsStart := time.Now()
+	t := time.Now()
 	for {
 		select {
 		case n := <-m.in:
@@ -96,8 +96,7 @@ func (m *StandardMeter) arbiter() {
 			mv.rate5 = a5.Rate()
 			a15.Update(n)
 			mv.rate15 = a15.Rate()
-			mv.rateMean = float64(1e9*mv.count) / float64(
-				time.Now().Sub(tsStart))
+			mv.rateMean = float64(1e9*mv.count) / float64(time.Since(t))
 		case m.out <- mv:
 		case <-m.ticker.C:
 			a1.Tick()

+ 13 - 8
registry.go

@@ -28,8 +28,8 @@ type Registry interface {
 // The standard implementation of a Registry is a mutex-protected map
 // of names to metrics.
 type StandardRegistry struct {
-	mutex   *sync.Mutex
 	metrics map[string]interface{}
+	mutex   sync.Mutex
 }
 
 // Force the compiler to check that StandardRegistry implements Registry.
@@ -37,17 +37,12 @@ var _ Registry = &StandardRegistry{}
 
 // Create a new registry.
 func NewRegistry() *StandardRegistry {
-	return &StandardRegistry{
-		&sync.Mutex{},
-		make(map[string]interface{}),
-	}
+	return &StandardRegistry{metrics: make(map[string]interface{})}
 }
 
 // Call the given function for each registered metric.
 func (r *StandardRegistry) Each(f func(string, interface{})) {
-	r.mutex.Lock()
-	defer r.mutex.Unlock()
-	for name, i := range r.metrics {
+	for name, i := range r.registered() {
 		f(name, i)
 	}
 }
@@ -87,6 +82,16 @@ func (r *StandardRegistry) Unregister(name string) {
 	delete(r.metrics, name)
 }
 
+func (r *StandardRegistry) registered() map[string]interface{} {
+	metrics := make(map[string]interface{}, len(r.metrics))
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	for name, i := range r.metrics {
+		metrics[name] = i
+	}
+	return metrics
+}
+
 var DefaultRegistry *StandardRegistry = NewRegistry()
 
 // Call the given function for each registered metric.

+ 27 - 0
registry_test.go

@@ -0,0 +1,27 @@
+package metrics
+
+import "testing"
+
+func TestRegistry(t *testing.T) {
+	r := NewRegistry()
+	r.Register("foo", NewCounter())
+	i := 0
+	r.Each(func(name string, iface interface{}) {
+		i++
+		if "foo" != name {
+			t.Fatal(name)
+		}
+		if _, ok := iface.(Counter); !ok {
+			t.Fatal(iface)
+		}
+	})
+	if 1 != i {
+		t.Fatal(i)
+	}
+	r.Unregister("foo")
+	i = 0
+	r.Each(func(string, interface{}) { i++ })
+	if 0 != i {
+		t.Fatal(i)
+	}
+}

+ 132 - 80
runtime.go

@@ -6,8 +6,41 @@ import (
 )
 
 var (
-	numGC    uint32
-	memStats runtime.MemStats
+	memStats       runtime.MemStats
+	runtimeMetrics struct {
+		MemStats struct {
+			Alloc        Gauge
+			BuckHashSys  Gauge
+			DebugGC      Gauge
+			EnableGC     Gauge
+			Frees        Gauge
+			HeapAlloc    Gauge
+			HeapIdle     Gauge
+			HeapInuse    Gauge
+			HeapObjects  Gauge
+			HeapReleased Gauge
+			HeapSys      Gauge
+			LastGC       Gauge
+			Lookups      Gauge
+			Mallocs      Gauge
+			MCacheInuse  Gauge
+			MCacheSys    Gauge
+			MSpanInuse   Gauge
+			MSpanSys     Gauge
+			NextGC       Gauge
+			NumGC        Gauge
+			PauseNs      Histogram
+			PauseTotalNs Gauge
+			StackInuse   Gauge
+			StackSys     Gauge
+			Sys          Gauge
+			TotalAlloc   Gauge
+		}
+		NumCgoCall   Gauge
+		NumGoroutine Gauge
+		ReadMemStats Timer
+	}
+	numGC uint32
 )
 
 // Capture new values for the Go runtime statistics exported in
@@ -24,92 +57,111 @@ func CaptureRuntimeMemStats(r Registry, d time.Duration) {
 // goroutine.  Giving a registry which has not been given to
 // RegisterRuntimeMemStats will panic.
 func CaptureRuntimeMemStatsOnce(r Registry) {
+	t := time.Now()
 	runtime.ReadMemStats(&memStats)
+	runtimeMetrics.ReadMemStats.UpdateSince(t)
 
-	r.Get("runtime.MemStats.Alloc").(Gauge).Update(int64(memStats.Alloc))
-	r.Get("runtime.MemStats.TotalAlloc").(Gauge).Update(int64(memStats.TotalAlloc))
-	r.Get("runtime.MemStats.Sys").(Gauge).Update(int64(memStats.Sys))
-	r.Get("runtime.MemStats.Lookups").(Gauge).Update(int64(memStats.Lookups))
-	r.Get("runtime.MemStats.Mallocs").(Gauge).Update(int64(memStats.Mallocs))
-	r.Get("runtime.MemStats.Frees").(Gauge).Update(int64(memStats.Frees))
-
-	r.Get("runtime.MemStats.HeapAlloc").(Gauge).Update(int64(memStats.HeapAlloc))
-	r.Get("runtime.MemStats.HeapSys").(Gauge).Update(int64(memStats.HeapSys))
-	r.Get("runtime.MemStats.HeapIdle").(Gauge).Update(int64(memStats.HeapIdle))
-	r.Get("runtime.MemStats.HeapInuse").(Gauge).Update(int64(memStats.HeapInuse))
-	r.Get("runtime.MemStats.HeapReleased").(Gauge).Update(int64(memStats.HeapReleased))
-	r.Get("runtime.MemStats.HeapObjects").(Gauge).Update(int64(memStats.HeapObjects))
-
-	r.Get("runtime.MemStats.StackInuse").(Gauge).Update(int64(memStats.StackInuse))
-	r.Get("runtime.MemStats.StackSys").(Gauge).Update(int64(memStats.StackSys))
-	r.Get("runtime.MemStats.MSpanInuse").(Gauge).Update(int64(memStats.MSpanInuse))
-	r.Get("runtime.MemStats.MSpanSys").(Gauge).Update(int64(memStats.MSpanSys))
-	r.Get("runtime.MemStats.MCacheInuse").(Gauge).Update(int64(memStats.MCacheInuse))
-	r.Get("runtime.MemStats.MCacheSys").(Gauge).Update(int64(memStats.MCacheSys))
-	r.Get("runtime.MemStats.BuckHashSys").(Gauge).Update(int64(memStats.BuckHashSys))
-
-	r.Get("runtime.MemStats.NextGC").(Gauge).Update(int64(memStats.NextGC))
-	r.Get("runtime.MemStats.LastGC").(Gauge).Update(int64(memStats.LastGC))
-	r.Get("runtime.MemStats.PauseTotalNs").(Gauge).Update(int64(memStats.PauseTotalNs))
-	// <https://code.google.com/p/go/source/browse/src/pkg/runtime/mgc0.c>
-	for i := uint32(1); i <= memStats.NumGC-numGC; i++ {
-		r.Get("runtime.MemStats.PauseNs").(Histogram).Update(int64(memStats.PauseNs[(memStats.NumGC%256-i)%256]))
-	}
-	r.Get("runtime.MemStats.NumGC").(Gauge).Update(int64(memStats.NumGC))
-	if memStats.EnableGC {
-		r.Get("runtime.MemStats.EnableGC").(Gauge).Update(1)
+	runtimeMetrics.MemStats.Alloc.Update(int64(memStats.Alloc))
+	runtimeMetrics.MemStats.BuckHashSys.Update(int64(memStats.BuckHashSys))
+	if memStats.DebugGC {
+		runtimeMetrics.MemStats.DebugGC.Update(1)
 	} else {
-		r.Get("runtime.MemStats.EnableGC").(Gauge).Update(0)
+		runtimeMetrics.MemStats.DebugGC.Update(0)
 	}
 	if memStats.EnableGC {
-		r.Get("runtime.MemStats.DebugGC").(Gauge).Update(1)
+		runtimeMetrics.MemStats.EnableGC.Update(1)
 	} else {
-		r.Get("runtime.MemStats.DebugGC").(Gauge).Update(0)
+		runtimeMetrics.MemStats.EnableGC.Update(0)
 	}
-
-	r.Get("runtime.NumCgoCall").(Gauge).Update(int64(runtime.NumCgoCall()))
-	r.Get("runtime.NumGoroutine").(Gauge).Update(int64(runtime.NumGoroutine()))
-
+	runtimeMetrics.MemStats.Frees.Update(int64(memStats.Frees))
+	runtimeMetrics.MemStats.HeapAlloc.Update(int64(memStats.HeapAlloc))
+	runtimeMetrics.MemStats.HeapIdle.Update(int64(memStats.HeapIdle))
+	runtimeMetrics.MemStats.HeapInuse.Update(int64(memStats.HeapInuse))
+	runtimeMetrics.MemStats.HeapObjects.Update(int64(memStats.HeapObjects))
+	runtimeMetrics.MemStats.HeapReleased.Update(int64(memStats.HeapReleased))
+	runtimeMetrics.MemStats.HeapSys.Update(int64(memStats.HeapSys))
+	runtimeMetrics.MemStats.LastGC.Update(int64(memStats.LastGC))
+	runtimeMetrics.MemStats.Lookups.Update(int64(memStats.Lookups))
+	runtimeMetrics.MemStats.Mallocs.Update(int64(memStats.Mallocs))
+	runtimeMetrics.MemStats.MCacheInuse.Update(int64(memStats.MCacheInuse))
+	runtimeMetrics.MemStats.MCacheSys.Update(int64(memStats.MCacheSys))
+	runtimeMetrics.MemStats.MSpanInuse.Update(int64(memStats.MSpanInuse))
+	runtimeMetrics.MemStats.MSpanSys.Update(int64(memStats.MSpanSys))
+	runtimeMetrics.MemStats.NextGC.Update(int64(memStats.NextGC))
+	runtimeMetrics.MemStats.NumGC.Update(int64(memStats.NumGC))
+	for i := uint32(1); i <= memStats.NumGC-numGC; i++ {
+		runtimeMetrics.MemStats.PauseNs.Update(int64(memStats.PauseNs[(memStats.NumGC%256-i)%256])) // <https://code.google.com/p/go/source/browse/src/pkg/runtime/mgc0.c>
+	}
+	runtimeMetrics.MemStats.PauseTotalNs.Update(int64(memStats.PauseTotalNs))
+	runtimeMetrics.MemStats.StackInuse.Update(int64(memStats.StackInuse))
+	runtimeMetrics.MemStats.StackSys.Update(int64(memStats.StackSys))
+	runtimeMetrics.MemStats.Sys.Update(int64(memStats.Sys))
+	runtimeMetrics.MemStats.TotalAlloc.Update(int64(memStats.TotalAlloc))
+	runtimeMetrics.NumCgoCall.Update(int64(runtime.NumCgoCall()))
+	runtimeMetrics.NumGoroutine.Update(int64(runtime.NumGoroutine()))
 }
 
-// Register metrics for the Go runtime statistics exported in
-// runtime.MemStats.  The metrics are named by their fully-qualified
-// Go symbols, i.e. runtime.MemStatsAlloc.  In addition to
-// runtime.MemStats, register the return value of runtime.Goroutines()
-// as runtime.Goroutines.
+// Register runtimeMetrics for the Go runtime statistics exported in runtime and
+// specifically runtime.MemStats.  The runtimeMetrics are named by their
+// fully-qualified Go symbols, i.e. runtime.MemStats.Alloc.
 func RegisterRuntimeMemStats(r Registry) {
-
-	r.Register("runtime.MemStats.Alloc", NewGauge())
-	r.Register("runtime.MemStats.TotalAlloc", NewGauge())
-	r.Register("runtime.MemStats.Sys", NewGauge())
-	r.Register("runtime.MemStats.Lookups", NewGauge())
-	r.Register("runtime.MemStats.Mallocs", NewGauge())
-	r.Register("runtime.MemStats.Frees", NewGauge())
-
-	r.Register("runtime.MemStats.HeapAlloc", NewGauge())
-	r.Register("runtime.MemStats.HeapSys", NewGauge())
-	r.Register("runtime.MemStats.HeapIdle", NewGauge())
-	r.Register("runtime.MemStats.HeapInuse", NewGauge())
-	r.Register("runtime.MemStats.HeapReleased", NewGauge())
-	r.Register("runtime.MemStats.HeapObjects", NewGauge())
-
-	r.Register("runtime.MemStats.StackInuse", NewGauge())
-	r.Register("runtime.MemStats.StackSys", NewGauge())
-	r.Register("runtime.MemStats.MSpanInuse", NewGauge())
-	r.Register("runtime.MemStats.MSpanSys", NewGauge())
-	r.Register("runtime.MemStats.MCacheInuse", NewGauge())
-	r.Register("runtime.MemStats.MCacheSys", NewGauge())
-	r.Register("runtime.MemStats.BuckHashSys", NewGauge())
-
-	r.Register("runtime.MemStats.NextGC", NewGauge())
-	r.Register("runtime.MemStats.LastGC", NewGauge())
-	r.Register("runtime.MemStats.PauseTotalNs", NewGauge())
-	r.Register("runtime.MemStats.PauseNs", NewHistogram(NewExpDecaySample(1028, 0.015)))
-	r.Register("runtime.MemStats.NumGC", NewGauge())
-	r.Register("runtime.MemStats.EnableGC", NewGauge())
-	r.Register("runtime.MemStats.DebugGC", NewGauge())
-
-	r.Register("runtime.NumCgoCall", NewGauge())
-	r.Register("runtime.NumGoroutine", NewGauge())
-
+	runtimeMetrics.MemStats.Alloc = NewGauge()
+	runtimeMetrics.MemStats.BuckHashSys = NewGauge()
+	runtimeMetrics.MemStats.DebugGC = NewGauge()
+	runtimeMetrics.MemStats.EnableGC = NewGauge()
+	runtimeMetrics.MemStats.Frees = NewGauge()
+	runtimeMetrics.MemStats.HeapAlloc = NewGauge()
+	runtimeMetrics.MemStats.HeapIdle = NewGauge()
+	runtimeMetrics.MemStats.HeapInuse = NewGauge()
+	runtimeMetrics.MemStats.HeapObjects = NewGauge()
+	runtimeMetrics.MemStats.HeapReleased = NewGauge()
+	runtimeMetrics.MemStats.HeapSys = NewGauge()
+	runtimeMetrics.MemStats.LastGC = NewGauge()
+	runtimeMetrics.MemStats.Lookups = NewGauge()
+	runtimeMetrics.MemStats.Mallocs = NewGauge()
+	runtimeMetrics.MemStats.MCacheInuse = NewGauge()
+	runtimeMetrics.MemStats.MCacheSys = NewGauge()
+	runtimeMetrics.MemStats.MSpanInuse = NewGauge()
+	runtimeMetrics.MemStats.MSpanSys = NewGauge()
+	runtimeMetrics.MemStats.NextGC = NewGauge()
+	runtimeMetrics.MemStats.NumGC = NewGauge()
+	runtimeMetrics.MemStats.PauseNs = NewHistogram(NewExpDecaySample(1028, 0.015))
+	runtimeMetrics.MemStats.PauseTotalNs = NewGauge()
+	runtimeMetrics.MemStats.StackInuse = NewGauge()
+	runtimeMetrics.MemStats.StackSys = NewGauge()
+	runtimeMetrics.MemStats.Sys = NewGauge()
+	runtimeMetrics.MemStats.TotalAlloc = NewGauge()
+	runtimeMetrics.NumCgoCall = NewGauge()
+	runtimeMetrics.NumGoroutine = NewGauge()
+	runtimeMetrics.ReadMemStats = NewTimer()
+
+	r.Register("runtime.MemStats.Alloc", runtimeMetrics.MemStats.Alloc)
+	r.Register("runtime.MemStats.BuckHashSys", runtimeMetrics.MemStats.BuckHashSys)
+	r.Register("runtime.MemStats.DebugGC", runtimeMetrics.MemStats.DebugGC)
+	r.Register("runtime.MemStats.EnableGC", runtimeMetrics.MemStats.EnableGC)
+	r.Register("runtime.MemStats.Frees", runtimeMetrics.MemStats.Frees)
+	r.Register("runtime.MemStats.HeapAlloc", runtimeMetrics.MemStats.HeapAlloc)
+	r.Register("runtime.MemStats.HeapIdle", runtimeMetrics.MemStats.HeapIdle)
+	r.Register("runtime.MemStats.HeapInuse", runtimeMetrics.MemStats.HeapInuse)
+	r.Register("runtime.MemStats.HeapObjects", runtimeMetrics.MemStats.HeapObjects)
+	r.Register("runtime.MemStats.HeapReleased", runtimeMetrics.MemStats.HeapReleased)
+	r.Register("runtime.MemStats.HeapSys", runtimeMetrics.MemStats.HeapSys)
+	r.Register("runtime.MemStats.LastGC", runtimeMetrics.MemStats.LastGC)
+	r.Register("runtime.MemStats.Lookups", runtimeMetrics.MemStats.Lookups)
+	r.Register("runtime.MemStats.Mallocs", runtimeMetrics.MemStats.Mallocs)
+	r.Register("runtime.MemStats.MCacheInuse", runtimeMetrics.MemStats.MCacheInuse)
+	r.Register("runtime.MemStats.MCacheSys", runtimeMetrics.MemStats.MCacheSys)
+	r.Register("runtime.MemStats.MSpanInuse", runtimeMetrics.MemStats.MSpanInuse)
+	r.Register("runtime.MemStats.MSpanSys", runtimeMetrics.MemStats.MSpanSys)
+	r.Register("runtime.MemStats.NextGC", runtimeMetrics.MemStats.NextGC)
+	r.Register("runtime.MemStats.NumGC", runtimeMetrics.MemStats.NumGC)
+	r.Register("runtime.MemStats.PauseNs", runtimeMetrics.MemStats.PauseNs)
+	r.Register("runtime.MemStats.PauseTotalNs", runtimeMetrics.MemStats.PauseTotalNs)
+	r.Register("runtime.MemStats.StackInuse", runtimeMetrics.MemStats.StackInuse)
+	r.Register("runtime.MemStats.StackSys", runtimeMetrics.MemStats.StackSys)
+	r.Register("runtime.MemStats.Sys", runtimeMetrics.MemStats.Sys)
+	r.Register("runtime.MemStats.TotalAlloc", runtimeMetrics.MemStats.TotalAlloc)
+	r.Register("runtime.NumCgoCall", runtimeMetrics.NumCgoCall)
+	r.Register("runtime.NumGoroutine", runtimeMetrics.NumGoroutine)
+	r.Register("runtime.ReadMemStats", runtimeMetrics.ReadMemStats)
 }

+ 102 - 133
sample.go

@@ -4,6 +4,7 @@ import (
 	"container/heap"
 	"math"
 	"math/rand"
+	"sync"
 	"time"
 )
 
@@ -27,11 +28,11 @@ type Sample interface {
 //
 // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
 type ExpDecaySample struct {
-	reservoirSize int
 	alpha         float64
-	in            chan int64
-	out           chan chan []int64
-	reset         chan bool
+	mutex         sync.Mutex
+	reservoirSize int
+	t0, t1        time.Time
+	values        expDecaySampleHeap
 }
 
 // Force the compiler to check that ExpDecaySample implements Sample.
@@ -41,36 +42,113 @@ var _ Sample = &ExpDecaySample{}
 // and alpha.
 func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
 	s := &ExpDecaySample{
-		reservoirSize,
-		alpha,
-		make(chan int64),
-		make(chan chan []int64),
-		make(chan bool),
+		alpha:         alpha,
+		reservoirSize: reservoirSize,
+		t0:            time.Now(),
+		values:        make(expDecaySampleHeap, 0, reservoirSize),
 	}
-	go s.arbiter()
+	s.t1 = time.Now().Add(rescaleThreshold)
 	return s
 }
 
 // Clear all samples.
 func (s *ExpDecaySample) Clear() {
-	s.reset <- true
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
+	s.t0 = time.Now()
+	s.t1 = s.t0.Add(rescaleThreshold)
 }
 
 // Return the size of the sample, which is at most the reservoir size.
 func (s *ExpDecaySample) Size() int {
-	return len(s.Values())
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return len(s.values)
 }
 
 // Update the sample with a new value.
 func (s *ExpDecaySample) Update(v int64) {
-	s.in <- v
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	if len(s.values) == s.reservoirSize {
+		heap.Pop(&s.values)
+	}
+	t := time.Now()
+	heap.Push(&s.values, expDecaySample{
+		k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
+		v: v,
+	})
+	if t.After(s.t1) {
+		values := s.values
+		t0 := s.t0
+		s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
+		s.t0 = t
+		s.t1 = s.t0.Add(rescaleThreshold)
+		for _, v := range values {
+			v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
+			heap.Push(&values, v)
+		}
+	}
 }
 
 // Return all the values in the sample.
 func (s *ExpDecaySample) Values() []int64 {
-	c := make(chan []int64)
-	s.out <- c
-	return <-c
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	values := make([]int64, len(s.values))
+	for i, v := range s.values {
+		values[i] = v.v
+	}
+	return values
+}
+
+// A uniform sample using Vitter's Algorithm R.
+//
+// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
+type UniformSample struct {
+	mutex         sync.Mutex
+	reservoirSize int
+	values        []int64
+}
+
+// Create a new uniform sample with the given reservoir size.
+func NewUniformSample(reservoirSize int) *UniformSample {
+	return &UniformSample{reservoirSize: reservoirSize}
+}
+
+// Clear all samples.
+func (s *UniformSample) Clear() {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.values = make([]int64, 0, s.reservoirSize)
+}
+
+// Return the size of the sample, which is at most the reservoir size.
+func (s *UniformSample) Size() int {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return len(s.values)
+}
+
+// Update the sample with a new value.
+func (s *UniformSample) Update(v int64) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	if len(s.values) < s.reservoirSize {
+		s.values = append(s.values, v)
+	} else {
+		s.values[rand.Intn(s.reservoirSize)] = v
+	}
+}
+
+// Return all the values in the sample.
+func (s *UniformSample) Values() []int64 {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	values := make([]int64, len(s.values))
+	copy(values, s.values)
+	return values
 }
 
 // An individual sample.
@@ -90,18 +168,6 @@ func (q expDecaySampleHeap) Less(i, j int) bool {
 	return q[i].k < q[j].k
 }
 
-func (q expDecaySampleHeap) Swap(i, j int) {
-	q[i], q[j] = q[j], q[i]
-}
-
-func (q *expDecaySampleHeap) Push(x interface{}) {
-	q_ := *q
-	n := len(q_)
-	q_ = q_[0 : n+1]
-	q_[n] = x.(expDecaySample)
-	*q = q_
-}
-
 func (q *expDecaySampleHeap) Pop() interface{} {
 	q_ := *q
 	n := len(q_)
@@ -111,111 +177,14 @@ func (q *expDecaySampleHeap) Pop() interface{} {
 	return i
 }
 
-// Receive inputs and send outputs.  Count and save each input value,
-// rescaling the sample if enough time has elapsed since the last rescaling.
-// Send a copy of the values as output.
-func (s *ExpDecaySample) arbiter() {
-	values := make(expDecaySampleHeap, 0, s.reservoirSize)
-	start := time.Now()
-	next := time.Now().Add(rescaleThreshold)
-	for {
-		select {
-		case v := <-s.in:
-			if len(values) == s.reservoirSize {
-				heap.Pop(&values)
-			}
-			now := time.Now()
-			k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
-			heap.Push(&values, expDecaySample{k: k, v: v})
-			if now.After(next) {
-				oldValues := values
-				oldStart := start
-				values = make(expDecaySampleHeap, 0, s.reservoirSize)
-				start = time.Now()
-				next = start.Add(rescaleThreshold)
-				for _, e := range oldValues {
-					e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
-					heap.Push(&values, e)
-				}
-			}
-		case ch := <-s.out:
-			valuesCopy := make([]int64, len(values))
-			for i, e := range values {
-				valuesCopy[i] = e.v
-			}
-			ch <- valuesCopy
-		case <-s.reset:
-			values = make(expDecaySampleHeap, 0, s.reservoirSize)
-			start = time.Now()
-			next = start.Add(rescaleThreshold)
-		}
-	}
-}
-
-// A uniform sample using Vitter's Algorithm R.
-//
-// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
-type UniformSample struct {
-	reservoirSize int
-	in            chan int64
-	out           chan chan []int64
-	reset         chan bool
-}
-
-// Create a new uniform sample with the given reservoir size.
-func NewUniformSample(reservoirSize int) *UniformSample {
-	s := &UniformSample{
-		reservoirSize,
-		make(chan int64),
-		make(chan chan []int64),
-		make(chan bool),
-	}
-	go s.arbiter()
-	return s
-}
-
-// Clear all samples.
-func (s *UniformSample) Clear() {
-	s.reset <- true
-}
-
-// Return the size of the sample, which is at most the reservoir size.
-func (s *UniformSample) Size() int {
-	return len(s.Values())
-}
-
-// Update the sample with a new value.
-func (s *UniformSample) Update(v int64) {
-	s.in <- v
+func (q *expDecaySampleHeap) Push(x interface{}) {
+	q_ := *q
+	n := len(q_)
+	q_ = q_[0 : n+1]
+	q_[n] = x.(expDecaySample)
+	*q = q_
 }
 
-// Return all the values in the sample.
-func (s *UniformSample) Values() []int64 {
-	c := make(chan []int64)
-	s.out <- c
-	return <-c
-}
-
-// Receive inputs and send outputs.  Count and save each input value at a
-// random index.  Send a copy of the values as output.
-func (s *UniformSample) arbiter() {
-	values := make([]int64, 0, s.reservoirSize)
-	for {
-		n := len(values)
-		select {
-		case v := <-s.in:
-			if n < s.reservoirSize {
-				values = values[0 : n+1]
-				values[n] = v
-			} else {
-				values[rand.Intn(s.reservoirSize)] = v
-			}
-		case ch := <-s.out:
-			valuesCopy := make([]int64, n)
-			copy(valuesCopy, values[:n])
-			ch <- valuesCopy
-		case <-s.reset:
-			values = make([]int64, 0, s.reservoirSize)
-		}
-	}
+func (q expDecaySampleHeap) Swap(i, j int) {
+	q[i], q[j] = q[j], q[i]
 }

+ 34 - 38
sample_test.go

@@ -1,12 +1,35 @@
 package metrics
 
 import (
-	"math/rand"
 	"runtime"
 	"testing"
 	"time"
 )
 
+func BenchmarkExpDecaySample257(b *testing.B) {
+	benchmarkSample(b, NewExpDecaySample(257, 0.015))
+}
+
+func BenchmarkExpDecaySample514(b *testing.B) {
+	benchmarkSample(b, NewExpDecaySample(514, 0.015))
+}
+
+func BenchmarkExpDecaySample1028(b *testing.B) {
+	benchmarkSample(b, NewExpDecaySample(1028, 0.015))
+}
+
+func BenchmarkUniformSample257(b *testing.B) {
+	benchmarkSample(b, NewUniformSample(257))
+}
+
+func BenchmarkUniformSample514(b *testing.B) {
+	benchmarkSample(b, NewUniformSample(514))
+}
+
+func BenchmarkUniformSample1028(b *testing.B) {
+	benchmarkSample(b, NewUniformSample(1028))
+}
+
 func TestExpDecaySample10(t *testing.T) {
 	s := NewExpDecaySample(100, 0.99)
 	for i := 0; i < 10; i++ {
@@ -131,44 +154,17 @@ func TestUniformSampleIncludesTail(t *testing.T) {
 }
 
 func benchmarkSample(b *testing.B, s Sample) {
-	var m runtime.MemStats
-	var p [2]uint64
-
-	runtime.ReadMemStats(&m)
-	p[0] = m.PauseTotalNs
-
+	b.StopTimer()
+	var memStats runtime.MemStats
+	var pauseTotalNs uint64
+	runtime.ReadMemStats(&memStats)
+	pauseTotalNs = memStats.PauseTotalNs
+	b.StartTimer()
 	for i := 0; i < b.N; i++ {
-		s.Update(rand.Int63())
+		s.Update(1)
 	}
-
+	b.StopTimer()
 	runtime.GC()
-
-	runtime.ReadMemStats(&m)
-	p[1] = m.PauseTotalNs
-
-	b.Logf("GC cost: %d ns/op", int(p[1]-p[0])/b.N)
-}
-
-func BenchmarkExpDecaySample257(b *testing.B) {
-	benchmarkSample(b, NewExpDecaySample(257, 0.015))
-}
-
-func BenchmarkExpDecaySample514(b *testing.B) {
-	benchmarkSample(b, NewExpDecaySample(514, 0.015))
-}
-
-func BenchmarkExpDecaySample1028(b *testing.B) {
-	benchmarkSample(b, NewExpDecaySample(1028, 0.015))
-}
-
-func BenchmarkUniformSample257(b *testing.B) {
-	benchmarkSample(b, NewUniformSample(257))
-}
-
-func BenchmarkUniformSample514(b *testing.B) {
-	benchmarkSample(b, NewUniformSample(514))
-}
-
-func BenchmarkUniformSample1028(b *testing.B) {
-	benchmarkSample(b, NewUniformSample(1028))
+	runtime.ReadMemStats(&memStats)
+	b.Logf("GC cost: %d ns/op", int(memStats.PauseTotalNs-pauseTotalNs)/b.N)
 }