|
@@ -18,21 +18,20 @@ type EWMA interface {
|
|
|
|
|
|
|
|
// The standard implementation of an EWMA tracks the number of uncounted
|
|
// The standard implementation of an EWMA tracks the number of uncounted
|
|
|
// events and processes them on each tick. It uses the sync/atomic package
|
|
// events and processes them on each tick. It uses the sync/atomic package
|
|
|
-// to manage uncounted events. When the latest weeklies land in a release,
|
|
|
|
|
-// atomic.LoadInt64 will be available and this code will become safe on
|
|
|
|
|
-// 32-bit architectures.
|
|
|
|
|
|
|
+// to manage uncounted events.
|
|
|
type StandardEWMA struct {
|
|
type StandardEWMA struct {
|
|
|
alpha float64
|
|
alpha float64
|
|
|
uncounted int64
|
|
uncounted int64
|
|
|
rate float64
|
|
rate float64
|
|
|
initialized bool
|
|
initialized bool
|
|
|
tick chan bool
|
|
tick chan bool
|
|
|
|
|
+ out chan float64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Create a new EWMA with the given alpha. Create the clock channel and
|
|
// Create a new EWMA with the given alpha. Create the clock channel and
|
|
|
// start the ticker goroutine.
|
|
// start the ticker goroutine.
|
|
|
func NewEWMA(alpha float64) *StandardEWMA {
|
|
func NewEWMA(alpha float64) *StandardEWMA {
|
|
|
- a := &StandardEWMA{alpha, 0, 0.0, false, make(chan bool)}
|
|
|
|
|
|
|
+ a := &StandardEWMA{alpha, 0, 0.0, false, make(chan bool), make(chan float64)}
|
|
|
go a.ticker()
|
|
go a.ticker()
|
|
|
return a
|
|
return a
|
|
|
}
|
|
}
|
|
@@ -70,15 +69,19 @@ func (a *StandardEWMA) Update(n int64) {
|
|
|
// On each clock tick, update the moving average to reflect the number of
|
|
// On each clock tick, update the moving average to reflect the number of
|
|
|
// events seen since the last tick.
|
|
// events seen since the last tick.
|
|
|
func (a *StandardEWMA) ticker() {
|
|
func (a *StandardEWMA) ticker() {
|
|
|
- for <-a.tick {
|
|
|
|
|
- count := a.uncounted
|
|
|
|
|
- atomic.AddInt64(&a.uncounted, -count)
|
|
|
|
|
- instantRate := float64(count) / float64(5e9)
|
|
|
|
|
- if a.initialized {
|
|
|
|
|
- a.rate += a.alpha * (instantRate - a.rate)
|
|
|
|
|
- } else {
|
|
|
|
|
- a.initialized = true
|
|
|
|
|
- a.rate = instantRate
|
|
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-a.tick:
|
|
|
|
|
+ count := atomic.LoadInt64(&a.uncounted)
|
|
|
|
|
+ atomic.AddInt64(&a.uncounted, -count)
|
|
|
|
|
+ instantRate := float64(count) / float64(5e9)
|
|
|
|
|
+ if a.initialized {
|
|
|
|
|
+ a.rate += a.alpha * (instantRate - a.rate)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ a.initialized = true
|
|
|
|
|
+ a.rate = instantRate
|
|
|
|
|
+ }
|
|
|
|
|
+ case a.out <- a.rate:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|