|
|
@@ -30,7 +30,7 @@ type ExpDecaySample struct {
|
|
|
reservoirSize int
|
|
|
alpha float64
|
|
|
in chan int64
|
|
|
- out chan []int64
|
|
|
+ out chan chan []int64
|
|
|
reset chan bool
|
|
|
}
|
|
|
|
|
|
@@ -41,7 +41,7 @@ func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
|
|
|
reservoirSize,
|
|
|
alpha,
|
|
|
make(chan int64),
|
|
|
- make(chan []int64),
|
|
|
+ make(chan chan []int64),
|
|
|
make(chan bool),
|
|
|
}
|
|
|
go s.arbiter()
|
|
|
@@ -55,7 +55,7 @@ func (s *ExpDecaySample) Clear() {
|
|
|
|
|
|
// Return the size of the sample, which is at most the reservoir size.
|
|
|
func (s *ExpDecaySample) Size() int {
|
|
|
- return len(<-s.out)
|
|
|
+ return len(s.Values())
|
|
|
}
|
|
|
|
|
|
// Update the sample with a new value.
|
|
|
@@ -65,7 +65,9 @@ func (s *ExpDecaySample) Update(v int64) {
|
|
|
|
|
|
// Return all the values in the sample.
|
|
|
func (s *ExpDecaySample) Values() []int64 {
|
|
|
- return <-s.out
|
|
|
+ c := make(chan []int64)
|
|
|
+ s.out <- c
|
|
|
+ return <-c
|
|
|
}
|
|
|
|
|
|
// An individual sample.
|
|
|
@@ -113,7 +115,6 @@ func (s *ExpDecaySample) arbiter() {
|
|
|
values := make(expDecaySampleHeap, 0, s.reservoirSize)
|
|
|
start := time.Now()
|
|
|
next := time.Now().Add(rescaleThreshold)
|
|
|
- var valuesCopy []int64
|
|
|
for {
|
|
|
select {
|
|
|
case v := <-s.in:
|
|
|
@@ -123,7 +124,6 @@ func (s *ExpDecaySample) arbiter() {
|
|
|
now := time.Now()
|
|
|
k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
|
|
|
heap.Push(&values, expDecaySample{k: k, v: v})
|
|
|
- valuesCopy = make([]int64, len(values))
|
|
|
if now.After(next) {
|
|
|
oldValues := values
|
|
|
oldStart := start
|
|
|
@@ -135,13 +135,14 @@ func (s *ExpDecaySample) arbiter() {
|
|
|
heap.Push(&values, e)
|
|
|
}
|
|
|
}
|
|
|
+ case c := <-s.out:
|
|
|
+ valuesCopy := make([]int64, len(values))
|
|
|
for i, e := range values {
|
|
|
valuesCopy[i] = e.v
|
|
|
}
|
|
|
- case s.out <- valuesCopy: // TODO Might need to make another copy here.
|
|
|
+ c <- valuesCopy
|
|
|
case <-s.reset:
|
|
|
values = make(expDecaySampleHeap, 0, s.reservoirSize)
|
|
|
- valuesCopy = make([]int64, 0)
|
|
|
start = time.Now()
|
|
|
next = start.Add(rescaleThreshold)
|
|
|
}
|
|
|
@@ -154,7 +155,7 @@ func (s *ExpDecaySample) arbiter() {
|
|
|
type UniformSample struct {
|
|
|
reservoirSize int
|
|
|
in chan int64
|
|
|
- out chan []int64
|
|
|
+ out chan chan []int64
|
|
|
reset chan bool
|
|
|
}
|
|
|
|
|
|
@@ -163,7 +164,7 @@ func NewUniformSample(reservoirSize int) *UniformSample {
|
|
|
s := &UniformSample{
|
|
|
reservoirSize,
|
|
|
make(chan int64),
|
|
|
- make(chan []int64),
|
|
|
+ make(chan chan []int64),
|
|
|
make(chan bool),
|
|
|
}
|
|
|
go s.arbiter()
|
|
|
@@ -177,7 +178,7 @@ func (s *UniformSample) Clear() {
|
|
|
|
|
|
// Return the size of the sample, which is at most the reservoir size.
|
|
|
func (s *UniformSample) Size() int {
|
|
|
- return len(<-s.out)
|
|
|
+ return len(s.Values())
|
|
|
}
|
|
|
|
|
|
// Update the sample with a new value.
|
|
|
@@ -187,34 +188,33 @@ func (s *UniformSample) Update(v int64) {
|
|
|
|
|
|
// Return all the values in the sample.
|
|
|
func (s *UniformSample) Values() []int64 {
|
|
|
- return <-s.out
|
|
|
+ c := make(chan []int64)
|
|
|
+ s.out <- c
|
|
|
+ return <-c
|
|
|
}
|
|
|
|
|
|
// Receive inputs and send outputs. Count and save each input value at a
|
|
|
// random index. Send a copy of the values as output.
|
|
|
func (s *UniformSample) arbiter() {
|
|
|
- count := 0
|
|
|
- values := make([]int64, s.reservoirSize)
|
|
|
- var valuesCopy []int64
|
|
|
+ values := make([]int64, 0, s.reservoirSize)
|
|
|
for {
|
|
|
+ n := len(values)
|
|
|
select {
|
|
|
case v := <-s.in:
|
|
|
- if count < s.reservoirSize {
|
|
|
- values[count] = v
|
|
|
- count++
|
|
|
- valuesCopy = make([]int64, count)
|
|
|
+ if n < s.reservoirSize {
|
|
|
+ values = values[0 : n+1]
|
|
|
+ values[n] = v
|
|
|
} else {
|
|
|
values[rand.Intn(s.reservoirSize)] = v
|
|
|
- valuesCopy = make([]int64, len(values))
|
|
|
}
|
|
|
- for i := 0; i < len(valuesCopy); i++ {
|
|
|
+ case c := <-s.out:
|
|
|
+ valuesCopy := make([]int64, n)
|
|
|
+ for i := 0; i < n; i++ {
|
|
|
valuesCopy[i] = values[i]
|
|
|
}
|
|
|
- case s.out <- valuesCopy: // TODO Might need to make another copy here.
|
|
|
+ c <- valuesCopy
|
|
|
case <-s.reset:
|
|
|
- count = 0
|
|
|
- values = make([]int64, s.reservoirSize)
|
|
|
- valuesCopy = make([]int64, 0)
|
|
|
+ values = make([]int64, 0, s.reservoirSize)
|
|
|
}
|
|
|
}
|
|
|
}
|