|
|
@@ -2,7 +2,7 @@ package metrics
|
|
|
|
|
|
import (
|
|
|
"math"
|
|
|
- "rand"
|
|
|
+ "math/rand"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
@@ -27,10 +27,10 @@ type Sample interface {
|
|
|
// <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
|
|
|
type ExpDecaySample struct {
|
|
|
reservoirSize int
|
|
|
- alpha float64
|
|
|
- in chan int64
|
|
|
- out chan []int64
|
|
|
- reset chan bool
|
|
|
+ alpha float64
|
|
|
+ in chan int64
|
|
|
+ out chan []int64
|
|
|
+ reset chan bool
|
|
|
}
|
|
|
|
|
|
// Create a new exponentially-decaying sample with the given reservoir size
|
|
|
@@ -73,36 +73,37 @@ func (s *ExpDecaySample) Values() []int64 {
|
|
|
func (s *ExpDecaySample) arbiter() {
|
|
|
count := 0
|
|
|
values := make(map[float64]int64)
|
|
|
- tsStart := time.Seconds()
|
|
|
- tsNext := time.Nanoseconds() + rescaleThreshold
|
|
|
+ start := time.Now()
|
|
|
+ next := time.Now().Add(rescaleThreshold)
|
|
|
var valuesCopy []int64
|
|
|
for {
|
|
|
select {
|
|
|
case v := <-s.in:
|
|
|
- ts := time.Seconds()
|
|
|
- k := math.Exp(float64(ts - tsStart) * s.alpha) / rand.Float64()
|
|
|
+ now := time.Now()
|
|
|
+ k := math.Exp(float64(now.Sub(start))*s.alpha) / rand.Float64()
|
|
|
count++
|
|
|
values[k] = v
|
|
|
if count > s.reservoirSize {
|
|
|
min := math.MaxFloat64
|
|
|
for k, _ := range values {
|
|
|
- if k < min { min = k }
|
|
|
+ if k < min {
|
|
|
+ min = k
|
|
|
+ }
|
|
|
}
|
|
|
- values[min] = 0, false
|
|
|
+ delete(values, min)
|
|
|
valuesCopy = make([]int64, s.reservoirSize)
|
|
|
} else {
|
|
|
valuesCopy = make([]int64, count)
|
|
|
}
|
|
|
- tsNano := time.Nanoseconds()
|
|
|
- if tsNano > tsNext {
|
|
|
- tsOldStart := tsStart
|
|
|
- tsStart = time.Seconds()
|
|
|
- tsNext = tsNano + rescaleThreshold
|
|
|
+ if now.After(next) {
|
|
|
+ oldStart := start
|
|
|
+ start = time.Now()
|
|
|
+ next = now.Add(rescaleThreshold)
|
|
|
oldValues := values
|
|
|
values = make(map[float64]int64, len(oldValues))
|
|
|
for k, v := range oldValues {
|
|
|
- values[k * math.Exp(-s.alpha * float64(
|
|
|
- tsStart - tsOldStart))] = v
|
|
|
+ values[k*math.Exp(-s.alpha*float64(
|
|
|
+ start.Sub(oldStart)))] = v
|
|
|
}
|
|
|
}
|
|
|
i := 0
|
|
|
@@ -115,8 +116,8 @@ func (s *ExpDecaySample) arbiter() {
|
|
|
count = 0
|
|
|
values = make(map[float64]int64)
|
|
|
valuesCopy = make([]int64, 0)
|
|
|
- tsStart = time.Seconds()
|
|
|
- tsNext = tsStart + rescaleThreshold
|
|
|
+ start = time.Now()
|
|
|
+ next = start.Add(rescaleThreshold)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -126,9 +127,9 @@ func (s *ExpDecaySample) arbiter() {
|
|
|
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
|
|
|
type UniformSample struct {
|
|
|
reservoirSize int
|
|
|
- in chan int64
|
|
|
- out chan []int64
|
|
|
- reset chan bool
|
|
|
+ in chan int64
|
|
|
+ out chan []int64
|
|
|
+ reset chan bool
|
|
|
}
|
|
|
|
|
|
// Create a new uniform sample with the given reservoir size.
|
|
|
@@ -174,13 +175,15 @@ func (s *UniformSample) arbiter() {
|
|
|
case v := <-s.in:
|
|
|
count++
|
|
|
if count < s.reservoirSize {
|
|
|
- values[count - 1] = v
|
|
|
+ values[count-1] = v
|
|
|
valuesCopy = make([]int64, count)
|
|
|
} else {
|
|
|
values[rand.Intn(s.reservoirSize)] = v
|
|
|
valuesCopy = make([]int64, len(values))
|
|
|
}
|
|
|
- for i := 0; i < len(valuesCopy); i++ { valuesCopy[i] = values[i] }
|
|
|
+ for i := 0; i < len(valuesCopy); i++ {
|
|
|
+ valuesCopy[i] = values[i]
|
|
|
+ }
|
|
|
case s.out <- valuesCopy: // TODO Might need to make another copy here.
|
|
|
case <-s.reset:
|
|
|
count = 0
|