sample.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "time"
  7. )
  // rescaleThreshold is how long ExpDecaySample's arbiter waits between
  // moving its decay landmark forward and rescaling the stored priorities.
  const rescaleThreshold = time.Hour
  // Sample maintains a statistically significant selection of values drawn
  // from a stream.
  //
  // It is deliberately an interface so that other types can offer their own
  // sampling strategies behind the same API.
  type Sample interface {
  	// Clear discards every value currently held.
  	Clear()
  	// Size reports how many values are currently held.
  	Size() int
  	// Update offers one new value from the stream to the sample.
  	Update(int64)
  	// Values returns the values currently held.
  	Values() []int64
  }
  // ExpDecaySample is an exponentially-decaying sample backed by a
  // forward-decaying priority reservoir, following Cormode et al.,
  // "Forward Decay: A Practical Time Decay Model for Streaming Systems".
  //
  // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  //
  // The reservoir itself lives inside the arbiter goroutine started by
  // NewExpDecaySample; the exported methods only exchange messages with it
  // over the channels below. Field order matters: NewExpDecaySample builds
  // this struct with an unkeyed literal.
  type ExpDecaySample struct {
  	reservoirSize int     // maximum number of values retained
  	alpha         float64 // decay rate, multiplied by value age in seconds

  	in    chan int64   // values submitted by Update
  	out   chan []int64 // value slices served to Size and Values
  	reset chan bool    // requests submitted by Clear
  }
  32. // Create a new exponentially-decaying sample with the given reservoir size
  33. // and alpha.
  34. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  35. s := &ExpDecaySample{
  36. reservoirSize,
  37. alpha,
  38. make(chan int64),
  39. make(chan []int64),
  40. make(chan bool),
  41. }
  42. go s.arbiter()
  43. return s
  44. }
  45. // Clear all samples.
  46. func (s *ExpDecaySample) Clear() {
  47. s.reset <- true
  48. }
  49. // Return the size of the sample, which is at most the reservoir size.
  50. func (s *ExpDecaySample) Size() int {
  51. return len(<-s.out)
  52. }
  53. // Update the sample with a new value.
  54. func (s *ExpDecaySample) Update(v int64) {
  55. s.in <- v
  56. }
  57. // Return all the values in the sample.
  58. func (s *ExpDecaySample) Values() []int64 {
  59. return <-s.out
  60. }
  // expDecaySample is one reservoir entry: the sampled value v together with
  // its forward-decay priority k.
  type expDecaySample struct {
  	k float64
  	v int64
  }

  // expDecaySampleHeap is a min-heap of samples ordered by priority k,
  // intended to be driven through container/heap. Index 0 therefore always
  // holds the lowest-priority sample.
  type expDecaySampleHeap []expDecaySample

  func (q expDecaySampleHeap) Len() int {
  	return len(q)
  }

  func (q expDecaySampleHeap) Less(i, j int) bool {
  	return q[i].k < q[j].k
  }

  func (q expDecaySampleHeap) Swap(i, j int) {
  	q[i], q[j] = q[j], q[i]
  }

  // Push appends x, which must be an expDecaySample. It is called by
  // container/heap rather than directly. Using append (instead of reslicing
  // past len) grows the backing array when needed, so Push never panics on
  // a heap created with too little capacity, such as a nil heap.
  func (q *expDecaySampleHeap) Push(x interface{}) {
  	*q = append(*q, x.(expDecaySample))
  }

  // Pop removes and returns the last element. container/heap moves the
  // minimum there before calling it.
  func (q *expDecaySampleHeap) Pop() interface{} {
  	old := *q
  	n := len(old)
  	last := old[n-1]
  	*q = old[:n-1]
  	return last
  }
  92. // Receive inputs and send outputs. Count and save each input value,
  93. // rescaling the sample if enough time has elapsed since the last rescaling.
  94. // Send a copy of the values as output.
  95. func (s *ExpDecaySample) arbiter() {
  96. values := make(expDecaySampleHeap, 0, s.reservoirSize)
  97. start := time.Now()
  98. next := time.Now().Add(rescaleThreshold)
  99. var valuesCopy []int64
  100. for {
  101. select {
  102. case v := <-s.in:
  103. if len(values) == s.reservoirSize {
  104. heap.Pop(&values)
  105. }
  106. now := time.Now()
  107. k := math.Exp(now.Sub(start).Seconds()*s.alpha) / rand.Float64()
  108. heap.Push(&values, expDecaySample{k: k, v: v})
  109. valuesCopy = make([]int64, len(values))
  110. if now.After(next) {
  111. oldValues := values
  112. oldStart := start
  113. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  114. start = time.Now()
  115. next = start.Add(rescaleThreshold)
  116. for _, e := range oldValues {
  117. e.k = e.k * math.Exp(-s.alpha*float64(start.Sub(oldStart)))
  118. heap.Push(&values, e)
  119. }
  120. }
  121. for i, e := range values {
  122. valuesCopy[i] = e.v
  123. }
  124. case s.out <- valuesCopy: // TODO Might need to make another copy here.
  125. case <-s.reset:
  126. values = make(expDecaySampleHeap, 0, s.reservoirSize)
  127. valuesCopy = make([]int64, 0)
  128. start = time.Now()
  129. next = start.Add(rescaleThreshold)
  130. }
  131. }
  132. }
  // UniformSample is a uniform random sample of a stream, intended to follow
  // Vitter's Algorithm R.
  //
  // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  //
  // The reservoir itself lives inside the arbiter goroutine started by
  // NewUniformSample; the exported methods only exchange messages with it
  // over the channels below. Field order matters: NewUniformSample builds
  // this struct with an unkeyed literal.
  type UniformSample struct {
  	reservoirSize int // maximum number of values retained

  	in    chan int64   // values submitted by Update
  	out   chan []int64 // value slices served to Size and Values
  	reset chan bool    // requests submitted by Clear
  }
  142. // Create a new uniform sample with the given reservoir size.
  143. func NewUniformSample(reservoirSize int) *UniformSample {
  144. s := &UniformSample{
  145. reservoirSize,
  146. make(chan int64),
  147. make(chan []int64),
  148. make(chan bool),
  149. }
  150. go s.arbiter()
  151. return s
  152. }
  153. // Clear all samples.
  154. func (s *UniformSample) Clear() {
  155. s.reset <- true
  156. }
  157. // Return the size of the sample, which is at most the reservoir size.
  158. func (s *UniformSample) Size() int {
  159. return len(<-s.out)
  160. }
  161. // Update the sample with a new value.
  162. func (s *UniformSample) Update(v int64) {
  163. s.in <- v
  164. }
  165. // Return all the values in the sample.
  166. func (s *UniformSample) Values() []int64 {
  167. return <-s.out
  168. }
  169. // Receive inputs and send outputs. Count and save each input value at a
  170. // random index. Send a copy of the values as output.
  171. func (s *UniformSample) arbiter() {
  172. count := 0
  173. values := make([]int64, s.reservoirSize)
  174. var valuesCopy []int64
  175. for {
  176. select {
  177. case v := <-s.in:
  178. if count < s.reservoirSize {
  179. values[count] = v
  180. count++
  181. valuesCopy = make([]int64, count)
  182. } else {
  183. values[rand.Intn(s.reservoirSize)] = v
  184. valuesCopy = make([]int64, len(values))
  185. }
  186. for i := 0; i < len(valuesCopy); i++ {
  187. valuesCopy[i] = values[i]
  188. }
  189. case s.out <- valuesCopy: // TODO Might need to make another copy here.
  190. case <-s.reset:
  191. count = 0
  192. values = make([]int64, s.reservoirSize)
  193. valuesCopy = make([]int64, 0)
  194. }
  195. }
  196. }