sample.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  // rescaleThreshold is the interval after which an ExpDecaySample moves its
// landmark forward and rescales the priorities it holds. It is one hour,
// written in nanoseconds so that it converts directly to a time.Duration.
const rescaleThreshold = 60 * 60 * 1e9
  // Sample maintains a statistically significant selection of values drawn
// from a stream.
//
// Sample is an interface so that other types can provide their own
// implementations of the sampling API where appropriate.
type Sample interface {
	// Clear discards every value currently held by the sample.
	Clear()
	// Size reports how many values the sample currently holds.
	Size() int
	// Update offers a new value from the stream to the sample.
	Update(int64)
	// Values returns the values currently held by the sample.
	Values() []int64
}
  21. // An exponentially-decaying sample using a forward-decaying priority
  22. // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
  23. // Model for Streaming Systems".
  24. //
  25. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  26. type ExpDecaySample struct {
  27. alpha float64
  28. mutex *sync.Mutex
  29. reservoirSize int
  30. t0, t1 time.Time
  31. values expDecaySampleHeap
  32. }
  33. // Force the compiler to check that ExpDecaySample implements Sample.
  34. var _ Sample = &ExpDecaySample{}
  35. // Create a new exponentially-decaying sample with the given reservoir size
  36. // and alpha.
  37. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  38. s := &ExpDecaySample{
  39. alpha: alpha,
  40. mutex: &sync.Mutex{},
  41. reservoirSize: reservoirSize,
  42. t0: time.Now(),
  43. values: make(expDecaySampleHeap, 0, reservoirSize),
  44. }
  45. s.t1 = time.Now().Add(rescaleThreshold)
  46. return s
  47. }
  48. // Clear all samples.
  49. func (s *ExpDecaySample) Clear() {
  50. s.mutex.Lock()
  51. defer s.mutex.Unlock()
  52. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  53. s.t0 = time.Now()
  54. s.t1 = s.t0.Add(rescaleThreshold)
  55. }
  56. // Return the size of the sample, which is at most the reservoir size.
  57. func (s *ExpDecaySample) Size() int {
  58. s.mutex.Lock()
  59. defer s.mutex.Unlock()
  60. return len(s.values)
  61. }
  62. // Update the sample with a new value.
  63. func (s *ExpDecaySample) Update(v int64) {
  64. s.mutex.Lock()
  65. defer s.mutex.Unlock()
  66. if len(s.values) == s.reservoirSize {
  67. heap.Pop(&s.values)
  68. }
  69. t := time.Now()
  70. heap.Push(&s.values, expDecaySample{
  71. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  72. v: v,
  73. })
  74. if t.After(s.t1) {
  75. values := s.values
  76. t0 := s.t0
  77. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  78. s.t0 = t
  79. s.t1 = s.t0.Add(rescaleThreshold)
  80. for _, v := range values {
  81. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  82. heap.Push(&values, v)
  83. }
  84. }
  85. }
  86. // Return all the values in the sample.
  87. func (s *ExpDecaySample) Values() []int64 {
  88. s.mutex.Lock()
  89. defer s.mutex.Unlock()
  90. values := make([]int64, len(s.values))
  91. for i, v := range s.values {
  92. values[i] = v.v
  93. }
  94. return values
  95. }
  // UniformSample is a uniform sample of a stream using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
//
// After n updates, each of the n values seen is held with equal probability
// min(1, reservoirSize/n). The zero value is not usable (its mutex is nil);
// construct instances with NewUniformSample.
type UniformSample struct {
	mutex         *sync.Mutex
	reservoirSize int
	values        []int64
	count         int64 // updates seen since creation or the last Clear
}

// NewUniformSample creates a uniform sample holding at most reservoirSize
// values.
func NewUniformSample(reservoirSize int) *UniformSample {
	return &UniformSample{
		mutex:         &sync.Mutex{},
		reservoirSize: reservoirSize,
	}
}

// Clear discards all values and resets the update count.
func (s *UniformSample) Clear() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.values = make([]int64, 0, s.reservoirSize)
	s.count = 0
}

// Size returns the number of values held, which is at most the reservoir
// size.
func (s *UniformSample) Size() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.values)
}

// Update offers a new value to the sample.
func (s *UniformSample) Update(v int64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.count++
	if len(s.values) < s.reservoirSize {
		s.values = append(s.values, v)
		return
	}
	// Algorithm R: the count-th value replaces a random slot with probability
	// len(values)/count. That keeps every value seen so far equally likely to
	// be held. Unconditionally overwriting a slot would bias the sample toward
	// recent values.
	if r := rand.Int63n(s.count); r < int64(len(s.values)) {
		s.values[r] = v
	}
}

// Values returns a copy of the values currently held.
func (s *UniformSample) Values() []int64 {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	values := make([]int64, len(s.values))
	copy(values, s.values)
	return values
}
  // expDecaySample is a single value held by an ExpDecaySample, together with
// its priority k.
type expDecaySample struct {
	k float64
	v int64
}

// expDecaySampleHeap is a min-heap of samples ordered by priority k, for use
// with container/heap. The lowest-priority sample is at index 0.
type expDecaySampleHeap []expDecaySample

func (q expDecaySampleHeap) Len() int { return len(q) }

func (q expDecaySampleHeap) Less(i, j int) bool { return q[i].k < q[j].k }

func (q expDecaySampleHeap) Swap(i, j int) { q[i], q[j] = q[j], q[i] }

// Push appends x, which must be an expDecaySample; it is called by heap.Push.
// Using append means Push grows the slice instead of panicking when the
// backing array is full or the heap is nil. Reslicing into spare capacity
// would panic in those cases.
func (q *expDecaySampleHeap) Push(x interface{}) {
	*q = append(*q, x.(expDecaySample))
}

// Pop removes and returns the last element; it is called by heap.Pop, which
// first swaps the minimum into that position.
func (q *expDecaySampleHeap) Pop() interface{} {
	old := *q
	n := len(old)
	last := old[n-1]
	*q = old[:n-1]
	return last
}