sample.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package metrics
  2. import (
  3. "math"
  4. "math/rand"
  5. "time"
  6. )
// rescaleThreshold is one hour expressed in nanoseconds (1e9 ns * 60 s *
// 60 min). ExpDecaySample's arbiter adds it to the current time to schedule
// the next rescaling of its priorities.
const rescaleThreshold = 1e9 * 60 * 60
// Sample maintains a statistically-significant selection of values from
// a stream.
//
// This is an interface so as to encourage other structs to implement
// the Sample API as appropriate.
type Sample interface {
	// Clear discards every value currently held by the sample.
	Clear()
	// Size reports how many values the sample currently holds.
	Size() int
	// Update offers a new value from the stream to the sample.
	Update(int64)
	// Values returns the values currently held by the sample.
	Values() []int64
}
  19. // An exponentially-decaying sample using a forward-decaying priority
  20. // reservoir. See Cormode et al's "Forward Decay: A Practical Time Decay
  21. // Model for Streaming Systems".
  22. //
  23. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  24. type ExpDecaySample struct {
  25. reservoirSize int
  26. alpha float64
  27. in chan int64
  28. out chan []int64
  29. reset chan bool
  30. }
  31. // Create a new exponentially-decaying sample with the given reservoir size
  32. // and alpha.
  33. func NewExpDecaySample(reservoirSize int, alpha float64) *ExpDecaySample {
  34. s := &ExpDecaySample{
  35. reservoirSize,
  36. alpha,
  37. make(chan int64),
  38. make(chan []int64),
  39. make(chan bool),
  40. }
  41. go s.arbiter()
  42. return s
  43. }
  44. // Clear all samples.
  45. func (s *ExpDecaySample) Clear() {
  46. s.reset <- true
  47. }
  48. // Return the size of the sample, which is at most the reservoir size.
  49. func (s *ExpDecaySample) Size() int {
  50. return len(<-s.out)
  51. }
  52. // Update the sample with a new value.
  53. func (s *ExpDecaySample) Update(v int64) {
  54. s.in <- v
  55. }
  56. // Return all the values in the sample.
  57. func (s *ExpDecaySample) Values() []int64 {
  58. return <-s.out
  59. }
  60. // Receive inputs and send outputs. Count and save each input value,
  61. // rescaling the sample if enough time has elapsed since the last rescaling.
  62. // Send a copy of the values as output.
  63. func (s *ExpDecaySample) arbiter() {
  64. count := 0
  65. values := make(map[float64]int64)
  66. start := time.Now()
  67. next := time.Now().Add(rescaleThreshold)
  68. var valuesCopy []int64
  69. for {
  70. select {
  71. case v := <-s.in:
  72. now := time.Now()
  73. k := math.Exp(float64(now.Sub(start))*s.alpha) / rand.Float64()
  74. count++
  75. values[k] = v
  76. if count > s.reservoirSize {
  77. min := math.MaxFloat64
  78. for k, _ := range values {
  79. if k < min {
  80. min = k
  81. }
  82. }
  83. delete(values, min)
  84. valuesCopy = make([]int64, s.reservoirSize)
  85. } else {
  86. valuesCopy = make([]int64, count)
  87. }
  88. if now.After(next) {
  89. oldStart := start
  90. start = time.Now()
  91. next = now.Add(rescaleThreshold)
  92. oldValues := values
  93. values = make(map[float64]int64, len(oldValues))
  94. for k, v := range oldValues {
  95. values[k*math.Exp(-s.alpha*float64(
  96. start.Sub(oldStart)))] = v
  97. }
  98. }
  99. i := 0
  100. for _, v := range values {
  101. valuesCopy[i] = v
  102. i++
  103. }
  104. case s.out <- valuesCopy: // TODO Might need to make another copy here.
  105. case <-s.reset:
  106. count = 0
  107. values = make(map[float64]int64)
  108. valuesCopy = make([]int64, 0)
  109. start = time.Now()
  110. next = start.Add(rescaleThreshold)
  111. }
  112. }
  113. }
  114. // A uniform sample using Vitter's Algorithm R.
  115. //
  116. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  117. type UniformSample struct {
  118. reservoirSize int
  119. in chan int64
  120. out chan []int64
  121. reset chan bool
  122. }
  123. // Create a new uniform sample with the given reservoir size.
  124. func NewUniformSample(reservoirSize int) *UniformSample {
  125. s := &UniformSample{
  126. reservoirSize,
  127. make(chan int64),
  128. make(chan []int64),
  129. make(chan bool),
  130. }
  131. go s.arbiter()
  132. return s
  133. }
  134. // Clear all samples.
  135. func (s *UniformSample) Clear() {
  136. s.reset <- true
  137. }
  138. // Return the size of the sample, which is at most the reservoir size.
  139. func (s *UniformSample) Size() int {
  140. return len(<-s.out)
  141. }
  142. // Update the sample with a new value.
  143. func (s *UniformSample) Update(v int64) {
  144. s.in <- v
  145. }
  146. // Return all the values in the sample.
  147. func (s *UniformSample) Values() []int64 {
  148. return <-s.out
  149. }
  150. // Receive inputs and send outputs. Count and save each input value at a
  151. // random index. Send a copy of the values as output.
  152. func (s *UniformSample) arbiter() {
  153. count := 0
  154. values := make([]int64, s.reservoirSize)
  155. var valuesCopy []int64
  156. for {
  157. select {
  158. case v := <-s.in:
  159. count++
  160. if count < s.reservoirSize {
  161. values[count-1] = v
  162. valuesCopy = make([]int64, count)
  163. } else {
  164. values[rand.Intn(s.reservoirSize)] = v
  165. valuesCopy = make([]int64, len(values))
  166. }
  167. for i := 0; i < len(valuesCopy); i++ {
  168. valuesCopy[i] = values[i]
  169. }
  170. case s.out <- valuesCopy: // TODO Might need to make another copy here.
  171. case <-s.reset:
  172. count = 0
  173. values = make([]int64, s.reservoirSize)
  174. valuesCopy = make([]int64, 0)
  175. }
  176. }
  177. }