sample.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. const rescaleThreshold = 1e9 * 60 * 60
  10. // Samples maintain a statistically-significant selection of values from
  11. // a stream.
  12. //
  13. // This is an interface so as to encourage other structs to implement
  14. // the Sample API as appropriate.
  15. type Sample interface {
  16. Clear()
  17. Size() int
  18. Update(int64)
  19. Values() []int64
  20. }
  21. // An exponentially-decaying sample using a forward-decaying priority
  22. // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
  23. // Model for Streaming Systems".
  24. //
  25. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  26. type ExpDecaySample struct {
  27. alpha float64
  28. mutex sync.Mutex
  29. reservoirSize int
  30. t0, t1 time.Time
  31. values expDecaySampleHeap
  32. }
  33. // Force the compiler to check that ExpDecaySample implements Sample.
  34. var _ Sample = &ExpDecaySample{}
  35. // Create a new exponentially-decaying sample with the given reservoir size
  36. // and alpha.
  37. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  38. s := &ExpDecaySample{
  39. alpha: alpha,
  40. reservoirSize: reservoirSize,
  41. t0: time.Now(),
  42. values: make(expDecaySampleHeap, 0, reservoirSize),
  43. }
  44. s.t1 = time.Now().Add(rescaleThreshold)
  45. return s
  46. }
  47. // Clear all samples.
  48. func (s *ExpDecaySample) Clear() {
  49. s.mutex.Lock()
  50. defer s.mutex.Unlock()
  51. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  52. s.t0 = time.Now()
  53. s.t1 = s.t0.Add(rescaleThreshold)
  54. }
  55. // Return the size of the sample, which is at most the reservoir size.
  56. func (s *ExpDecaySample) Size() int {
  57. s.mutex.Lock()
  58. defer s.mutex.Unlock()
  59. return len(s.values)
  60. }
  61. // Update the sample with a new value.
  62. func (s *ExpDecaySample) Update(v int64) {
  63. s.mutex.Lock()
  64. defer s.mutex.Unlock()
  65. if len(s.values) == s.reservoirSize {
  66. heap.Pop(&s.values)
  67. }
  68. t := time.Now()
  69. heap.Push(&s.values, expDecaySample{
  70. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  71. v: v,
  72. })
  73. if t.After(s.t1) {
  74. values := s.values
  75. t0 := s.t0
  76. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  77. s.t0 = t
  78. s.t1 = s.t0.Add(rescaleThreshold)
  79. for _, v := range values {
  80. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  81. heap.Push(&s.values, v)
  82. }
  83. }
  84. }
  85. // Return all the values in the sample.
  86. func (s *ExpDecaySample) Values() []int64 {
  87. s.mutex.Lock()
  88. defer s.mutex.Unlock()
  89. values := make([]int64, len(s.values))
  90. for i, v := range s.values {
  91. values[i] = v.v
  92. }
  93. return values
  94. }
  95. // A uniform sample using Vitter's Algorithm R.
  96. //
  97. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  98. type UniformSample struct {
  99. mutex sync.Mutex
  100. reservoirSize int
  101. values []int64
  102. }
  103. // Create a new uniform sample with the given reservoir size.
  104. func NewUniformSample(reservoirSize int) *UniformSample {
  105. return &UniformSample{reservoirSize: reservoirSize}
  106. }
  107. // Clear all samples.
  108. func (s *UniformSample) Clear() {
  109. s.mutex.Lock()
  110. defer s.mutex.Unlock()
  111. s.values = make([]int64, 0, s.reservoirSize)
  112. }
  113. // Return the size of the sample, which is at most the reservoir size.
  114. func (s *UniformSample) Size() int {
  115. s.mutex.Lock()
  116. defer s.mutex.Unlock()
  117. return len(s.values)
  118. }
  119. // Update the sample with a new value.
  120. func (s *UniformSample) Update(v int64) {
  121. s.mutex.Lock()
  122. defer s.mutex.Unlock()
  123. if len(s.values) < s.reservoirSize {
  124. s.values = append(s.values, v)
  125. } else {
  126. s.values[rand.Intn(s.reservoirSize)] = v
  127. }
  128. }
  129. // Return all the values in the sample.
  130. func (s *UniformSample) Values() []int64 {
  131. s.mutex.Lock()
  132. defer s.mutex.Unlock()
  133. values := make([]int64, len(s.values))
  134. copy(values, s.values)
  135. return values
  136. }
  137. // An individual sample.
  138. type expDecaySample struct {
  139. k float64
  140. v int64
  141. }
  142. // A min-heap of samples.
  143. type expDecaySampleHeap []expDecaySample
  144. func (q expDecaySampleHeap) Len() int {
  145. return len(q)
  146. }
  147. func (q expDecaySampleHeap) Less(i, j int) bool {
  148. return q[i].k < q[j].k
  149. }
  150. func (q *expDecaySampleHeap) Pop() interface{} {
  151. q_ := *q
  152. n := len(q_)
  153. i := q_[n-1]
  154. q_ = q_[0 : n-1]
  155. *q = q_
  156. return i
  157. }
  158. func (q *expDecaySampleHeap) Push(x interface{}) {
  159. q_ := *q
  160. n := len(q_)
  161. q_ = q_[0 : n+1]
  162. q_[n] = x.(expDecaySample)
  163. *q = q_
  164. }
  165. func (q expDecaySampleHeap) Swap(i, j int) {
  166. q[i], q[j] = q[j], q[i]
  167. }