|
|
@@ -1,6 +1,7 @@
|
|
|
package metrics
|
|
|
|
|
|
import (
|
|
|
+ "container/heap"
|
|
|
"math"
|
|
|
"math/rand"
|
|
|
"time"
|
|
|
@@ -67,54 +68,79 @@ func (s *ExpDecaySample) Values() []int64 {
|
|
|
return <-s.out
|
|
|
}
|
|
|
|
|
|
+// An individual sample.
|
|
|
+type expDecaySample struct {
|
|
|
+ k float64
|
|
|
+ v int64
|
|
|
+}
|
|
|
+
|
|
|
+// A min-heap of samples.
|
|
|
+type expDecaySampleHeap []expDecaySample
|
|
|
+
|
|
|
+func (q expDecaySampleHeap) Len() int {
|
|
|
+ return len(q)
|
|
|
+}
|
|
|
+
|
|
|
+func (q expDecaySampleHeap) Less(i, j int) bool {
|
|
|
+ return q[i].k < q[j].k
|
|
|
+}
|
|
|
+
|
|
|
+func (q expDecaySampleHeap) Swap(i, j int) {
|
|
|
+ q[i], q[j] = q[j], q[i]
|
|
|
+}
|
|
|
+
|
|
|
+func (q *expDecaySampleHeap) Push(x interface{}) {
|
|
|
+ q_ := *q
|
|
|
+ n := len(q_)
|
|
|
+ q_ = q_[0 : n+1]
|
|
|
+ q_[n] = x.(expDecaySample)
|
|
|
+ *q = q_
|
|
|
+}
|
|
|
+
|
|
|
+func (q *expDecaySampleHeap) Pop() interface{} {
|
|
|
+ q_ := *q
|
|
|
+ n := len(q_)
|
|
|
+ i := q_[n-1]
|
|
|
+ q_ = q_[0 : n-1]
|
|
|
+ *q = q_
|
|
|
+ return i
|
|
|
+}
|
|
|
+
|
|
|
// Receive inputs and send outputs. Count and save each input value,
|
|
|
// rescaling the sample if enough time has elapsed since the last rescaling.
|
|
|
// Send a copy of the values as output.
|
|
|
func (s *ExpDecaySample) arbiter() {
|
|
|
- count := 0
|
|
|
- values := make(map[float64]int64)
|
|
|
+ values := make(expDecaySampleHeap, 0, s.reservoirSize)
|
|
|
start := time.Now()
|
|
|
next := time.Now().Add(rescaleThreshold)
|
|
|
var valuesCopy []int64
|
|
|
for {
|
|
|
select {
|
|
|
case v := <-s.in:
|
|
|
+ if len(values) == s.reservoirSize {
|
|
|
+ heap.Pop(&values)
|
|
|
+ }
|
|
|
now := time.Now()
|
|
|
k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
|
|
|
- count++
|
|
|
- values[k] = v
|
|
|
- if count > s.reservoirSize {
|
|
|
- min := math.MaxFloat64
|
|
|
- for k, _ := range values {
|
|
|
- if k < min {
|
|
|
- min = k
|
|
|
- }
|
|
|
- }
|
|
|
- delete(values, min)
|
|
|
- valuesCopy = make([]int64, s.reservoirSize)
|
|
|
- } else {
|
|
|
- valuesCopy = make([]int64, count)
|
|
|
- }
|
|
|
+ heap.Push(&values, expDecaySample{k: k, v: v})
|
|
|
+ valuesCopy = make([]int64, len(values))
|
|
|
if now.After(next) {
|
|
|
+ oldValues := values
|
|
|
oldStart := start
|
|
|
+ values = make(expDecaySampleHeap, 0, s.reservoirSize)
|
|
|
start = time.Now()
|
|
|
- next = now.Add(rescaleThreshold)
|
|
|
- oldValues := values
|
|
|
- values = make(map[float64]int64, len(oldValues))
|
|
|
- for k, v := range oldValues {
|
|
|
- values[k*math.Exp(-s.alpha*float64(
|
|
|
- start.Sub(oldStart)))] = v
|
|
|
+ next = start.Add(rescaleThreshold)
|
|
|
+ for _, e := range oldValues {
|
|
|
+ e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
|
|
|
+ heap.Push(&values, e)
|
|
|
}
|
|
|
}
|
|
|
- i := 0
|
|
|
- for _, v := range values {
|
|
|
- valuesCopy[i] = v
|
|
|
- i++
|
|
|
+ for i, e := range values {
|
|
|
+ valuesCopy[i] = e.v
|
|
|
}
|
|
|
case s.out <- valuesCopy: // TODO Might need to make another copy here.
|
|
|
case <-s.reset:
|
|
|
- count = 0
|
|
|
- values = make(map[float64]int64)
|
|
|
+ values = make(expDecaySampleHeap, 0, s.reservoirSize)
|
|
|
valuesCopy = make([]int64, 0)
|
|
|
start = time.Now()
|
|
|
next = start.Add(rescaleThreshold)
|