sample.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "time"
  7. )
  8. const rescaleThreshold = 1e9 * 60 * 60
  9. // Samples maintain a statistically-significant selection of values from
  10. // a stream.
  11. //
  12. // This is an interface so as to encourage other structs to implement
  13. // the Sample API as appropriate.
  14. type Sample interface {
  15. Clear()
  16. Size() int
  17. Update(int64)
  18. Values() []int64
  19. }
  20. // An exponentially-decaying sample using a forward-decaying priority
  21. // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
  22. // Model for Streaming Systems".
  23. //
  24. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  25. type ExpDecaySample struct {
  26. reservoirSize int
  27. alpha float64
  28. in chan int64
  29. out chan chan []int64
  30. reset chan bool
  31. }
  32. // Force the compiler to check that ExpDecaySample implements Sample.
  33. var _ Sample = &ExpDecaySample{}
  34. // Create a new exponentially-decaying sample with the given reservoir size
  35. // and alpha.
  36. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  37. s := &ExpDecaySample{
  38. reservoirSize,
  39. alpha,
  40. make(chan int64),
  41. make(chan chan []int64),
  42. make(chan bool),
  43. }
  44. go s.arbiter()
  45. return s
  46. }
  47. // Clear all samples.
  48. func (s *ExpDecaySample) Clear() {
  49. s.reset <- true
  50. }
  51. // Return the size of the sample, which is at most the reservoir size.
  52. func (s *ExpDecaySample) Size() int {
  53. return len(s.Values())
  54. }
  55. // Update the sample with a new value.
  56. func (s *ExpDecaySample) Update(v int64) {
  57. s.in <- v
  58. }
  59. // Return all the values in the sample.
  60. func (s *ExpDecaySample) Values() []int64 {
  61. c := make(chan []int64)
  62. s.out <- c
  63. return <-c
  64. }
  65. // An individual sample.
  66. type expDecaySample struct {
  67. k float64
  68. v int64
  69. }
  70. // A min-heap of samples.
  71. type expDecaySampleHeap []expDecaySample
  72. func (q expDecaySampleHeap) Len() int {
  73. return len(q)
  74. }
  75. func (q expDecaySampleHeap) Less(i, j int) bool {
  76. return q[i].k < q[j].k
  77. }
  78. func (q expDecaySampleHeap) Swap(i, j int) {
  79. q[i], q[j] = q[j], q[i]
  80. }
  81. func (q *expDecaySampleHeap) Push(x interface{}) {
  82. q_ := *q
  83. n := len(q_)
  84. q_ = q_[0 : n+1]
  85. q_[n] = x.(expDecaySample)
  86. *q = q_
  87. }
  88. func (q *expDecaySampleHeap) Pop() interface{} {
  89. q_ := *q
  90. n := len(q_)
  91. i := q_[n-1]
  92. q_ = q_[0 : n-1]
  93. *q = q_
  94. return i
  95. }
  96. // Receive inputs and send outputs. Count and save each input value,
  97. // rescaling the sample if enough time has elapsed since the last rescaling.
  98. // Send a copy of the values as output.
  99. func (s *ExpDecaySample) arbiter() {
  100. values := make(expDecaySampleHeap, 0, s.reservoirSize)
  101. start := time.Now()
  102. next := time.Now().Add(rescaleThreshold)
  103. for {
  104. select {
  105. case v := <-s.in:
  106. if len(values) == s.reservoirSize {
  107. heap.Pop(&values)
  108. }
  109. now := time.Now()
  110. k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
  111. heap.Push(&values, expDecaySample{k: k, v: v})
  112. if now.After(next) {
  113. oldValues := values
  114. oldStart := start
  115. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  116. start = time.Now()
  117. next = start.Add(rescaleThreshold)
  118. for _, e := range oldValues {
  119. e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
  120. heap.Push(&values, e)
  121. }
  122. }
  123. case c := <-s.out:
  124. valuesCopy := make([]int64, len(values))
  125. for i, e := range values {
  126. valuesCopy[i] = e.v
  127. }
  128. c <- valuesCopy
  129. case <-s.reset:
  130. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  131. start = time.Now()
  132. next = start.Add(rescaleThreshold)
  133. }
  134. }
  135. }
  136. // A uniform sample using Vitter's Algorithm R.
  137. //
  138. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  139. type UniformSample struct {
  140. reservoirSize int
  141. in chan int64
  142. out chan chan []int64
  143. reset chan bool
  144. }
  145. // Create a new uniform sample with the given reservoir size.
  146. func NewUniformSample(reservoirSize int) *UniformSample {
  147. s := &UniformSample{
  148. reservoirSize,
  149. make(chan int64),
  150. make(chan chan []int64),
  151. make(chan bool),
  152. }
  153. go s.arbiter()
  154. return s
  155. }
  156. // Clear all samples.
  157. func (s *UniformSample) Clear() {
  158. s.reset <- true
  159. }
  160. // Return the size of the sample, which is at most the reservoir size.
  161. func (s *UniformSample) Size() int {
  162. return len(s.Values())
  163. }
  164. // Update the sample with a new value.
  165. func (s *UniformSample) Update(v int64) {
  166. s.in <- v
  167. }
  168. // Return all the values in the sample.
  169. func (s *UniformSample) Values() []int64 {
  170. c := make(chan []int64)
  171. s.out <- c
  172. return <-c
  173. }
  174. // Receive inputs and send outputs. Count and save each input value at a
  175. // random index. Send a copy of the values as output.
  176. func (s *UniformSample) arbiter() {
  177. values := make([]int64, 0, s.reservoirSize)
  178. for {
  179. n := len(values)
  180. select {
  181. case v := <-s.in:
  182. if n < s.reservoirSize {
  183. values = values[0 : n+1]
  184. values[n] = v
  185. } else {
  186. values[rand.Intn(s.reservoirSize)] = v
  187. }
  188. case c := <-s.out:
  189. valuesCopy := make([]int64, n)
  190. for i := 0; i < n; i++ {
  191. valuesCopy[i] = values[i]
  192. }
  193. c <- valuesCopy
  194. case <-s.reset:
  195. values = make([]int64, 0, s.reservoirSize)
  196. }
  197. }
  198. }