| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- package metrics
- import (
- "container/heap"
- "math"
- "math/rand"
- "time"
- )
// rescaleThreshold is the interval after which an exponentially-decaying
// sample rescales its priorities: one hour, expressed in nanoseconds. It is
// deliberately left untyped so it converts to time.Duration where needed.
const rescaleThreshold = 60 * 60 * 1e9
// Sample maintains a statistically-significant selection of values drawn
// from a stream.
//
// It is declared as an interface so that other structs can provide their own
// implementations of the sampling API where appropriate.
type Sample interface {
	// Update offers a new value from the stream to the sample.
	Update(int64)

	// Values returns the values currently held by the sample.
	Values() []int64

	// Size reports how many values the sample currently holds.
	Size() int

	// Clear discards every value held by the sample.
	Clear()
}
- // An exponentially-decaying sample using a forward-decaying priority
- // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
- // Model for Streaming Systems".
- //
- // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
- type ExpDecaySample struct {
- reservoirSize int
- alpha float64
- in chan int64
- out chan chan []int64
- reset chan bool
- }
- // Force the compiler to check that ExpDecaySample implements Sample.
- var _ Sample = &ExpDecaySample{}
- // Create a new exponentially-decaying sample with the given reservoir size
- // and alpha.
- func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
- s := &ExpDecaySample{
- reservoirSize,
- alpha,
- make(chan int64),
- make(chan chan []int64),
- make(chan bool),
- }
- go s.arbiter()
- return s
- }
- // Clear all samples.
- func (s *ExpDecaySample) Clear() {
- s.reset <- true
- }
- // Return the size of the sample, which is at most the reservoir size.
- func (s *ExpDecaySample) Size() int {
- return len(s.Values())
- }
- // Update the sample with a new value.
- func (s *ExpDecaySample) Update(v int64) {
- s.in <- v
- }
- // Return all the values in the sample.
- func (s *ExpDecaySample) Values() []int64 {
- c := make(chan []int64)
- s.out <- c
- return <-c
- }
// expDecaySample is an individual reservoir entry: a sampled value v paired
// with the key k that determines its position in the heap.
type expDecaySample struct {
	k float64
	v int64
}

// expDecaySampleHeap is a min-heap of samples ordered by key. It provides the
// Len/Less/Swap/Push/Pop methods required by container/heap, and should be
// mutated through heap.Push and heap.Pop so the heap invariant is maintained.
type expDecaySampleHeap []expDecaySample

// Len returns the number of samples in the heap.
func (q expDecaySampleHeap) Len() int {
	return len(q)
}

// Less reports whether the sample at index i has a smaller key than the one
// at index j, which makes the smallest key the heap's root.
func (q expDecaySampleHeap) Less(i, j int) bool {
	return q[i].k < q[j].k
}

// Swap exchanges the samples at indices i and j.
func (q expDecaySampleHeap) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
}

// Push appends x, which must be an expDecaySample, to the end of the slice.
// It uses append so that the backing array grows when it is full instead of
// panicking on an out-of-capacity reslice.
func (q *expDecaySampleHeap) Push(x interface{}) {
	*q = append(*q, x.(expDecaySample))
}

// Pop removes and returns the last element of the slice. container/heap
// moves the minimum there before calling it.
func (q *expDecaySampleHeap) Pop() interface{} {
	old := *q
	last := len(old) - 1
	item := old[last]
	*q = old[:last]
	return item
}
- // Receive inputs and send outputs. Count and save each input value,
- // rescaling the sample if enough time has elapsed since the last rescaling.
- // Send a copy of the values as output.
- func (s *ExpDecaySample) arbiter() {
- values := make(expDecaySampleHeap, 0, s.reservoirSize)
- start := time.Now()
- next := time.Now().Add(rescaleThreshold)
- for {
- select {
- case v := <-s.in:
- if len(values) == s.reservoirSize {
- heap.Pop(&values)
- }
- now := time.Now()
- k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
- heap.Push(&values, expDecaySample{k: k, v: v})
- if now.After(next) {
- oldValues := values
- oldStart := start
- values = make(expDecaySampleHeap, 0, s.reservoirSize)
- start = time.Now()
- next = start.Add(rescaleThreshold)
- for _, e := range oldValues {
- e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
- heap.Push(&values, e)
- }
- }
- case c := <-s.out:
- valuesCopy := make([]int64, len(values))
- for i, e := range values {
- valuesCopy[i] = e.v
- }
- c <- valuesCopy
- case <-s.reset:
- values = make(expDecaySampleHeap, 0, s.reservoirSize)
- start = time.Now()
- next = start.Add(rescaleThreshold)
- }
- }
- }
// UniformSample is a uniform sample using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
//
// The reservoir lives inside the arbiter goroutine started by
// NewUniformSample; the exported methods reach it only by sending messages
// over the unbuffered channels below.
type UniformSample struct {
	// reservoirSize is the maximum number of values retained.
	reservoirSize int
	// in carries values submitted through Update.
	in chan int64
	// out carries the reply channels submitted through Values.
	out chan chan []int64
	// reset carries the signals submitted through Clear.
	reset chan bool
}

// NewUniformSample creates a new uniform sample with the given reservoir
// size and starts the arbiter goroutine that serves it.
func NewUniformSample(reservoirSize int) *UniformSample {
	s := &UniformSample{
		reservoirSize: reservoirSize,
		in:            make(chan int64),
		out:           make(chan chan []int64),
		reset:         make(chan bool),
	}
	go s.arbiter()
	return s
}

// Clear discards all samples. It blocks until the arbiter accepts the
// request.
func (s *UniformSample) Clear() {
	s.reset <- true
}

// Size returns the number of values in the sample, which is at most the
// reservoir size.
func (s *UniformSample) Size() int {
	return len(s.Values())
}

// Update offers a new value to the sample. It blocks until the arbiter
// accepts the value.
func (s *UniformSample) Update(v int64) {
	s.in <- v
}

// Values returns a copy of all the values in the sample.
func (s *UniformSample) Values() []int64 {
	c := make(chan []int64)
	s.out <- c
	return <-c
}

// arbiter receives inputs and sends outputs. It owns the reservoir and loops
// forever: values arriving on s.in are sampled with Algorithm R, reply
// channels arriving on s.out are sent a copy of the reservoir, and a signal
// on s.reset empties the reservoir and restarts the count.
func (s *UniformSample) arbiter() {
	values := make([]int64, 0, s.reservoirSize)
	// count is the number of values observed since creation or the last
	// reset; Algorithm R needs it to keep the sample uniform.
	var count int64
	for {
		select {
		case v := <-s.in:
			count++
			if len(values) < s.reservoirSize {
				values = append(values, v)
				continue
			}
			// The reservoir is full: keep the count-th value with
			// probability reservoirSize/count by drawing a slot in
			// [0, count) and replacing only if it falls inside the
			// reservoir. A zero-size reservoir never matches.
			if r := rand.Int63n(count); r < int64(len(values)) {
				values[r] = v
			}
		case c := <-s.out:
			valuesCopy := make([]int64, len(values))
			copy(valuesCopy, values)
			c <- valuesCopy
		case <-s.reset:
			// Callers only ever receive copies, so the backing array
			// can be reused.
			values = values[:0]
			count = 0
		}
	}
}
|