sample.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "time"
  7. )
// rescaleThreshold is one hour expressed in nanoseconds
// (1e9 ns/s * 60 s/min * 60 min/h). It is an untyped constant; the
// ExpDecaySample arbiter passes it to time.Time.Add to schedule the next
// rescaling of its priorities.
const rescaleThreshold = 1e9 * 60 * 60
// Sample maintains a statistically-significant selection of values from
// a stream.
//
// This is an interface so as to encourage other structs to implement
// the Sample API as appropriate.
type Sample interface {
	// Clear discards every value currently held by the sample.
	Clear()
	// Size reports how many values the sample currently holds.
	Size() int
	// Update offers a new value from the stream to the sample.
	Update(int64)
	// Values returns the values currently held by the sample.
	Values() []int64
}
// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
// Decay Model for Streaming Systems".
//
// <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
//
// All mutable state lives inside the arbiter goroutine started by
// NewExpDecaySample; the methods only talk to it over the channels below.
type ExpDecaySample struct {
	// reservoirSize is the capacity of the reservoir: once it holds this
	// many values, the arbiter evicts one before storing the next.
	reservoirSize int
	// alpha is the decay factor used in the exponent when priorities are
	// computed and rescaled.
	alpha float64
	// in carries new values from Update to the arbiter.
	in chan int64
	// out carries reply channels from Values to the arbiter, which answers
	// each one with a copy of the current values.
	out chan chan []int64
	// reset signals the arbiter (from Clear) to drop all values.
	reset chan bool
}
  32. // Force the compiler to check that ExpDecaySample implements Sample.
  33. var _ Sample = &ExpDecaySample{}
  34. // Create a new exponentially-decaying sample with the given reservoir size
  35. // and alpha.
  36. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  37. s := &ExpDecaySample{
  38. reservoirSize,
  39. alpha,
  40. make(chan int64),
  41. make(chan chan []int64),
  42. make(chan bool),
  43. }
  44. go s.arbiter()
  45. return s
  46. }
// Clear discards all samples. It signals the arbiter over the reset channel
// and blocks until the arbiter receives the signal; the arbiter then starts
// over with an empty reservoir and a fresh start time.
func (s *ExpDecaySample) Clear() {
	s.reset <- true
}
  51. // Return the size of the sample, which is at most the reservoir size.
  52. func (s *ExpDecaySample) Size() int {
  53. return len(s.Values())
  54. }
// Update offers a new value to the sample. The value is handed to the
// arbiter over the input channel, so the call blocks until the arbiter
// receives it.
func (s *ExpDecaySample) Update(v int64) {
	s.in <- v
}
  59. // Return all the values in the sample.
  60. func (s *ExpDecaySample) Values() []int64 {
  61. ch := make(chan []int64)
  62. s.out <- ch
  63. return <-ch
  64. }
  65. // Receive inputs and send outputs. Count and save each input value,
  66. // rescaling the sample if enough time has elapsed since the last rescaling.
  67. // Send a copy of the values as output.
  68. func (s *ExpDecaySample) arbiter() {
  69. values := make(expDecaySampleHeap, 0, s.reservoirSize)
  70. start := time.Now()
  71. next := time.Now().Add(rescaleThreshold)
  72. for {
  73. select {
  74. case v := <-s.in:
  75. if len(values) == s.reservoirSize {
  76. heap.Pop(&values)
  77. }
  78. now := time.Now()
  79. k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
  80. heap.Push(&values, expDecaySample{k: k, v: v})
  81. if now.After(next) {
  82. oldValues := values
  83. oldStart := start
  84. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  85. start = time.Now()
  86. next = start.Add(rescaleThreshold)
  87. for _, e := range oldValues {
  88. e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
  89. heap.Push(&values, e)
  90. }
  91. }
  92. case ch := <-s.out:
  93. valuesCopy := make([]int64, len(values))
  94. for i, e := range values {
  95. valuesCopy[i] = e.v
  96. }
  97. ch <- valuesCopy
  98. case <-s.reset:
  99. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  100. start = time.Now()
  101. next = start.Add(rescaleThreshold)
  102. }
  103. }
  104. }
  105. // A uniform sample using Vitter's Algorithm R.
  106. //
  107. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  108. type UniformSample struct {
  109. reservoirSize int
  110. in chan int64
  111. out chan chan []int64
  112. reset chan bool
  113. }
  114. // Create a new uniform sample with the given reservoir size.
  115. func NewUniformSample(reservoirSize int) *UniformSample {
  116. s := &UniformSample{
  117. reservoirSize,
  118. make(chan int64),
  119. make(chan chan []int64),
  120. make(chan bool),
  121. }
  122. go s.arbiter()
  123. return s
  124. }
  125. // Clear all samples.
  126. func (s *UniformSample) Clear() {
  127. s.reset <- true
  128. }
  129. // Return the size of the sample, which is at most the reservoir size.
  130. func (s *UniformSample) Size() int {
  131. return len(s.Values())
  132. }
  133. // Update the sample with a new value.
  134. func (s *UniformSample) Update(v int64) {
  135. s.in <- v
  136. }
  137. // Return all the values in the sample.
  138. func (s *UniformSample) Values() []int64 {
  139. ch := make(chan []int64)
  140. s.out <- ch
  141. return <-ch
  142. }
  143. // Receive inputs and send outputs. Count and save each input value at a
  144. // random index. Send a copy of the values as output.
  145. func (s *UniformSample) arbiter() {
  146. values := make([]int64, 0, s.reservoirSize)
  147. for {
  148. n := len(values)
  149. select {
  150. case v := <-s.in:
  151. if n < s.reservoirSize {
  152. values = values[0 : n+1]
  153. values[n] = v
  154. } else {
  155. values[rand.Intn(s.reservoirSize)] = v
  156. }
  157. case ch := <-s.out:
  158. valuesCopy := make([]int64, n)
  159. copy(valuesCopy, values[:n])
  160. ch <- valuesCopy
  161. case <-s.reset:
  162. values = make([]int64, 0, s.reservoirSize)
  163. }
  164. }
  165. }
  166. // An individual sample.
  167. type expDecaySample struct {
  168. k float64
  169. v int64
  170. }
  171. // A min-heap of samples.
  172. type expDecaySampleHeap []expDecaySample
  173. func (q expDecaySampleHeap) Len() int {
  174. return len(q)
  175. }
  176. func (q expDecaySampleHeap) Less(i, j int) bool {
  177. return q[i].k < q[j].k
  178. }
  179. func (q *expDecaySampleHeap) Pop() interface{} {
  180. q_ := *q
  181. n := len(q_)
  182. i := q_[n-1]
  183. q_ = q_[0 : n-1]
  184. *q = q_
  185. return i
  186. }
  187. func (q *expDecaySampleHeap) Push(x interface{}) {
  188. q_ := *q
  189. n := len(q_)
  190. q_ = q_[0 : n+1]
  191. q_[n] = x.(expDecaySample)
  192. *q = q_
  193. }
  194. func (q expDecaySampleHeap) Swap(i, j int) {
  195. q[i], q[j] = q[j], q[i]
  196. }