sample.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package metrics
  import (
  	"math"
  	"math/rand"
  	"time"
  )
  // rescaleThreshold is how long the exponentially-decaying sample waits
  // between rescalings of its priorities: one hour, expressed in
  // nanoseconds.
  const rescaleThreshold = 60 * 60 * 1e9
  // Sample maintains a statistically-significant selection of values drawn
  // from a stream.
  //
  // It is declared as an interface so that other types are free to
  // implement the Sample API with whatever strategy suits them.
  type Sample interface {
  	// Clear discards every value held by the sample.
  	Clear()
  	// Size reports how many values the sample currently holds.
  	Size() int
  	// Update offers a new value from the stream to the sample.
  	Update(int64)
  	// Values returns the values currently held by the sample.
  	Values() []int64
  }
  // expDecaySample is an exponentially-decaying sample backed by a
  // forward-decaying priority reservoir. See Cormode et al's "Forward
  // Decay: A Practical Time Decay Model for Streaming Systems".
  //
  // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  //
  // The struct holds only configuration and channels; the sampled values
  // themselves live inside the arbiter goroutine.
  type expDecaySample struct {
  	reservoirSize int          // maximum number of values retained
  	alpha         float64      // factor multiplied by elapsed seconds when weighting a new value
  	in            chan int64   // values travelling from Update to the arbiter
  	out           chan []int64 // snapshots travelling from the arbiter to Size and Values
  	reset         chan bool    // Clear requests travelling to the arbiter
  }
  31. // Create a new exponentially-decaying sample with the given reservoir size
  32. // and alpha.
  33. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  34. s := &expDecaySample{
  35. reservoirSize,
  36. alpha,
  37. make(chan int64),
  38. make(chan []int64),
  39. make(chan bool),
  40. }
  41. go s.arbiter()
  42. return s
  43. }
  44. // Clear all samples.
  45. func (s *expDecaySample) Clear() {
  46. s.reset <- true
  47. }
  48. // Return the size of the sample, which is at most the reservoir size.
  49. func (s *expDecaySample) Size() int {
  50. return len(<-s.out)
  51. }
  52. // Update the sample with a new value.
  53. func (s *expDecaySample) Update(v int64) {
  54. s.in <- v
  55. }
  56. // Return all the values in the sample.
  57. func (s *expDecaySample) Values() []int64 {
  58. return <-s.out
  59. }
  60. // Receive inputs and send outputs. Count and save each input value,
  61. // rescaling the sample if enough time has elapsed since the last rescaling.
  62. // Send a copy of the values as output.
  63. func (s *expDecaySample) arbiter() {
  64. count := 0
  65. values := make(map[float64]int64)
  66. tsStart := time.Seconds()
  67. tsNext := time.Nanoseconds() + rescaleThreshold
  68. var valuesCopy []int64
  69. for {
  70. select {
  71. case v := <-s.in:
  72. ts := time.Seconds()
  73. k := math.Exp(float64(ts - tsStart) * s.alpha) / rand.Float64()
  74. count++
  75. values[k] = v
  76. if count > s.reservoirSize {
  77. min := math.MaxFloat64
  78. for k, _ := range values {
  79. if k < min { min = k }
  80. }
  81. values[min] = 0, false
  82. valuesCopy = make([]int64, s.reservoirSize)
  83. } else {
  84. valuesCopy = make([]int64, count)
  85. }
  86. tsNano := time.Nanoseconds()
  87. if tsNano > tsNext {
  88. tsOldStart := tsStart
  89. tsStart = time.Seconds()
  90. tsNext = tsNano + rescaleThreshold
  91. oldValues := values
  92. values = make(map[float64]int64, len(oldValues))
  93. for k, v := range oldValues {
  94. values[k * math.Exp(-s.alpha * float64(
  95. tsStart - tsOldStart))] = v
  96. }
  97. }
  98. i := 0
  99. for _, v := range values {
  100. valuesCopy[i] = v
  101. i++
  102. }
  103. case s.out <- valuesCopy: // TODO Might need to make another copy here.
  104. case <-s.reset:
  105. count = 0
  106. values = make(map[float64]int64)
  107. valuesCopy = make([]int64, 0)
  108. tsStart = time.Seconds()
  109. tsNext = tsStart + rescaleThreshold
  110. }
  111. }
  112. }
  // uniformSample is a uniform sample based on Vitter's Algorithm R.
  //
  // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  //
  // The struct holds only configuration and channels; the sampled values
  // themselves live inside the arbiter goroutine.
  type uniformSample struct {
  	reservoirSize int          // maximum number of values retained
  	in            chan int64   // values travelling from Update to the arbiter
  	out           chan []int64 // snapshots travelling from the arbiter to Size and Values
  	reset         chan bool    // Clear requests travelling to the arbiter
  }
  122. // Create a new uniform sample with the given reservoir size.
  123. func NewUniformSample(reservoirSize int) Sample {
  124. s := &uniformSample{
  125. reservoirSize,
  126. make(chan int64),
  127. make(chan []int64),
  128. make(chan bool),
  129. }
  130. go s.arbiter()
  131. return s
  132. }
  133. // Clear all samples.
  134. func (s *uniformSample) Clear() {
  135. s.reset <- true
  136. }
  137. // Return the size of the sample, which is at most the reservoir size.
  138. func (s *uniformSample) Size() int {
  139. return len(<-s.out)
  140. }
  141. // Update the sample with a new value.
  142. func (s *uniformSample) Update(v int64) {
  143. s.in <- v
  144. }
  145. // Return all the values in the sample.
  146. func (s *uniformSample) Values() []int64 {
  147. return <-s.out
  148. }
  149. // Receive inputs and send outputs. Count and save each input value at a
  150. // random index. Send a copy of the values as output.
  151. func (s *uniformSample) arbiter() {
  152. count := 0
  153. values := make([]int64, s.reservoirSize)
  154. var valuesCopy []int64
  155. for {
  156. select {
  157. case v := <-s.in:
  158. count++
  159. if count < s.reservoirSize {
  160. values[count - 1] = v
  161. valuesCopy = make([]int64, count)
  162. } else {
  163. values[rand.Intn(s.reservoirSize)] = v
  164. valuesCopy = make([]int64, len(values))
  165. }
  166. for i := 0; i < len(valuesCopy); i++ { valuesCopy[i] = values[i] }
  167. case s.out <- valuesCopy: // TODO Might need to make another copy here.
  168. case <-s.reset:
  169. count = 0
  170. values = make([]int64, s.reservoirSize)
  171. valuesCopy = make([]int64, 0)
  172. }
  173. }
  174. }