sample.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. const rescaleThreshold = 1e9 * 60 * 60
  10. // Samples maintain a statistically-significant selection of values from
  11. // a stream.
  12. //
  13. // This is an interface so as to encourage other structs to implement
  14. // the Sample API as appropriate.
  15. type Sample interface {
  16. Clear()
  17. Size() int
  18. Update(int64)
  19. Values() []int64
  20. }
  21. // An exponentially-decaying sample using a forward-decaying priority
  22. // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
  23. // Model for Streaming Systems".
  24. //
  25. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  26. type ExpDecaySample struct {
  27. alpha float64
  28. mutex sync.Mutex
  29. reservoirSize int
  30. t0, t1 time.Time
  31. values expDecaySampleHeap
  32. }
  33. // Create a new exponentially-decaying sample with the given reservoir size
  34. // and alpha.
  35. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  36. if UseNilMetrics {
  37. return NilSample{}
  38. }
  39. s := &ExpDecaySample{
  40. alpha: alpha,
  41. reservoirSize: reservoirSize,
  42. t0: time.Now(),
  43. values: make(expDecaySampleHeap, 0, reservoirSize),
  44. }
  45. s.t1 = time.Now().Add(rescaleThreshold)
  46. return s
  47. }
  48. // Clear all samples.
  49. func (s *ExpDecaySample) Clear() {
  50. s.mutex.Lock()
  51. defer s.mutex.Unlock()
  52. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  53. s.t0 = time.Now()
  54. s.t1 = s.t0.Add(rescaleThreshold)
  55. }
  56. // Return the size of the sample, which is at most the reservoir size.
  57. func (s *ExpDecaySample) Size() int {
  58. s.mutex.Lock()
  59. defer s.mutex.Unlock()
  60. return len(s.values)
  61. }
  62. // Update the sample with a new value.
  63. func (s *ExpDecaySample) Update(v int64) {
  64. s.mutex.Lock()
  65. defer s.mutex.Unlock()
  66. if len(s.values) == s.reservoirSize {
  67. heap.Pop(&s.values)
  68. }
  69. t := time.Now()
  70. heap.Push(&s.values, expDecaySample{
  71. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  72. v: v,
  73. })
  74. if t.After(s.t1) {
  75. values := s.values
  76. t0 := s.t0
  77. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  78. s.t0 = t
  79. s.t1 = s.t0.Add(rescaleThreshold)
  80. for _, v := range values {
  81. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  82. heap.Push(&s.values, v)
  83. }
  84. }
  85. }
  86. // Return all the values in the sample.
  87. func (s *ExpDecaySample) Values() []int64 {
  88. s.mutex.Lock()
  89. defer s.mutex.Unlock()
  90. values := make([]int64, len(s.values))
  91. for i, v := range s.values {
  92. values[i] = v.v
  93. }
  94. return values
  95. }
  96. // No-op Sample.
  97. type NilSample struct{}
  98. // No-op.
  99. func (s NilSample) Clear() {}
  100. // No-op.
  101. func (s NilSample) Size() int { return 0 }
  102. // No-op.
  103. func (s NilSample) Update(v int64) {}
  104. // No-op.
  105. func (s NilSample) Values() []int64 { return []int64{} }
  106. // A uniform sample using Vitter's Algorithm R.
  107. //
  108. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  109. type UniformSample struct {
  110. mutex sync.Mutex
  111. reservoirSize int
  112. values []int64
  113. }
  114. // Create a new uniform sample with the given reservoir size.
  115. func NewUniformSample(reservoirSize int) Sample {
  116. if UseNilMetrics {
  117. return NilSample{}
  118. }
  119. return &UniformSample{reservoirSize: reservoirSize}
  120. }
  121. // Clear all samples.
  122. func (s *UniformSample) Clear() {
  123. s.mutex.Lock()
  124. defer s.mutex.Unlock()
  125. s.values = make([]int64, 0, s.reservoirSize)
  126. }
  127. // Return the size of the sample, which is at most the reservoir size.
  128. func (s *UniformSample) Size() int {
  129. s.mutex.Lock()
  130. defer s.mutex.Unlock()
  131. return len(s.values)
  132. }
  133. // Update the sample with a new value.
  134. func (s *UniformSample) Update(v int64) {
  135. s.mutex.Lock()
  136. defer s.mutex.Unlock()
  137. if len(s.values) < s.reservoirSize {
  138. s.values = append(s.values, v)
  139. } else {
  140. s.values[rand.Intn(s.reservoirSize)] = v
  141. }
  142. }
  143. // Return all the values in the sample.
  144. func (s *UniformSample) Values() []int64 {
  145. s.mutex.Lock()
  146. defer s.mutex.Unlock()
  147. values := make([]int64, len(s.values))
  148. copy(values, s.values)
  149. return values
  150. }
  151. // An individual sample.
  152. type expDecaySample struct {
  153. k float64
  154. v int64
  155. }
  156. // A min-heap of samples.
  157. type expDecaySampleHeap []expDecaySample
  158. func (q expDecaySampleHeap) Len() int {
  159. return len(q)
  160. }
  161. func (q expDecaySampleHeap) Less(i, j int) bool {
  162. return q[i].k < q[j].k
  163. }
  164. func (q *expDecaySampleHeap) Pop() interface{} {
  165. q_ := *q
  166. n := len(q_)
  167. i := q_[n-1]
  168. q_ = q_[0 : n-1]
  169. *q = q_
  170. return i
  171. }
  172. func (q *expDecaySampleHeap) Push(x interface{}) {
  173. q_ := *q
  174. n := len(q_)
  175. q_ = q_[0 : n+1]
  176. q_[n] = x.(expDecaySample)
  177. *q = q_
  178. }
  179. func (q expDecaySampleHeap) Swap(i, j int) {
  180. q[i], q[j] = q[j], q[i]
  181. }