|
|
@@ -2,6 +2,7 @@ package metrics
|
|
|
|
|
|
import (
|
|
|
"math"
|
|
|
+ "sync"
|
|
|
"sync/atomic"
|
|
|
)
|
|
|
|
|
|
@@ -21,9 +22,10 @@ type EWMA interface {
|
|
|
// to manage uncounted events.
|
|
|
type StandardEWMA struct {
|
|
|
alpha float64
|
|
|
+ init bool
|
|
|
+ mutex *sync.Mutex
|
|
|
+ rate float64
|
|
|
uncounted int64
|
|
|
- in chan bool
|
|
|
- out chan float64
|
|
|
}
|
|
|
|
|
|
// Force the compiler to check that StandardEWMA implements EWMA.
|
|
|
@@ -32,9 +34,10 @@ var _ EWMA = &StandardEWMA{}
|
|
|
// Create a new EWMA with the given alpha. Create the clock channel and
|
|
|
// start the ticker goroutine.
|
|
|
func NewEWMA(alpha float64) *StandardEWMA {
|
|
|
- a := &StandardEWMA{alpha, 0, make(chan bool), make(chan float64)}
|
|
|
- go a.arbiter()
|
|
|
- return a
|
|
|
+ return &StandardEWMA{
|
|
|
+ alpha: alpha,
|
|
|
+ mutex: &sync.Mutex{},
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Create a new EWMA with alpha set for a one-minute moving average.
|
|
|
@@ -54,37 +57,25 @@ func NewEWMA15() *StandardEWMA {
|
|
|
|
|
|
// Return the moving average rate of events per second.
|
|
|
func (a *StandardEWMA) Rate() float64 {
|
|
|
- return <-a.out * float64(1e9)
|
|
|
+ return a.rate * float64(1e9)
|
|
|
}
|
|
|
|
|
|
// Tick the clock to update the moving average.
|
|
|
func (a *StandardEWMA) Tick() {
|
|
|
- a.in <- true
|
|
|
+ count := atomic.LoadInt64(&a.uncounted)
|
|
|
+ atomic.AddInt64(&a.uncounted, -count)
|
|
|
+ instantRate := float64(count) / float64(5e9)
|
|
|
+ a.mutex.Lock()
|
|
|
+ defer a.mutex.Unlock()
|
|
|
+ if a.init {
|
|
|
+ a.rate += a.alpha * (instantRate - a.rate)
|
|
|
+ } else {
|
|
|
+ a.init = true
|
|
|
+ a.rate = instantRate
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Add n uncounted events.
|
|
|
func (a *StandardEWMA) Update(n int64) {
|
|
|
atomic.AddInt64(&a.uncounted, n)
|
|
|
}
|
|
|
-
|
|
|
-// On each clock tick, update the moving average to reflect the number of
|
|
|
-// events seen since the last tick.
|
|
|
-func (a *StandardEWMA) arbiter() {
|
|
|
- var initialized bool
|
|
|
- var rate float64
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-a.in:
|
|
|
- count := atomic.LoadInt64(&a.uncounted)
|
|
|
- atomic.AddInt64(&a.uncounted, -count)
|
|
|
- instantRate := float64(count) / float64(5e9)
|
|
|
- if initialized {
|
|
|
- rate += a.alpha * (instantRate - rate)
|
|
|
- } else {
|
|
|
- initialized = true
|
|
|
- rate = instantRate
|
|
|
- }
|
|
|
- case a.out <- rate:
|
|
|
- }
|
|
|
- }
|
|
|
-}
|