فهرست منبع

Expand the Sample interface and implementations.

Now all the statistics that Histograms report can be driven directly by Samples.
Richard Crowley 12 سال پیش
والد
کامیت
5a4179c70b
2 فایل‌های تغییر یافته به همراه 356 افزوده شده و 12 حذف شده
  1. 251 12
      sample.go
  2. 105 0
      sample_test.go

+ 251 - 12
sample.go

@@ -4,7 +4,9 @@ import (
 	"container/heap"
 	"math"
 	"math/rand"
+	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -17,26 +19,36 @@ const rescaleThreshold = 1e9 * 60 * 60
 // the Sample API as appropriate.
 type Sample interface {
 	Clear()
+	Count() int64
+	Dup() Sample
+	Max() int64
+	Mean() float64
+	Min() int64
+	Percentile(float64) float64
+	Percentiles([]float64) []float64
 	Size() int
+	StdDev() float64
 	Update(int64)
 	Values() []int64
+	Variance() float64
 }
 
-// An exponentially-decaying sample using a forward-decaying priority
-// reservoir.  See Cormode et al's "Forward Decay: A Practical Time Decay
-// Model for Streaming Systems".
+// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
+// priority reservoir.  See Cormode et al's "Forward Decay: A Practical Time
+// Decay Model for Streaming Systems".
 //
 // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
 type ExpDecaySample struct {
 	alpha         float64
+	count         int64
 	mutex         sync.Mutex
 	reservoirSize int
 	t0, t1        time.Time
 	values        expDecaySampleHeap
 }
 
-// Create a new exponentially-decaying sample with the given reservoir size
-// and alpha.
+// NewExpDecaySample constructs a new exponentially-decaying sample with the
+// given reservoir size and alpha.
 func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
 	if UseNilMetrics {
 		return NilSample{}
@@ -51,26 +63,82 @@ func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
 	return s
 }
 
-// Clear all samples.
+// Clear clears all samples.
 func (s *ExpDecaySample) Clear() {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
-	s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
+	s.count = 0
 	s.t0 = time.Now()
 	s.t1 = s.t0.Add(rescaleThreshold)
+	s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
 }
 
-// Return the size of the sample, which is at most the reservoir size.
+// Dup returns a copy of the sample.
+func (s *ExpDecaySample) Dup() Sample {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	values := make(expDecaySampleHeap, len(s.values))
+	copy(values, s.values)
+	return &ExpDecaySample{
+		alpha:         s.alpha,
+		count:         s.count,
+		reservoirSize: s.reservoirSize,
+		t0:            s.t0,
+		t1:            s.t1,
+		values:        values,
+	}
+}
+
+// Count returns the number of samples recorded, which may exceed the
+// reservoir size.
+func (s *ExpDecaySample) Count() int64 {
+	return atomic.LoadInt64(&s.count)
+}
+
+// Max returns the maximum value in the sample, which may not be the maximum
+// value ever to be part of the sample.
+func (s *ExpDecaySample) Max() int64 {
+	return max(s.Values())
+}
+
+// Return the mean of all values seen since the histogram was last cleared.
+func (s *ExpDecaySample) Mean() float64 {
+	return mean(s.Values())
+}
+
+// Min returns the minimum value in the sample, which may not be the minimum
+// value ever to be part of the sample.
+func (s *ExpDecaySample) Min() int64 {
+	return min(s.Values())
+}
+
+// Percentile returns an arbitrary percentile of sampled values.
+func (s *ExpDecaySample) Percentile(p float64) float64 {
+	return s.Percentiles([]float64{p})[0]
+}
+
+// Percentiles returns a slice of arbitrary percentiles of sampled values.
+func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
+	return percentiles(s.Values(), ps)
+}
+
+// Size returns the size of the sample, which is at most the reservoir size.
 func (s *ExpDecaySample) Size() int {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	return len(s.values)
 }
 
-// Update the sample with a new value.
+// StdDev returns the standard deviation of the sample.
+func (s *ExpDecaySample) StdDev() float64 {
+	return math.Sqrt(s.Variance())
+}
+
+// Update samples a new value.
 func (s *ExpDecaySample) Update(v int64) {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
+	s.count++
 	if len(s.values) == s.reservoirSize {
 		heap.Pop(&s.values)
 	}
@@ -92,7 +160,7 @@ func (s *ExpDecaySample) Update(v int64) {
 	}
 }
 
-// Return all the values in the sample.
+// Values returns a copy of the values in the sample.
 func (s *ExpDecaySample) Values() []int64 {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
@@ -103,25 +171,60 @@ func (s *ExpDecaySample) Values() []int64 {
 	return values
 }
 
+// Variance returns the variance of the sample.
+func (s *ExpDecaySample) Variance() float64 {
+	return variance(s.Values())
+}
+
 // No-op Sample.
 type NilSample struct{}
 
 // No-op.
 func (NilSample) Clear() {}
 
+// No-op.
+func (NilSample) Count() int64 { return 0 }
+
+// No-op.
+func (NilSample) Dup() Sample { return NilSample{} }
+
+// No-op.
+func (NilSample) Max() int64 { return 0 }
+
+// No-op.
+func (NilSample) Mean() float64 { return 0.0 }
+
+// No-op.
+func (NilSample) Min() int64 { return 0 }
+
+// No-op.
+func (NilSample) Percentile(p float64) float64 { return 0.0 }
+
+// No-op.
+func (NilSample) Percentiles(ps []float64) []float64 {
+	return make([]float64, len(ps))
+}
+
 // No-op.
 func (NilSample) Size() int { return 0 }
 
+// No-op.
+func (NilSample) StdDev() float64 { return 0.0 }
+
 // No-op.
 func (NilSample) Update(v int64) {}
 
 // No-op.
 func (NilSample) Values() []int64 { return []int64{} }
 
+// No-op.
+func (NilSample) Variance() float64 { return 0.0 }
+
 // A uniform sample using Vitter's Algorithm R.
 //
 // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
 type UniformSample struct {
+	count         int64
 	mutex         sync.Mutex
 	reservoirSize int
 	values        []int64
@@ -139,9 +242,64 @@ func NewUniformSample(reservoirSize int) Sample {
 func (s *UniformSample) Clear() {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
+	s.count = 0
 	s.values = make([]int64, 0, s.reservoirSize)
 }
 
+// Count returns the number of samples recorded, which may exceed the
+// reservoir size.
+func (s *UniformSample) Count() int64 {
+	return atomic.LoadInt64(&s.count)
+}
+
+// Dup returns a copy of the sample.
+func (s *UniformSample) Dup() Sample {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	values := make([]int64, len(s.values))
+	copy(values, s.values)
+	return &UniformSample{
+		count:         s.count,
+		reservoirSize: s.reservoirSize,
+		values:        values,
+	}
+}
+
+// Max returns the maximum value in the sample, which may not be the maximum
+// value ever to be part of the sample.
+func (s *UniformSample) Max() int64 {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return max(s.values)
+}
+
+// Return the mean of all values seen since the histogram was last cleared.
+func (s *UniformSample) Mean() float64 {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return mean(s.values)
+}
+
+// Min returns the minimum value in the sample, which may not be the minimum
+// value ever to be part of the sample.
+func (s *UniformSample) Min() int64 {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return min(s.values)
+}
+
+// Percentile returns an arbitrary percentile of sampled values.
+func (s *UniformSample) Percentile(p float64) float64 {
+	return s.Percentiles([]float64{p})[0]
+}
+
+// Percentiles returns a slice of arbitrary percentiles of sampled values.
+func (s *UniformSample) Percentiles(ps []float64) []float64 {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return percentiles(s.values, ps)
+}
+
 // Return the size of the sample, which is at most the reservoir size.
 func (s *UniformSample) Size() int {
 	s.mutex.Lock()
@@ -149,10 +307,16 @@ func (s *UniformSample) Size() int {
 	return len(s.values)
 }
 
+// StdDev returns the standard deviation of the sample.
+func (s *UniformSample) StdDev() float64 {
+	return math.Sqrt(s.Variance())
+}
+
 // Update the sample with a new value.
 func (s *UniformSample) Update(v int64) {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
+	s.count++
 	if len(s.values) < s.reservoirSize {
 		s.values = append(s.values, v)
 	} else {
@@ -169,13 +333,20 @@ func (s *UniformSample) Values() []int64 {
 	return values
 }
 
-// An individual sample.
+// Variance returns the variance of the sample.
+func (s *UniformSample) Variance() float64 {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return variance(s.values)
+}
+
+// expDecaySample represents an individual sample in a heap.
 type expDecaySample struct {
 	k float64
 	v int64
 }
 
-// A min-heap of samples.
+// expDecaySampleHeap is a min-heap of expDecaySamples.
 type expDecaySampleHeap []expDecaySample
 
 func (q expDecaySampleHeap) Len() int {
@@ -206,3 +377,71 @@ func (q *expDecaySampleHeap) Push(x interface{}) {
 func (q expDecaySampleHeap) Swap(i, j int) {
 	q[i], q[j] = q[j], q[i]
 }
+
// max returns the largest of values, or 0 when values is empty.  Note
// that 0 is a sentinel for "no data", not a real observation.
func max(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Seed from the first element instead of math.MinInt64; same result,
	// no magic constant, and no Yoda-style comparisons.
	m := values[0]
	for _, v := range values[1:] {
		if v > m {
			m = v
		}
	}
	return m
}
+
// mean returns the arithmetic mean of values, or 0 when values is empty.
// The sum accumulates in an int64 and can overflow for extreme inputs;
// acceptable here because reservoirs are small — TODO confirm callers
// never pass values summing past int64 range.
func mean(values []int64) float64 {
	if len(values) == 0 {
		return 0.0
	}
	var sum int64
	for _, v := range values {
		sum += v
	}
	return float64(sum) / float64(len(values))
}
+
// min returns the smallest of values, or 0 when values is empty.  Note
// that 0 is a sentinel for "no data", not a real observation.
func min(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Seed from the first element instead of math.MaxInt64, mirroring max.
	m := values[0]
	for _, v := range values[1:] {
		if v < m {
			m = v
		}
	}
	return m
}
+
// percentiles returns the requested percentiles of values, interpolating
// linearly between closest ranks.  ps entries are fractions in [0, 1]
// (e.g. 0.5 for the median); out-of-range positions clamp to the extreme
// values.  An empty input yields all zeros.
//
// The input slice is NOT modified: the original sorted the caller's slice
// in place, silently reordering a live reservoir (UniformSample passes
// s.values directly), so we sort a copy instead.
func percentiles(values []int64, ps []float64) []float64 {
	scores := make([]float64, len(ps))
	size := len(values)
	if size == 0 {
		return scores
	}
	sorted := make([]int64, size)
	copy(sorted, values)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	for i, p := range ps {
		pos := p * float64(size+1)
		switch {
		case pos < 1.0:
			scores[i] = float64(sorted[0])
		case pos >= float64(size):
			scores[i] = float64(sorted[size-1])
		default:
			lower := float64(sorted[int(pos)-1])
			upper := float64(sorted[int(pos)])
			scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
		}
	}
	return scores
}
+
// variance returns the population variance of values, or 0 when values is
// empty.  The original performed 0/0 on empty input, returning NaN, which
// then propagated through StdDev; guard explicitly instead.  The mean is
// computed inline so this helper stands alone.
func variance(values []int64) float64 {
	n := len(values)
	if n == 0 {
		return 0.0
	}
	var sum int64
	for _, v := range values {
		sum += v
	}
	m := float64(sum) / float64(n)
	var ss float64
	for _, v := range values {
		d := float64(v) - m
		ss += d * d
	}
	return ss / float64(n)
}

+ 105 - 0
sample_test.go

@@ -1,6 +1,7 @@
 package metrics
 
 import (
+	"math/rand"
 	"runtime"
 	"testing"
 	"time"
@@ -31,10 +32,14 @@ func BenchmarkUniformSample1028(b *testing.B) {
 }
 
 func TestExpDecaySample10(t *testing.T) {
+	rand.Seed(1)
 	s := NewExpDecaySample(100, 0.99)
 	for i := 0; i < 10; i++ {
 		s.Update(int64(i))
 	}
+	if size := s.Count(); 10 != size {
+		t.Errorf("s.Count(): 10 != %v\n", size)
+	}
 	if size := s.Size(); 10 != size {
 		t.Errorf("s.Size(): 10 != %v\n", size)
 	}
@@ -49,10 +54,14 @@ func TestExpDecaySample10(t *testing.T) {
 }
 
 func TestExpDecaySample100(t *testing.T) {
+	rand.Seed(1)
 	s := NewExpDecaySample(1000, 0.01)
 	for i := 0; i < 100; i++ {
 		s.Update(int64(i))
 	}
+	if size := s.Count(); 100 != size {
+		t.Errorf("s.Count(): 100 != %v\n", size)
+	}
 	if size := s.Size(); 100 != size {
 		t.Errorf("s.Size(): 100 != %v\n", size)
 	}
@@ -67,10 +76,14 @@ func TestExpDecaySample100(t *testing.T) {
 }
 
 func TestExpDecaySample1000(t *testing.T) {
+	rand.Seed(1)
 	s := NewExpDecaySample(100, 0.99)
 	for i := 0; i < 1000; i++ {
 		s.Update(int64(i))
 	}
+	if size := s.Count(); 1000 != size {
+		t.Errorf("s.Count(): 1000 != %v\n", size)
+	}
 	if size := s.Size(); 100 != size {
 		t.Errorf("s.Size(): 100 != %v\n", size)
 	}
@@ -84,11 +97,22 @@ func TestExpDecaySample1000(t *testing.T) {
 	}
 }
 
+func TestExpDecaySampleDup(t *testing.T) {
+	s1 := NewExpDecaySample(100, 0.99)
+	s1.Update(1)
+	s2 := s1.Dup()
+	s1.Update(1)
+	if 1 != s2.Size() {
+		t.Fatal(s2)
+	}
+}
+
 // This test makes sure that the sample's priority is not amplified by using
 // nanosecond duration since start rather than second duration since start.
 // The priority becomes +Inf quickly after starting if this is done,
 // effectively freezing the set of samples until a rescale step happens.
 func TestExpDecaySampleNanosecondRegression(t *testing.T) {
+	rand.Seed(1)
 	s := NewExpDecaySample(100, 0.99)
 	for i := 0; i < 100; i++ {
 		s.Update(10)
@@ -108,11 +132,48 @@ func TestExpDecaySampleNanosecondRegression(t *testing.T) {
 	}
 }
 
+func TestExpDecaySampleStatistics(t *testing.T) {
+	rand.Seed(1)
+	s := NewExpDecaySample(100, 0.99)
+	for i := 1; i <= 10000; i++ {
+		s.Update(int64(i))
+	}
+	if count := s.Count(); 10000 != count {
+		t.Errorf("s.Count(): 10000 != %v\n", count)
+	}
+	if min := s.Min(); 107 != min {
+		t.Errorf("s.Min(): 107 != %v\n", min)
+	}
+	if max := s.Max(); 10000 != max {
+		t.Errorf("s.Max(): 10000 != %v\n", max)
+	}
+	if mean := s.Mean(); 4965.98 != mean {
+		t.Errorf("s.Mean(): 4965.98 != %v\n", mean)
+	}
+	if stdDev := s.StdDev(); 2959.825156930727 != stdDev {
+		t.Errorf("s.StdDev(): 2959.825156930727 != %v\n", stdDev)
+	}
+	ps := s.Percentiles([]float64{0.5, 0.75, 0.99})
+	if 4615 != ps[0] {
+		t.Errorf("median: 4615 != %v\n", ps[0])
+	}
+	if 7672 != ps[1] {
+		t.Errorf("75th percentile: 7672 != %v\n", ps[1])
+	}
+	if 9998.99 != ps[2] {
+		t.Errorf("99th percentile: 9998.99 != %v\n", ps[2])
+	}
+}
+
 func TestUniformSample(t *testing.T) {
+	rand.Seed(1)
 	s := NewUniformSample(100)
 	for i := 0; i < 1000; i++ {
 		s.Update(int64(i))
 	}
+	if size := s.Count(); 1000 != size {
+		t.Errorf("s.Count(): 1000 != %v\n", size)
+	}
 	if size := s.Size(); 100 != size {
 		t.Errorf("s.Size(): 100 != %v\n", size)
 	}
@@ -126,7 +187,18 @@ func TestUniformSample(t *testing.T) {
 	}
 }
 
+func TestUniformSampleDup(t *testing.T) {
+	s1 := NewUniformSample(100)
+	s1.Update(1)
+	s2 := s1.Dup()
+	s1.Update(1)
+	if 1 != s2.Size() {
+		t.Fatal(s2)
+	}
+}
+
 func TestUniformSampleIncludesTail(t *testing.T) {
+	rand.Seed(1)
 	s := NewUniformSample(100)
 	max := 100
 
@@ -147,6 +219,39 @@ func TestUniformSampleIncludesTail(t *testing.T) {
 	}
 }
 
+func TestUniformSampleStatistics(t *testing.T) {
+	rand.Seed(1)
+	s := NewUniformSample(100)
+	for i := 1; i <= 10000; i++ {
+		s.Update(int64(i))
+	}
+	if count := s.Count(); 10000 != count {
+		t.Errorf("s.Count(): 10000 != %v\n", count)
+	}
+	if min := s.Min(); 9412 != min {
+		t.Errorf("s.Min(): 9412 != %v\n", min)
+	}
+	if max := s.Max(); 10000 != max {
+		t.Errorf("s.Max(): 10000 != %v\n", max)
+	}
+	if mean := s.Mean(); 9902.26 != mean {
+		t.Errorf("s.Mean(): 9902.26 != %v\n", mean)
+	}
+	if stdDev := s.StdDev(); 101.8667384380201 != stdDev {
+		t.Errorf("s.StdDev(): 101.8667384380201 != %v\n", stdDev)
+	}
+	ps := s.Percentiles([]float64{0.5, 0.75, 0.99})
+	if 9930.5 != ps[0] {
+		t.Errorf("median: 9930.5 != %v\n", ps[0])
+	}
+	if 9973.75 != ps[1] {
+		t.Errorf("75th percentile: 9973.75 != %v\n", ps[1])
+	}
+	if 9999.99 != ps[2] {
+		t.Errorf("99th percentile: 9999.99 != %v\n", ps[2])
+	}
+}
+
 func benchmarkSample(b *testing.B, s Sample) {
 	var memStats runtime.MemStats
 	runtime.ReadMemStats(&memStats)