| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- package metrics
- import (
- "container/heap"
- "math"
- "math/rand"
- "sync"
- "time"
- )
// rescaleThreshold is how long an ExpDecaySample keeps one landmark time
// (t0) before its stored priorities are rescaled against a new landmark.
const rescaleThreshold = time.Hour
// Sample maintains a statistically-significant selection of values from
// a stream.
//
// It is declared as an interface so that other types can provide their own
// sampling strategy behind the same API.
type Sample interface {
	// Update offers a new value to the sample.
	Update(int64)
	// Values returns the values currently held by the sample.
	Values() []int64
	// Size reports how many values the sample currently holds.
	Size() int
	// Clear discards the values collected so far.
	Clear()
}
- // An exponentially-decaying sample using a forward-decaying priority
- // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
- // Model for Streaming Systems".
- //
- // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
- type ExpDecaySample struct {
- alpha float64
- mutex sync.Mutex
- reservoirSize int
- t0, t1 time.Time
- values expDecaySampleHeap
- }
- // Force the compiler to check that ExpDecaySample implements Sample.
- var _ Sample = &ExpDecaySample{}
- // Create a new exponentially-decaying sample with the given reservoir size
- // and alpha.
- func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
- if !ObserverEffect {
- return NilSample{}
- }
- s := &ExpDecaySample{
- alpha: alpha,
- reservoirSize: reservoirSize,
- t0: time.Now(),
- values: make(expDecaySampleHeap, 0, reservoirSize),
- }
- s.t1 = time.Now().Add(rescaleThreshold)
- return s
- }
- // Clear all samples.
- func (s *ExpDecaySample) Clear() {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
- s.t0 = time.Now()
- s.t1 = s.t0.Add(rescaleThreshold)
- }
- // Return the size of the sample, which is at most the reservoir size.
- func (s *ExpDecaySample) Size() int {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return len(s.values)
- }
- // Update the sample with a new value.
- func (s *ExpDecaySample) Update(v int64) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if len(s.values) == s.reservoirSize {
- heap.Pop(&s.values)
- }
- t := time.Now()
- heap.Push(&s.values, expDecaySample{
- k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
- v: v,
- })
- if t.After(s.t1) {
- values := s.values
- t0 := s.t0
- s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
- s.t0 = t
- s.t1 = s.t0.Add(rescaleThreshold)
- for _, v := range values {
- v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
- heap.Push(&s.values, v)
- }
- }
- }
- // Return all the values in the sample.
- func (s *ExpDecaySample) Values() []int64 {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- values := make([]int64, len(s.values))
- for i, v := range s.values {
- values[i] = v.v
- }
- return values
- }
- // No-op Sample.
- type NilSample struct{}
- // Force the compiler to check that ExpDecaySample implements Sample.
- var _ Sample = NilSample{}
- // No-op.
- func (s NilSample) Clear() {}
- // No-op.
- func (s NilSample) Size() int { return 0 }
- // No-op.
- func (s NilSample) Update(v int64) {}
- // No-op.
- func (s NilSample) Values() []int64 { return []int64{} }
- // A uniform sample using Vitter's Algorithm R.
- //
- // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
- type UniformSample struct {
- mutex sync.Mutex
- reservoirSize int
- values []int64
- }
- // Create a new uniform sample with the given reservoir size.
- func NewUniformSample(reservoirSize int) Sample {
- if !ObserverEffect {
- return NilSample{}
- }
- return &UniformSample{reservoirSize: reservoirSize}
- }
- // Clear all samples.
- func (s *UniformSample) Clear() {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.values = make([]int64, 0, s.reservoirSize)
- }
- // Return the size of the sample, which is at most the reservoir size.
- func (s *UniformSample) Size() int {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return len(s.values)
- }
- // Update the sample with a new value.
- func (s *UniformSample) Update(v int64) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if len(s.values) < s.reservoirSize {
- s.values = append(s.values, v)
- } else {
- s.values[rand.Intn(s.reservoirSize)] = v
- }
- }
- // Return all the values in the sample.
- func (s *UniformSample) Values() []int64 {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- values := make([]int64, len(s.values))
- copy(values, s.values)
- return values
- }
// expDecaySample is a single reservoir entry: a value together with the
// priority key the heap is ordered by.
type expDecaySample struct {
	k float64 // priority key
	v int64   // sampled value
}

// expDecaySampleHeap is a min-heap of samples ordered by priority key. Its
// methods satisfy heap.Interface from container/heap.
type expDecaySampleHeap []expDecaySample

// Len returns the number of entries in the heap.
func (q expDecaySampleHeap) Len() int { return len(q) }

// Less reports whether entry i has a smaller key than entry j.
func (q expDecaySampleHeap) Less(i, j int) bool { return q[i].k < q[j].k }

// Pop removes the last entry of the underlying slice and returns it as an
// expDecaySample. It panics if the heap is empty.
func (q *expDecaySampleHeap) Pop() interface{} {
	old := *q
	last := len(old) - 1
	item := old[last]
	*q = old[:last]
	return item
}

// Push appends x, which must be an expDecaySample, to the underlying
// slice. append is used so that pushing onto a nil or full slice grows it
// instead of panicking on an out-of-range reslice.
func (q *expDecaySampleHeap) Push(x interface{}) {
	*q = append(*q, x.(expDecaySample))
}

// Swap exchanges entries i and j.
func (q expDecaySampleHeap) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
|