Przeglądaj źródła

Finished exponentially-decaying sample.

Richard Crowley 14 lat temu
rodzic
commit
eee4cf072b
2 zmienionych plików z 89 dodań i 51 usunięć
  1. 2 1
      cmd/metrics/metrics.go
  2. 87 50
      sample.go

+ 2 - 1
cmd/metrics/metrics.go

@@ -56,7 +56,8 @@ func main() {
 	}
 */
 
-	s := metrics.NewUniformSample(1028)
+	s := metrics.NewExpDecaySample(1028, 0.015)
+//	s := metrics.NewUniformSample(1028)
 	h := metrics.NewHistogram(s)
 	r.RegisterHistogram("baz", h)
 	for i := 0; i < 1000; i++ {

+ 87 - 50
sample.go

@@ -1,12 +1,15 @@
 package metrics
 
 import (
+	"math"
 	"rand"
+	"time"
 )
 
+const rescaleThreshold = 1e9 * 60 * 60
+
 type Sample interface {
 	Clear()
-	Count() int
 	Size() int
 	Update(int64)
 	Values() []int64
@@ -15,40 +18,93 @@ type Sample interface {
 type expDecaySample struct {
 	reservoirSize int
 	alpha float64
-	count int
-	values []int64
+	in chan int64
+	out chan []int64
+	reset chan bool
 }
 
 func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
-	return &expDecaySample{
-		reservoirSize, alpha,
-		0,
-		make([]int64, reservoirSize),
+	s := &expDecaySample{
+		reservoirSize,
+		alpha,
+		make(chan int64),
+		make(chan []int64),
+		make(chan bool),
 	}
+	go s.arbiter()
+	return s
 }
 
 func (s *expDecaySample) Clear() {
-}
-
-func (s *expDecaySample) Count() int {
-	return s.count
+	s.reset <- true
 }
 
 func (s *expDecaySample) Size() int {
-	return 0
+	return len(<-s.out)
 }
 
 func (s *expDecaySample) Update(v int64) {
+	s.in <- v
 }
 
 func (s *expDecaySample) Values() []int64 {
-	return s.values // It might be worth copying this before returning it.
+	return <-s.out
+}
+
+func (s *expDecaySample) arbiter() {
+	count := 0
+	values := make(map[float64]int64)
+	tsStart := time.Nanoseconds()
+	tsNext := tsStart + rescaleThreshold
+	var valuesCopy []int64
+	for {
+		select {
+		case v := <-s.in:
+			ts := time.Nanoseconds()
+			k := math.Exp(float64(ts - tsStart) * s.alpha) / rand.Float64()
+			count++
+			values[k] = v
+			if count > s.reservoirSize {
+				min := math.MaxFloat64
+				for k, _ := range values {
+					if k < min { min = k }
+				}
+				values[min] = 0, false
+				valuesCopy = make([]int64, s.reservoirSize)
+			} else {
+				valuesCopy = make([]int64, count)
+			}
+			if ts > tsNext {
+				tsOldStart := tsStart
+				tsStart = time.Nanoseconds()
+				tsNext = ts + rescaleThreshold
+				oldValues := values
+				values = make(map[float64]int64, len(oldValues))
+				for k, v := range oldValues {
+					values[k * math.Exp(-s.alpha * float64(
+						tsStart - tsOldStart))] = v
+				}
+			}
+			i := 0
+			for _, v := range values {
+				valuesCopy[i] = v
+				i++
+			}
+		case s.out <- valuesCopy: // TODO Might need to make another copy here.
+		case <-s.reset:
+			count = 0
+			values = make(map[float64]int64)
+			valuesCopy = make([]int64, 0)
+			tsStart = time.Nanoseconds()
+			tsNext = tsStart + 1e9 * 60 * 60
+		}
+	}
 }
 
 type uniformSample struct {
 	reservoirSize int
 	in chan int64
-	out chan sampleV
+	out chan []int64
 	reset chan bool
 }
 
@@ -56,7 +112,7 @@ func NewUniformSample(reservoirSize int) Sample {
 	s := &uniformSample{
 		reservoirSize,
 		make(chan int64),
-		make(chan sampleV),
+		make(chan []int64),
 		make(chan bool),
 	}
 	go s.arbiter()
@@ -67,12 +123,8 @@ func (s *uniformSample) Clear() {
 	s.reset <- true
 }
 
-func (s *uniformSample) Count() int {
-	return (<-s.out).count
-}
-
 func (s *uniformSample) Size() int {
-	return (<-s.out).size()
+	return len(<-s.out)
 }
 
 func (s *uniformSample) Update(v int64) {
@@ -80,45 +132,30 @@ func (s *uniformSample) Update(v int64) {
 }
 
 func (s *uniformSample) Values() []int64 {
-	return (<-s.out).values
+	return <-s.out
 }
 
 func (s *uniformSample) arbiter() {
-	sv := newSampleV(s.reservoirSize)
+	count := 0
+	values := make([]int64, s.reservoirSize)
+	var valuesCopy []int64
 	for {
 		select {
 		case v := <-s.in:
-			sv.count++
-			if sv.count < s.reservoirSize {
-				sv.values[sv.count - 1] = v
+			count++
+			if count < s.reservoirSize {
+				values[count - 1] = v
+				valuesCopy = make([]int64, count)
 			} else {
-				sv.values[rand.Intn(s.reservoirSize)] = v
+				values[rand.Intn(s.reservoirSize)] = v
+				valuesCopy = make([]int64, len(values))
 			}
-		case s.out <- sv.dup():
+			for i := 0; i < len(valuesCopy); i++ { valuesCopy[i] = values[i] }
+		case s.out <- valuesCopy: // TODO Might need to make another copy here.
 		case <-s.reset:
-			for i, _ := range sv.values { sv.values[i] = 0 }
+			count = 0
+			values = make([]int64, s.reservoirSize)
+			valuesCopy = make([]int64, 0)
 		}
 	}
 }
-
-type sampleV struct {
-	count int
-	values []int64
-}
-
-func newSampleV(reservoirSize int) sampleV {
-	return sampleV{0, make([]int64, reservoirSize)}
-}
-
-func (sv sampleV) dup() sampleV {
-	values := make([]int64, sv.size())
-	for i := 0; i < sv.size(); i++ { values[i] = sv.values[i] }
-	return sampleV{sv.count, values}
-}
-
-func (sv sampleV) size() int {
-	if sv.count < len(sv.values) {
-		return sv.count
-	}
-	return len(sv.values)
-}