Procházet zdrojové kódy

Added the uniform sampler for percentiles.

Richard Crowley před 14 roky
rodič
revize
1410ebe031
5 změněných souborů, 193 přidání a 27 odebrání
  1. 1 0
      Makefile
  2. 8 2
      cmd/metrics/metrics.go
  3. 3 3
      gauge.go
  4. 57 22
      histogram.go
  5. 124 0
      sample.go

+ 1 - 0
Makefile

@@ -9,6 +9,7 @@ GOFILES=\
 	meter.go\
 	metrics.go\
 	registry.go\
+	sample.go\
 	timer.go\
 
 include $(GOROOT)/src/Make.pkg

+ 8 - 2
cmd/metrics/metrics.go

@@ -56,7 +56,8 @@ func main() {
 	}
 */
 
-	h := metrics.NewHistogram()
+	s := metrics.NewUniformSample(1028)
+	h := metrics.NewHistogram(s)
 	r.RegisterHistogram("baz", h)
 	for i := 0; i < 1000; i++ {
 		go func() {
@@ -73,7 +74,12 @@ func main() {
 		}()
 	}
 	for {
-		fmt.Printf("h: %v %v %v %v %v %v\n", h.Count(), h.Sum(), h.Min(), h.Max(), h.StdDev(), h.Variance())
+		fmt.Printf(
+			"h: %v %v %v %v %v %v %v %v %v\n",
+			h.Count(), h.Sum(), h.Min(), h.Max(),
+			h.Percentile(95.0), h.Percentile(99.0), h.Percentile(99.9),
+			h.StdDev(), h.Variance(),
+		)
 		time.Sleep(500e6)
 	}
 

+ 3 - 3
gauge.go

@@ -15,8 +15,8 @@ func NewGauge() Gauge {
 	return g
 }
 
-func (g *gauge) Update(i int64) {
-	g.in <- i
+func (g *gauge) Update(v int64) {
+	g.in <- v
 }
 
 func (g *gauge) Value() int64 {
@@ -27,7 +27,7 @@ func (g *gauge) arbiter() {
 	var value int64
 	for {
 		select {
-		case i := <-g.in: value = i
+		case v := <-g.in: value = v
 		case g.out <- value:
 		}
 	}

+ 57 - 22
histogram.go

@@ -2,6 +2,7 @@ package metrics
 
 import (
 	"math"
+	"sort"
 )
 
 type Histogram interface {
@@ -11,6 +12,7 @@ type Histogram interface {
 	Mean() float64
 	Min() int64
 	Percentile(float64) float64
+	Percentiles([]float64) []float64
 	StdDev() float64
 	Sum() int64
 	Update(int64)
@@ -18,6 +20,7 @@ type Histogram interface {
 }
 
 type histogram struct {
+	s Sample
 	in chan int64
 	out chan histogramV
 	reset chan bool
@@ -28,12 +31,24 @@ type histogramV struct {
 	variance [2]float64
 }
 
-func NewHistogram() Histogram {
-	h := &histogram{make(chan int64), make(chan histogramV), make(chan bool)}
+func NewHistogram(s Sample) Histogram {
+	h := &histogram{
+		s,
+		make(chan int64),
+		make(chan histogramV),
+		make(chan bool),
+	}
 	go h.arbiter()
 	return h
 }
 
+func newHistogramV() histogramV {
+	return histogramV{
+		0, 0, math.MaxInt64, math.MinInt64,
+		[2]float64{-1.0, 0.0},
+	}
+}
+
 func (h *histogram) Clear() {
 	h.reset <- true
 }
@@ -63,9 +78,27 @@ func (h *histogram) Min() int64 {
 }
 
 func (h *histogram) Percentile(p float64) float64 {
-	// This requires sampling, which is more involved than I have time to
-	// implement this afternoon.
-	return 0.0
+	return h.Percentiles([]float64{p})[0]
+}
+
+func (h *histogram) Percentiles(ps []float64) []float64 {
+	scores := make([]float64, len(ps))
+	values := Int64Slice(h.s.Values())
+	size := len(values)
+	sort.Sort(values)
+	for i, p := range ps {
+		pos := p * float64(size + 1)
+		if pos < 1.0 {
+			scores[i] = float64(values[0])
+		} else if pos >= float64(size) {
+			scores[i] = float64(values[size - 1])
+		} else {
+			lower := float64(values[int(pos) - 1])
+			upper := float64(values[int(pos)])
+			scores[i] = lower + (pos - math.Floor(pos)) * (upper - lower)
+		}
+	}
+	return scores
 }
 
 func (h *histogram) StdDev() float64 {
@@ -76,8 +109,8 @@ func (h *histogram) Sum() int64 {
 	return (<-h.out).sum
 }
 
-func (h *histogram) Update(i int64) {
-	h.in <- i
+func (h *histogram) Update(v int64) {
+	h.in <- v
 }
 
 func (h *histogram) Variance() float64 {
@@ -90,30 +123,32 @@ func (h *histogram) arbiter() {
 	hv := newHistogramV()
 	for {
 		select {
-		case i := <-h.in:
+		case v := <-h.in:
+			h.s.Update(v)
 			hv.count++
-			if i < hv.min { hv.min = i }
-			if i > hv.max { hv.max = i }
-			hv.sum += i
-			f := float64(i)
+			if v < hv.min { hv.min = v }
+			if v > hv.max { hv.max = v }
+			hv.sum += v
+			fv := float64(v)
 			if -1.0 == hv.variance[0] {
-				hv.variance[0] = f
+				hv.variance[0] = fv
 				hv.variance[1] = 0.0
 			} else {
 				m := hv.variance[0]
 				s := hv.variance[1]
-				hv.variance[0] = m + (f - m) / float64(hv.count)
-				hv.variance[1] = s + (f - m) * (f - hv.variance[0])
+				hv.variance[0] = m + (fv - m) / float64(hv.count)
+				hv.variance[1] = s + (fv - m) * (fv - hv.variance[0])
 			}
 		case h.out <- hv:
-		case <- h.reset: hv = newHistogramV()
+		case <- h.reset:
+			h.s.Clear()
+			hv = newHistogramV()
 		}
 	}
 }
 
-func newHistogramV() histogramV {
-	return histogramV{
-		0, 0, math.MaxInt64, math.MinInt64,
-		[2]float64{-1.0, 0.0},
-	}
-}
+// Cribbed from the standard library's `sort` package.
+type Int64Slice []int64
+func (p Int64Slice) Len() int           { return len(p) }
+func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p Int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

+ 124 - 0
sample.go

@@ -0,0 +1,124 @@
+package metrics
+
+import (
+	"rand"
+)
+
+type Sample interface {
+	Clear()
+	Count() int
+	Size() int
+	Update(int64)
+	Values() []int64
+}
+
+type expDecaySample struct {
+	reservoirSize int
+	alpha float64
+	count int
+	values []int64
+}
+
+func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
+	return &expDecaySample{
+		reservoirSize, alpha,
+		0,
+		make([]int64, reservoirSize),
+	}
+}
+
+func (s *expDecaySample) Clear() {
+}
+
+func (s *expDecaySample) Count() int {
+	return s.count
+}
+
+func (s *expDecaySample) Size() int {
+	return 0
+}
+
+func (s *expDecaySample) Update(v int64) {
+}
+
+func (s *expDecaySample) Values() []int64 {
+	return s.values // It might be worth copying this before returning it.
+}
+
+type uniformSample struct {
+	reservoirSize int
+	in chan int64
+	out chan sampleV
+	reset chan bool
+}
+
+func NewUniformSample(reservoirSize int) Sample {
+	s := &uniformSample{
+		reservoirSize,
+		make(chan int64),
+		make(chan sampleV),
+		make(chan bool),
+	}
+	go s.arbiter()
+	return s
+}
+
+func (s *uniformSample) Clear() {
+	s.reset <- true
+}
+
+func (s *uniformSample) Count() int {
+	return (<-s.out).count
+}
+
+func (s *uniformSample) Size() int {
+	return (<-s.out).size()
+}
+
+func (s *uniformSample) Update(v int64) {
+	s.in <- v
+}
+
+func (s *uniformSample) Values() []int64 {
+	return (<-s.out).values
+}
+
+func (s *uniformSample) arbiter() {
+	sv := newSampleV(s.reservoirSize)
+	for {
+		select {
+		case v := <-s.in:
+			sv.count++
+			if sv.count < s.reservoirSize {
+				sv.values[sv.count - 1] = v
+			} else {
+				sv.values[rand.Intn(s.reservoirSize)] = v
+			}
+		case s.out <- sv.dup():
+		case <-s.reset:
+			for i, _ := range sv.values { sv.values[i] = 0 }
+		}
+	}
+}
+
+type sampleV struct {
+	count int
+	values []int64
+}
+
+func newSampleV(reservoirSize int) sampleV {
+	return sampleV{0, make([]int64, reservoirSize)}
+}
+
+func (sv sampleV) dup() sampleV {
+	values := make([]int64, sv.size())
+	for i := 0; i < sv.size(); i++ { values[i] = sv.values[i] }
+	return sampleV{sv.count, values}
+}
+
+func (sv sampleV) size() int {
+	if sv.count < len(sv.values) {
+		return sv.count
+	}
+	return len(sv.values)
+}