sample.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sort"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. const rescaleThreshold = time.Hour
  12. // Samples maintain a statistically-significant selection of values from
  13. // a stream.
  14. type Sample interface {
  15. Clear()
  16. Count() int64
  17. Max() int64
  18. Mean() float64
  19. Min() int64
  20. Percentile(float64) float64
  21. Percentiles([]float64) []float64
  22. Size() int
  23. StdDev() float64
  24. Sum() int64
  25. Update(int64)
  26. Values() []int64
  27. Variance() float64
  28. }
  29. // ExpDecaySample is an exponentially-decaying sample using a forward-decaying
  30. // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
  31. // Decay Model for Streaming Systems".
  32. //
  33. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  34. type ExpDecaySample struct {
  35. alpha float64
  36. count int64
  37. mutex sync.Mutex
  38. reservoirSize int
  39. t0, t1 time.Time
  40. values expDecaySampleHeap
  41. }
  42. // NewExpDecaySample constructs a new exponentially-decaying sample with the
  43. // given reservoir size and alpha.
  44. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  45. if UseNilMetrics {
  46. return NilSample{}
  47. }
  48. s := &ExpDecaySample{
  49. alpha: alpha,
  50. reservoirSize: reservoirSize,
  51. t0: time.Now(),
  52. values: make(expDecaySampleHeap, 0, reservoirSize),
  53. }
  54. s.t1 = time.Now().Add(rescaleThreshold)
  55. return s
  56. }
  57. // Clear clears all samples.
  58. func (s *ExpDecaySample) Clear() {
  59. s.mutex.Lock()
  60. defer s.mutex.Unlock()
  61. s.count = 0
  62. s.t0 = time.Now()
  63. s.t1 = s.t0.Add(rescaleThreshold)
  64. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  65. }
  66. // Count returns the number of samples recorded, which may exceed the
  67. // reservoir size.
  68. func (s *ExpDecaySample) Count() int64 {
  69. return atomic.LoadInt64(&s.count)
  70. }
  71. // Max returns the maximum value in the sample, which may not be the maximum
  72. // value ever to be part of the sample.
  73. func (s *ExpDecaySample) Max() int64 {
  74. return SampleMax(s.Values())
  75. }
  76. // Mean returns the mean of the values in the sample.
  77. func (s *ExpDecaySample) Mean() float64 {
  78. return SampleMean(s.Values())
  79. }
  80. // Min returns the minimum value in the sample, which may not be the minimum
  81. // value ever to be part of the sample.
  82. func (s *ExpDecaySample) Min() int64 {
  83. return SampleMin(s.Values())
  84. }
  85. // Percentile returns an arbitrary percentile of values in the sample.
  86. func (s *ExpDecaySample) Percentile(p float64) float64 {
  87. return SamplePercentile(s.Values(), p)
  88. }
  89. // Percentiles returns a slice of arbitrary percentiles of values in the
  90. // sample.
  91. func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
  92. return SamplePercentiles(s.Values(), ps)
  93. }
  94. // Size returns the size of the sample, which is at most the reservoir size.
  95. func (s *ExpDecaySample) Size() int {
  96. s.mutex.Lock()
  97. defer s.mutex.Unlock()
  98. return len(s.values)
  99. }
  100. // StdDev returns the standard deviation of the sample.
  101. func (s *ExpDecaySample) StdDev() float64 {
  102. return SampleStdDev(s.Values())
  103. }
  104. // Sum returns the sum of the values in the sample.
  105. func (s *ExpDecaySample) Sum() int64 {
  106. return SampleSum(s.Values())
  107. }
  108. // Update samples a new value.
  109. func (s *ExpDecaySample) Update(v int64) {
  110. s.update(time.Now(), v)
  111. }
  112. // Values returns a copy of the values in the sample.
  113. func (s *ExpDecaySample) Values() []int64 {
  114. s.mutex.Lock()
  115. defer s.mutex.Unlock()
  116. values := make([]int64, len(s.values))
  117. for i, v := range s.values {
  118. values[i] = v.v
  119. }
  120. return values
  121. }
  122. // Variance returns the variance of the values in the sample.
  123. func (s *ExpDecaySample) Variance() float64 {
  124. return SampleVariance(s.Values())
  125. }
  126. // update samples a new value at a particular timestamp. This is a method all
  127. // its own to facilitate testing.
  128. func (s *ExpDecaySample) update(t time.Time, v int64) {
  129. s.mutex.Lock()
  130. defer s.mutex.Unlock()
  131. s.count++
  132. if len(s.values) == s.reservoirSize {
  133. heap.Pop(&s.values)
  134. }
  135. heap.Push(&s.values, expDecaySample{
  136. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  137. v: v,
  138. })
  139. if t.After(s.t1) {
  140. values := s.values
  141. t0 := s.t0
  142. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  143. s.t0 = t
  144. s.t1 = s.t0.Add(rescaleThreshold)
  145. for _, v := range values {
  146. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  147. heap.Push(&s.values, v)
  148. }
  149. }
  150. }
  151. // NilSample is a no-op Sample.
  152. type NilSample struct{}
  153. // Clear is a no-op.
  154. func (NilSample) Clear() {}
  155. // Count is a no-op.
  156. func (NilSample) Count() int64 { return 0 }
  157. // Max is a no-op.
  158. func (NilSample) Max() int64 { return 0 }
  159. // Mean is a no-op.
  160. func (NilSample) Mean() float64 { return 0.0 }
  161. // Min is a no-op.
  162. func (NilSample) Min() int64 { return 0 }
  163. // Percentile is a no-op.
  164. func (NilSample) Percentile(p float64) float64 { return 0.0 }
  165. // Percentiles is a no-op.
  166. func (NilSample) Percentiles(ps []float64) []float64 {
  167. return make([]float64, len(ps))
  168. }
  169. // Size is a no-op.
  170. func (NilSample) Size() int { return 0 }
  171. // No-op.
  172. func (NilSample) StdDev() float64 { return 0.0 }
  173. // Sum is a no-op.
  174. func (NilSample) Sum() int64 { return 0 }
  175. // Update is a no-op.
  176. func (NilSample) Update(v int64) {}
  177. // Values is a no-op.
  178. func (NilSample) Values() []int64 { return []int64{} }
  179. // Variance is a no-op.
  180. func (NilSample) Variance() float64 { return 0.0 }
  181. // SampleMax returns the maximum value of the slice of int64.
  182. func SampleMax(values []int64) int64 {
  183. if 0 == len(values) {
  184. return 0
  185. }
  186. var max int64 = math.MinInt64
  187. for _, v := range values {
  188. if max < v {
  189. max = v
  190. }
  191. }
  192. return max
  193. }
  194. // SampleMean returns the mean value of the slice of int64.
  195. func SampleMean(values []int64) float64 {
  196. if 0 == len(values) {
  197. return 0.0
  198. }
  199. return float64(SampleSum(values)) / float64(len(values))
  200. }
  201. // SampleMin returns the minimum value of the slice of int64.
  202. func SampleMin(values []int64) int64 {
  203. if 0 == len(values) {
  204. return 0
  205. }
  206. var min int64 = math.MaxInt64
  207. for _, v := range values {
  208. if min > v {
  209. min = v
  210. }
  211. }
  212. return min
  213. }
  214. // SamplePercentiles returns an arbitrary percentile of the slice of int64.
  215. func SamplePercentile(values int64Slice, p float64) float64 {
  216. return SamplePercentiles(values, []float64{p})[0]
  217. }
  218. // SamplePercentiles returns a slice of arbitrary percentiles of the slice of
  219. // int64.
  220. func SamplePercentiles(values int64Slice, ps []float64) []float64 {
  221. scores := make([]float64, len(ps))
  222. size := len(values)
  223. if size > 0 {
  224. sort.Sort(values)
  225. for i, p := range ps {
  226. pos := p * float64(size+1)
  227. if pos < 1.0 {
  228. scores[i] = float64(values[0])
  229. } else if pos >= float64(size) {
  230. scores[i] = float64(values[size-1])
  231. } else {
  232. lower := float64(values[int(pos)-1])
  233. upper := float64(values[int(pos)])
  234. scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
  235. }
  236. }
  237. }
  238. return scores
  239. }
  240. // SampleStdDev returns the standard deviation of the slice of int64.
  241. func SampleStdDev(values []int64) float64 {
  242. return math.Sqrt(SampleVariance(values))
  243. }
  244. // SampleSum returns the sum of the slice of int64.
  245. func SampleSum(values []int64) int64 {
  246. var sum int64
  247. for _, v := range values {
  248. sum += v
  249. }
  250. return sum
  251. }
  252. // SampleVariance returns the variance of the slice of int64.
  253. func SampleVariance(values []int64) float64 {
  254. m := SampleMean(values)
  255. var sum float64
  256. for _, v := range values {
  257. d := float64(v) - m
  258. sum += d * d
  259. }
  260. return sum / float64(len(values))
  261. }
  262. // A uniform sample using Vitter's Algorithm R.
  263. //
  264. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  265. type UniformSample struct {
  266. count int64
  267. mutex sync.Mutex
  268. reservoirSize int
  269. values []int64
  270. }
  271. // NewUniformSample constructs a new uniform sample with the given reservoir
  272. // size.
  273. func NewUniformSample(reservoirSize int) Sample {
  274. if UseNilMetrics {
  275. return NilSample{}
  276. }
  277. return &UniformSample{reservoirSize: reservoirSize}
  278. }
  279. // Clear clears all samples.
  280. func (s *UniformSample) Clear() {
  281. s.mutex.Lock()
  282. defer s.mutex.Unlock()
  283. s.count = 0
  284. s.values = make([]int64, 0, s.reservoirSize)
  285. }
  286. // Count returns the number of samples recorded, which may exceed the
  287. // reservoir size.
  288. func (s *UniformSample) Count() int64 {
  289. return atomic.LoadInt64(&s.count)
  290. }
  291. // Max returns the maximum value in the sample, which may not be the maximum
  292. // value ever to be part of the sample.
  293. func (s *UniformSample) Max() int64 {
  294. s.mutex.Lock()
  295. defer s.mutex.Unlock()
  296. return SampleMax(s.values)
  297. }
  298. // Mean returns the mean of the values in the sample.
  299. func (s *UniformSample) Mean() float64 {
  300. s.mutex.Lock()
  301. defer s.mutex.Unlock()
  302. return SampleMean(s.values)
  303. }
  304. // Min returns the minimum value in the sample, which may not be the minimum
  305. // value ever to be part of the sample.
  306. func (s *UniformSample) Min() int64 {
  307. s.mutex.Lock()
  308. defer s.mutex.Unlock()
  309. return SampleMin(s.values)
  310. }
  311. // Percentile returns an arbitrary percentile of values in the sample.
  312. func (s *UniformSample) Percentile(p float64) float64 {
  313. s.mutex.Lock()
  314. defer s.mutex.Unlock()
  315. return SamplePercentile(s.values, p)
  316. }
  317. // Percentiles returns a slice of arbitrary percentiles of values in the
  318. // sample.
  319. func (s *UniformSample) Percentiles(ps []float64) []float64 {
  320. s.mutex.Lock()
  321. defer s.mutex.Unlock()
  322. return SamplePercentiles(s.values, ps)
  323. }
  324. // Size returns the size of the sample, which is at most the reservoir size.
  325. func (s *UniformSample) Size() int {
  326. s.mutex.Lock()
  327. defer s.mutex.Unlock()
  328. return len(s.values)
  329. }
  330. // Snapshot returns a read-only copy of the sample.
  331. func (s *UniformSample) Snapshot() Sample {
  332. s.mutex.Lock()
  333. defer s.mutex.Unlock()
  334. values := make([]int64, len(s.values))
  335. copy(values, s.values)
  336. return &SampleSnapshot{
  337. count: s.count,
  338. values: values,
  339. }
  340. }
  341. // StdDev returns the standard deviation of the values in the sample.
  342. func (s *UniformSample) StdDev() float64 {
  343. s.mutex.Lock()
  344. defer s.mutex.Unlock()
  345. return SampleStdDev(s.values)
  346. }
  347. // Sum returns the sum of the values in the sample.
  348. func (s *UniformSample) Sum() int64 {
  349. return SampleSum(s.values)
  350. }
  351. // Update samples a new value.
  352. func (s *UniformSample) Update(v int64) {
  353. s.mutex.Lock()
  354. defer s.mutex.Unlock()
  355. s.count++
  356. if len(s.values) < s.reservoirSize {
  357. s.values = append(s.values, v)
  358. } else {
  359. s.values[rand.Intn(s.reservoirSize)] = v
  360. }
  361. }
  362. // Values returns a copy of the values in the sample.
  363. func (s *UniformSample) Values() []int64 {
  364. s.mutex.Lock()
  365. defer s.mutex.Unlock()
  366. values := make([]int64, len(s.values))
  367. copy(values, s.values)
  368. return values
  369. }
  370. // Variance returns the variance of the values in the sample.
  371. func (s *UniformSample) Variance() float64 {
  372. s.mutex.Lock()
  373. defer s.mutex.Unlock()
  374. return SampleVariance(s.values)
  375. }
  376. // expDecaySample represents an individual sample in a heap.
  377. type expDecaySample struct {
  378. k float64
  379. v int64
  380. }
  381. // expDecaySampleHeap is a min-heap of expDecaySamples.
  382. type expDecaySampleHeap []expDecaySample
  383. func (q expDecaySampleHeap) Len() int {
  384. return len(q)
  385. }
  386. func (q expDecaySampleHeap) Less(i, j int) bool {
  387. return q[i].k < q[j].k
  388. }
  389. func (q *expDecaySampleHeap) Pop() interface{} {
  390. q_ := *q
  391. n := len(q_)
  392. i := q_[n-1]
  393. q_ = q_[0 : n-1]
  394. *q = q_
  395. return i
  396. }
  397. func (q *expDecaySampleHeap) Push(x interface{}) {
  398. q_ := *q
  399. n := len(q_)
  400. q_ = q_[0 : n+1]
  401. q_[n] = x.(expDecaySample)
  402. *q = q_
  403. }
  404. func (q expDecaySampleHeap) Swap(i, j int) {
  405. q[i], q[j] = q[j], q[i]
  406. }