sample.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "time"
  7. )
  8. const rescaleThreshold = 1e9 * 60 * 60
  9. // Samples maintain a statistically-significant selection of values from
  10. // a stream.
  11. //
  12. // This is an interface so as to encourage other structs to implement
  13. // the Sample API as appropriate.
  14. type Sample interface {
  15. Clear()
  16. Size() int
  17. Update(int64)
  18. Values() []int64
  19. }
  20. // An exponentially-decaying sample using a forward-decaying priority
  21. // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
  22. // Model for Streaming Systems".
  23. //
  24. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  25. type ExpDecaySample struct {
  26. reservoirSize int
  27. alpha float64
  28. in chan int64
  29. out chan chan []int64
  30. reset chan bool
  31. }
  32. // Create a new exponentially-decaying sample with the given reservoir size
  33. // and alpha.
  34. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  35. s := &ExpDecaySample{
  36. reservoirSize,
  37. alpha,
  38. make(chan int64),
  39. make(chan chan []int64),
  40. make(chan bool),
  41. }
  42. go s.arbiter()
  43. return s
  44. }
  45. // Clear all samples.
  46. func (s *ExpDecaySample) Clear() {
  47. s.reset <- true
  48. }
  49. // Return the size of the sample, which is at most the reservoir size.
  50. func (s *ExpDecaySample) Size() int {
  51. return len(s.Values())
  52. }
  53. // Update the sample with a new value.
  54. func (s *ExpDecaySample) Update(v int64) {
  55. s.in <- v
  56. }
  57. // Return all the values in the sample.
  58. func (s *ExpDecaySample) Values() []int64 {
  59. c := make(chan []int64)
  60. s.out <- c
  61. return <-c
  62. }
  63. // An individual sample.
  64. type expDecaySample struct {
  65. k float64
  66. v int64
  67. }
  68. // A min-heap of samples.
  69. type expDecaySampleHeap []expDecaySample
  70. func (q expDecaySampleHeap) Len() int {
  71. return len(q)
  72. }
  73. func (q expDecaySampleHeap) Less(i, j int) bool {
  74. return q[i].k < q[j].k
  75. }
  76. func (q expDecaySampleHeap) Swap(i, j int) {
  77. q[i], q[j] = q[j], q[i]
  78. }
  79. func (q *expDecaySampleHeap) Push(x interface{}) {
  80. q_ := *q
  81. n := len(q_)
  82. q_ = q_[0 : n+1]
  83. q_[n] = x.(expDecaySample)
  84. *q = q_
  85. }
  86. func (q *expDecaySampleHeap) Pop() interface{} {
  87. q_ := *q
  88. n := len(q_)
  89. i := q_[n-1]
  90. q_ = q_[0 : n-1]
  91. *q = q_
  92. return i
  93. }
  94. // Receive inputs and send outputs. Count and save each input value,
  95. // rescaling the sample if enough time has elapsed since the last rescaling.
  96. // Send a copy of the values as output.
  97. func (s *ExpDecaySample) arbiter() {
  98. values := make(expDecaySampleHeap, 0, s.reservoirSize)
  99. start := time.Now()
  100. next := time.Now().Add(rescaleThreshold)
  101. for {
  102. select {
  103. case v := <-s.in:
  104. if len(values) == s.reservoirSize {
  105. heap.Pop(&values)
  106. }
  107. now := time.Now()
  108. k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
  109. heap.Push(&values, expDecaySample{k: k, v: v})
  110. if now.After(next) {
  111. oldValues := values
  112. oldStart := start
  113. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  114. start = time.Now()
  115. next = start.Add(rescaleThreshold)
  116. for _, e := range oldValues {
  117. e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
  118. heap.Push(&values, e)
  119. }
  120. }
  121. case c := <-s.out:
  122. valuesCopy := make([]int64, len(values))
  123. for i, e := range values {
  124. valuesCopy[i] = e.v
  125. }
  126. c <- valuesCopy
  127. case <-s.reset:
  128. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  129. start = time.Now()
  130. next = start.Add(rescaleThreshold)
  131. }
  132. }
  133. }
  134. // A uniform sample using Vitter's Algorithm R.
  135. //
  136. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  137. type UniformSample struct {
  138. reservoirSize int
  139. in chan int64
  140. out chan chan []int64
  141. reset chan bool
  142. }
  143. // Create a new uniform sample with the given reservoir size.
  144. func NewUniformSample(reservoirSize int) *UniformSample {
  145. s := &UniformSample{
  146. reservoirSize,
  147. make(chan int64),
  148. make(chan chan []int64),
  149. make(chan bool),
  150. }
  151. go s.arbiter()
  152. return s
  153. }
  154. // Clear all samples.
  155. func (s *UniformSample) Clear() {
  156. s.reset <- true
  157. }
  158. // Return the size of the sample, which is at most the reservoir size.
  159. func (s *UniformSample) Size() int {
  160. return len(s.Values())
  161. }
  162. // Update the sample with a new value.
  163. func (s *UniformSample) Update(v int64) {
  164. s.in <- v
  165. }
  166. // Return all the values in the sample.
  167. func (s *UniformSample) Values() []int64 {
  168. c := make(chan []int64)
  169. s.out <- c
  170. return <-c
  171. }
  172. // Receive inputs and send outputs. Count and save each input value at a
  173. // random index. Send a copy of the values as output.
  174. func (s *UniformSample) arbiter() {
  175. values := make([]int64, 0, s.reservoirSize)
  176. for {
  177. n := len(values)
  178. select {
  179. case v := <-s.in:
  180. if n < s.reservoirSize {
  181. values = values[0 : n+1]
  182. values[n] = v
  183. } else {
  184. values[rand.Intn(s.reservoirSize)] = v
  185. }
  186. case c := <-s.out:
  187. valuesCopy := make([]int64, n)
  188. for i := 0; i < n; i++ {
  189. valuesCopy[i] = values[i]
  190. }
  191. c <- valuesCopy
  192. case <-s.reset:
  193. values = make([]int64, 0, s.reservoirSize)
  194. }
  195. }
  196. }