|
|
@@ -3,6 +3,8 @@ package metrics
|
|
|
import (
|
|
|
"math"
|
|
|
"sort"
|
|
|
+ "sync"
|
|
|
+ "sync/atomic"
|
|
|
)
|
|
|
|
|
|
// Histograms calculate distribution statistics from an int64 value.
|
|
|
@@ -25,80 +27,75 @@ type Histogram interface {
|
|
|
// The standard implementation of a Histogram uses a Sample and a goroutine
|
|
|
// to synchronize its calculations.
|
|
|
type StandardHistogram struct {
|
|
|
- s Sample
|
|
|
- in chan int64
|
|
|
- out chan histogramV
|
|
|
- reset chan bool
|
|
|
+ count, sum, min, max int64
|
|
|
+ mutex *sync.Mutex
|
|
|
+ s Sample
|
|
|
+ variance [2]float64
|
|
|
}
|
|
|
|
|
|
// Force the compiler to check that StandardHistogram implements Histogram.
|
|
|
var _ Histogram = &StandardHistogram{}
|
|
|
|
|
|
-// A histogramV contains all the values that would need to be passed back
|
|
|
-// from the synchronizing goroutine.
|
|
|
-type histogramV struct {
|
|
|
- count, sum, min, max int64
|
|
|
- variance [2]float64
|
|
|
-}
|
|
|
-
|
|
|
-// Create a new histogram with the given Sample. Create the communication
|
|
|
-// channels and start the synchronizing goroutine.
|
|
|
+// Create a new histogram with the given Sample. The initial values compare
|
|
|
+// so that the first value will be both min and max and the variance is flagged
|
|
|
+// for special treatment on its first iteration.
|
|
|
func NewHistogram(s Sample) *StandardHistogram {
|
|
|
- h := &StandardHistogram{
|
|
|
- s,
|
|
|
- make(chan int64),
|
|
|
- make(chan histogramV),
|
|
|
- make(chan bool),
|
|
|
- }
|
|
|
- go h.arbiter()
|
|
|
- return h
|
|
|
-}
|
|
|
-
|
|
|
-// Create a new histogramV. The initial values compare so that the first
|
|
|
-// value will be both min and max and the variance is flagged for special
|
|
|
-// treatment on its first iteration.
|
|
|
-func newHistogramV() histogramV {
|
|
|
- return histogramV{
|
|
|
- 0, 0, math.MaxInt64, math.MinInt64,
|
|
|
- [2]float64{-1.0, 0.0},
|
|
|
+ return &StandardHistogram{
|
|
|
+ count: 0,
|
|
|
+ max: math.MinInt64,
|
|
|
+ min: math.MaxInt64,
|
|
|
+ mutex: &sync.Mutex{},
|
|
|
+ s: s,
|
|
|
+ sum: 0,
|
|
|
+ variance: [2]float64{-1.0, 0.0},
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Clear the histogram.
|
|
|
func (h *StandardHistogram) Clear() {
|
|
|
- h.reset <- true
|
|
|
+ h.s.Clear()
|
|
|
+ h.mutex.Lock()
|
|
|
+ defer h.mutex.Unlock()
|
|
|
+ h.count = 0
|
|
|
+ h.max = math.MinInt64
|
|
|
+ h.min = math.MaxInt64
|
|
|
+ h.sum = 0
|
|
|
+ h.variance = [...]float64{-1.0, 0.0}
|
|
|
}
|
|
|
|
|
|
// Return the count of inputs since the histogram was last cleared.
|
|
|
func (h *StandardHistogram) Count() int64 {
|
|
|
- return (<-h.out).count
|
|
|
+ return atomic.LoadInt64(&h.count)
|
|
|
}
|
|
|
|
|
|
// Return the maximal value seen since the histogram was last cleared.
|
|
|
func (h *StandardHistogram) Max() int64 {
|
|
|
- hv := <-h.out
|
|
|
- if 0 < hv.count {
|
|
|
- return hv.max
|
|
|
+ h.mutex.Lock()
|
|
|
+ defer h.mutex.Unlock()
|
|
|
+ if 0 == h.count {
|
|
|
+ return 0
|
|
|
}
|
|
|
- return 0
|
|
|
+ return h.max
|
|
|
}
|
|
|
|
|
|
// Return the mean of all values seen since the histogram was last cleared.
|
|
|
func (h *StandardHistogram) Mean() float64 {
|
|
|
- hv := <-h.out
|
|
|
- if 0 < hv.count {
|
|
|
- return float64(hv.sum) / float64(hv.count)
|
|
|
+ h.mutex.Lock()
|
|
|
+ defer h.mutex.Unlock()
|
|
|
+ if 0 == h.count {
|
|
|
+ return 0
|
|
|
}
|
|
|
- return 0
|
|
|
+ return float64(h.sum) / float64(h.count)
|
|
|
}
|
|
|
|
|
|
// Return the minimal value seen since the histogram was last cleared.
|
|
|
func (h *StandardHistogram) Min() int64 {
|
|
|
- hv := <-h.out
|
|
|
- if 0 < hv.count {
|
|
|
- return hv.min
|
|
|
+ h.mutex.Lock()
|
|
|
+ defer h.mutex.Unlock()
|
|
|
+ if 0 == h.count {
|
|
|
+ return 0
|
|
|
}
|
|
|
- return 0
|
|
|
+ return h.min
|
|
|
}
|
|
|
|
|
|
// Return an arbitrary percentile of all values seen since the histogram was
|
|
|
@@ -139,50 +136,37 @@ func (h *StandardHistogram) StdDev() float64 {
|
|
|
|
|
|
// Update the histogram with a new value.
|
|
|
func (h *StandardHistogram) Update(v int64) {
|
|
|
- h.in <- v
|
|
|
+ h.mutex.Lock()
|
|
|
+ defer h.mutex.Unlock()
|
|
|
+ h.s.Update(v)
|
|
|
+ h.count++
|
|
|
+ if v < h.min {
|
|
|
+ h.min = v
|
|
|
+ }
|
|
|
+ if v > h.max {
|
|
|
+ h.max = v
|
|
|
+ }
|
|
|
+ h.sum += v
|
|
|
+ fv := float64(v)
|
|
|
+ if -1.0 == h.variance[0] {
|
|
|
+ h.variance[0] = fv
|
|
|
+ h.variance[1] = 0.0
|
|
|
+ } else {
|
|
|
+ m := h.variance[0]
|
|
|
+ s := h.variance[1]
|
|
|
+ h.variance[0] = m + (fv-m)/float64(h.count)
|
|
|
+ h.variance[1] = s + (fv-m)*(fv-h.variance[0])
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Return the variance of all values seen since the histogram was last cleared.
|
|
|
func (h *StandardHistogram) Variance() float64 {
|
|
|
- hv := <-h.out
|
|
|
- if 1 >= hv.count {
|
|
|
+ h.mutex.Lock()
|
|
|
+ defer h.mutex.Unlock()
|
|
|
+ if 1 >= h.count {
|
|
|
return 0.0
|
|
|
}
|
|
|
- return hv.variance[1] / float64(hv.count-1)
|
|
|
-}
|
|
|
-
|
|
|
-// Receive inputs and send outputs. Sample each input and update values in
|
|
|
-// the histogramV. Send a copy of the histogramV as output.
|
|
|
-func (h *StandardHistogram) arbiter() {
|
|
|
- hv := newHistogramV()
|
|
|
- for {
|
|
|
- select {
|
|
|
- case v := <-h.in:
|
|
|
- h.s.Update(v)
|
|
|
- hv.count++
|
|
|
- if v < hv.min {
|
|
|
- hv.min = v
|
|
|
- }
|
|
|
- if v > hv.max {
|
|
|
- hv.max = v
|
|
|
- }
|
|
|
- hv.sum += v
|
|
|
- fv := float64(v)
|
|
|
- if -1.0 == hv.variance[0] {
|
|
|
- hv.variance[0] = fv
|
|
|
- hv.variance[1] = 0.0
|
|
|
- } else {
|
|
|
- m := hv.variance[0]
|
|
|
- s := hv.variance[1]
|
|
|
- hv.variance[0] = m + (fv-m)/float64(hv.count)
|
|
|
- hv.variance[1] = s + (fv-m)*(fv-hv.variance[0])
|
|
|
- }
|
|
|
- case h.out <- hv:
|
|
|
- case <-h.reset:
|
|
|
- h.s.Clear()
|
|
|
- hv = newHistogramV()
|
|
|
- }
|
|
|
- }
|
|
|
+ return h.variance[1] / float64(h.count-1)
|
|
|
}
|
|
|
|
|
|
// Cribbed from the standard library's `sort` package.
|