|
|
@@ -18,22 +18,19 @@ type EWMA interface {
|
|
|
|
|
|
// The standard implementation of an EWMA tracks the number of uncounted
|
|
|
// events and processes them on each tick. It uses the sync/atomic package
|
|
|
-// to manage uncounted events. When the latest weeklies land in a release,
|
|
|
-// atomic.LoadInt64 will be available and this code will become safe on
|
|
|
-// 32-bit architectures.
|
|
|
+// to manage uncounted events.
|
|
|
type StandardEWMA struct {
|
|
|
alpha float64
|
|
|
uncounted int64
|
|
|
- rate float64
|
|
|
- initialized bool
|
|
|
- tick chan bool
|
|
|
+ in chan bool
|
|
|
+ out chan float64
|
|
|
}
|
|
|
|
|
|
// Create a new EWMA with the given alpha. Create the clock channel and
|
|
|
// start the ticker goroutine.
|
|
|
func NewEWMA(alpha float64) *StandardEWMA {
|
|
|
- a := &StandardEWMA{alpha, 0, 0.0, false, make(chan bool)}
|
|
|
- go a.ticker()
|
|
|
+ a := &StandardEWMA{alpha, 0, make(chan bool), make(chan float64)}
|
|
|
+ go a.arbiter()
|
|
|
return a
|
|
|
}
|
|
|
|
|
|
@@ -54,12 +51,12 @@ func NewEWMA15() *StandardEWMA {
|
|
|
|
|
|
// Return the moving average rate of events per second.
|
|
|
func (a *StandardEWMA) Rate() float64 {
|
|
|
- return a.rate * float64(1e9)
|
|
|
+ return <-a.out * float64(1e9)
|
|
|
}
|
|
|
|
|
|
// Tick the clock to update the moving average.
|
|
|
func (a *StandardEWMA) Tick() {
|
|
|
- a.tick <- true
|
|
|
+ a.in <- true
|
|
|
}
|
|
|
|
|
|
// Add n uncounted events.
|
|
|
@@ -69,16 +66,22 @@ func (a *StandardEWMA) Update(n int64) {
|
|
|
|
|
|
// On each clock tick, update the moving average to reflect the number of
|
|
|
// events seen since the last tick.
|
|
|
-func (a *StandardEWMA) ticker() {
|
|
|
- for <-a.tick {
|
|
|
- count := a.uncounted
|
|
|
- atomic.AddInt64(&a.uncounted, -count)
|
|
|
- instantRate := float64(count) / float64(5e9)
|
|
|
- if a.initialized {
|
|
|
- a.rate += a.alpha * (instantRate - a.rate)
|
|
|
- } else {
|
|
|
- a.initialized = true
|
|
|
- a.rate = instantRate
|
|
|
+func (a *StandardEWMA) arbiter() {
|
|
|
+ var initialized bool
|
|
|
+ var rate float64
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-a.in:
|
|
|
+ count := atomic.LoadInt64(&a.uncounted)
|
|
|
+ atomic.AddInt64(&a.uncounted, -count)
|
|
|
+ instantRate := float64(count) / float64(5e9)
|
|
|
+ if initialized {
|
|
|
+ rate += a.alpha * (instantRate - rate)
|
|
|
+ } else {
|
|
|
+ initialized = true
|
|
|
+ rate = instantRate
|
|
|
+ }
|
|
|
+ case a.out <- rate:
|
|
|
}
|
|
|
}
|
|
|
}
|