瀏覽代碼

Use a lock instead of a goroutine for Samples.

Richard Crowley 12 年之前
父節點
當前提交
307b27664c
共有 1 個文件被更改,包括 69 次插入96 次删除
  1. 69 96
      sample.go

+ 69 - 96
sample.go

@@ -4,6 +4,7 @@ import (
 	"container/heap"
 	"container/heap"
 	"math"
 	"math"
 	"math/rand"
 	"math/rand"
+	"sync"
 	"time"
 	"time"
 )
 )
 
 
@@ -27,11 +28,11 @@ type Sample interface {
 //
 //
 // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
 // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
 type ExpDecaySample struct {
 type ExpDecaySample struct {
-	reservoirSize int
 	alpha         float64
 	alpha         float64
-	in            chan int64
-	out           chan chan []int64
-	reset         chan bool
+	mutex         *sync.Mutex
+	reservoirSize int
+	t0, t1        time.Time
+	values        expDecaySampleHeap
 }
 }
 
 
 // Force the compiler to check that ExpDecaySample implements Sample.
 // Force the compiler to check that ExpDecaySample implements Sample.
@@ -41,145 +42,117 @@ var _ Sample = &ExpDecaySample{}
 // and alpha.
 // and alpha.
 func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
 func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
 	s := &ExpDecaySample{
 	s := &ExpDecaySample{
-		reservoirSize,
-		alpha,
-		make(chan int64),
-		make(chan chan []int64),
-		make(chan bool),
+		alpha:         alpha,
+		mutex:         &sync.Mutex{},
+		reservoirSize: reservoirSize,
+		t0:            time.Now(),
+		values:        make(expDecaySampleHeap, 0, reservoirSize),
 	}
 	}
-	go s.arbiter()
+	s.t1 = time.Now().Add(rescaleThreshold)
 	return s
 	return s
 }
 }
 
 
 // Clear all samples.
 // Clear all samples.
 func (s *ExpDecaySample) Clear() {
 func (s *ExpDecaySample) Clear() {
-	s.reset <- true
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
+	s.t0 = time.Now()
+	s.t1 = s.t0.Add(rescaleThreshold)
 }
 }
 
 
 // Return the size of the sample, which is at most the reservoir size.
 // Return the size of the sample, which is at most the reservoir size.
 func (s *ExpDecaySample) Size() int {
 func (s *ExpDecaySample) Size() int {
-	return len(s.Values())
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return len(s.values)
 }
 }
 
 
 // Update the sample with a new value.
 // Update the sample with a new value.
 func (s *ExpDecaySample) Update(v int64) {
 func (s *ExpDecaySample) Update(v int64) {
-	s.in <- v
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	if len(s.values) == s.reservoirSize {
+		heap.Pop(&s.values)
+	}
+	t := time.Now()
+	heap.Push(&s.values, expDecaySample{
+		k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
+		v: v,
+	})
+	if t.After(s.t1) {
+		values := s.values
+		t0 := s.t0
+		s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
+		s.t0 = t
+		s.t1 = s.t0.Add(rescaleThreshold)
+		for _, v := range values {
+			v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
+			heap.Push(&values, v)
+		}
+	}
 }
 }
 
 
 // Return all the values in the sample.
 // Return all the values in the sample.
 func (s *ExpDecaySample) Values() []int64 {
 func (s *ExpDecaySample) Values() []int64 {
-	ch := make(chan []int64)
-	s.out <- ch
-	return <-ch
-}
-
-// Receive inputs and send outputs.  Count and save each input value,
-// rescaling the sample if enough time has elapsed since the last rescaling.
-// Send a copy of the values as output.
-func (s *ExpDecaySample) arbiter() {
-	values := make(expDecaySampleHeap, 0, s.reservoirSize)
-	start := time.Now()
-	next := time.Now().Add(rescaleThreshold)
-	for {
-		select {
-		case v := <-s.in:
-			if len(values) == s.reservoirSize {
-				heap.Pop(&values)
-			}
-			now := time.Now()
-			k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
-			heap.Push(&values, expDecaySample{k: k, v: v})
-			if now.After(next) {
-				oldValues := values
-				oldStart := start
-				values = make(expDecaySampleHeap, 0, s.reservoirSize)
-				start = time.Now()
-				next = start.Add(rescaleThreshold)
-				for _, e := range oldValues {
-					e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
-					heap.Push(&values, e)
-				}
-			}
-		case ch := <-s.out:
-			valuesCopy := make([]int64, len(values))
-			for i, e := range values {
-				valuesCopy[i] = e.v
-			}
-			ch <- valuesCopy
-		case <-s.reset:
-			values = make(expDecaySampleHeap, 0, s.reservoirSize)
-			start = time.Now()
-			next = start.Add(rescaleThreshold)
-		}
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	values := make([]int64, len(s.values))
+	for i, v := range s.values {
+		values[i] = v.v
 	}
 	}
+	return values
 }
 }
 
 
 // A uniform sample using Vitter's Algorithm R.
 // A uniform sample using Vitter's Algorithm R.
 //
 //
 // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
 // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
 type UniformSample struct {
 type UniformSample struct {
+	mutex         *sync.Mutex
 	reservoirSize int
 	reservoirSize int
-	in            chan int64
-	out           chan chan []int64
-	reset         chan bool
+	values        []int64
 }
 }
 
 
 // Create a new uniform sample with the given reservoir size.
 // Create a new uniform sample with the given reservoir size.
 func NewUniformSample(reservoirSize int) *UniformSample {
 func NewUniformSample(reservoirSize int) *UniformSample {
-	s := &UniformSample{
-		reservoirSize,
-		make(chan int64),
-		make(chan chan []int64),
-		make(chan bool),
+	return &UniformSample{
+		mutex:         &sync.Mutex{},
+		reservoirSize: reservoirSize,
 	}
 	}
-	go s.arbiter()
-	return s
 }
 }
 
 
 // Clear all samples.
 // Clear all samples.
 func (s *UniformSample) Clear() {
 func (s *UniformSample) Clear() {
-	s.reset <- true
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.values = make([]int64, 0, s.reservoirSize)
 }
 }
 
 
 // Return the size of the sample, which is at most the reservoir size.
 // Return the size of the sample, which is at most the reservoir size.
 func (s *UniformSample) Size() int {
 func (s *UniformSample) Size() int {
-	return len(s.Values())
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return len(s.values)
 }
 }
 
 
 // Update the sample with a new value.
 // Update the sample with a new value.
 func (s *UniformSample) Update(v int64) {
 func (s *UniformSample) Update(v int64) {
-	s.in <- v
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	if len(s.values) < s.reservoirSize {
+		s.values = append(s.values, v)
+	} else {
+		s.values[rand.Intn(s.reservoirSize)] = v
+	}
 }
 }
 
 
 // Return all the values in the sample.
 // Return all the values in the sample.
 func (s *UniformSample) Values() []int64 {
 func (s *UniformSample) Values() []int64 {
-	ch := make(chan []int64)
-	s.out <- ch
-	return <-ch
-}
-
-// Receive inputs and send outputs.  Count and save each input value at a
-// random index.  Send a copy of the values as output.
-func (s *UniformSample) arbiter() {
-	values := make([]int64, 0, s.reservoirSize)
-	for {
-		n := len(values)
-		select {
-		case v := <-s.in:
-			if n < s.reservoirSize {
-				values = values[0 : n+1]
-				values[n] = v
-			} else {
-				values[rand.Intn(s.reservoirSize)] = v
-			}
-		case ch := <-s.out:
-			valuesCopy := make([]int64, n)
-			copy(valuesCopy, values[:n])
-			ch <- valuesCopy
-		case <-s.reset:
-			values = make([]int64, 0, s.reservoirSize)
-		}
-	}
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	values := make([]int64, len(s.values))
+	copy(values, s.values)
+	return values
 }
 }
 
 
 // An individual sample.
 // An individual sample.