sample.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sort"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  // rescaleThreshold is how long an ExpDecaySample keeps the same landmark
// time t0 before an update rescales every stored priority to a new landmark.
const rescaleThreshold time.Duration = 60 * time.Minute
  // Sample is implemented by types that maintain a statistically-significant
// selection of values drawn from a stream.
//
// It is an interface so that other structs are free to provide their own
// implementation of the Sample API where appropriate.
type Sample interface {
	// Recording and resetting.
	Update(v int64)
	Clear()

	// Copying and raw access to the retained values.
	Dup() Sample
	Values() []int64

	// Count is the number of values recorded; Size is the number retained.
	Count() int64
	Size() int

	// Summary statistics over the retained values.
	Max() int64
	Min() int64
	Mean() float64
	Sum() int64
	StdDev() float64
	Variance() float64
	Percentile(p float64) float64
	Percentiles(ps []float64) []float64
}
  33. // ExpDecaySample is an exponentially-decaying sample using a forward-decaying
  34. // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
  35. // Decay Model for Streaming Systems".
  36. //
  37. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  38. type ExpDecaySample struct {
  39. alpha float64
  40. count int64
  41. mutex sync.Mutex
  42. reservoirSize int
  43. t0, t1 time.Time
  44. values expDecaySampleHeap
  45. }
  46. // NewExpDecaySample constructs a new exponentially-decaying sample with the
  47. // given reservoir size and alpha.
  48. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  49. if UseNilMetrics {
  50. return NilSample{}
  51. }
  52. s := &ExpDecaySample{
  53. alpha: alpha,
  54. reservoirSize: reservoirSize,
  55. t0: time.Now(),
  56. values: make(expDecaySampleHeap, 0, reservoirSize),
  57. }
  58. s.t1 = time.Now().Add(rescaleThreshold)
  59. return s
  60. }
  61. // Clear clears all samples.
  62. func (s *ExpDecaySample) Clear() {
  63. s.mutex.Lock()
  64. defer s.mutex.Unlock()
  65. s.count = 0
  66. s.t0 = time.Now()
  67. s.t1 = s.t0.Add(rescaleThreshold)
  68. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  69. }
  70. // Dup returns a copy of the sample.
  71. func (s *ExpDecaySample) Dup() Sample {
  72. s.mutex.Lock()
  73. defer s.mutex.Unlock()
  74. values := make(expDecaySampleHeap, len(s.values))
  75. copy(values, s.values)
  76. return &ExpDecaySample{
  77. alpha: s.alpha,
  78. count: s.count,
  79. reservoirSize: s.reservoirSize,
  80. t0: s.t0,
  81. t1: s.t1,
  82. values: values,
  83. }
  84. }
  85. // Count returns the number of samples recorded, which may exceed the
  86. // reservoir size.
  87. func (s *ExpDecaySample) Count() int64 {
  88. return atomic.LoadInt64(&s.count)
  89. }
  90. // Max returns the maximum value in the sample, which may not be the maximum
  91. // value ever to be part of the sample.
  92. func (s *ExpDecaySample) Max() int64 {
  93. return SampleMax(s.Values())
  94. }
  95. // Return the mean of all values seen since the histogram was last cleared.
  96. func (s *ExpDecaySample) Mean() float64 {
  97. return SampleMean(s.Values())
  98. }
  99. // Min returns the minimum value in the sample, which may not be the minimum
  100. // value ever to be part of the sample.
  101. func (s *ExpDecaySample) Min() int64 {
  102. return SampleMin(s.Values())
  103. }
  104. // Percentile returns an arbitrary percentile of sampled values.
  105. func (s *ExpDecaySample) Percentile(p float64) float64 {
  106. return SamplePercentile(s.Values(), p)
  107. }
  108. // Percentiles returns a slice of arbitrary percentiles of sampled values.
  109. func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
  110. return SamplePercentiles(s.Values(), ps)
  111. }
  112. // Size returns the size of the sample, which is at most the reservoir size.
  113. func (s *ExpDecaySample) Size() int {
  114. s.mutex.Lock()
  115. defer s.mutex.Unlock()
  116. return len(s.values)
  117. }
  118. // StdDev returns the standard deviation of the sample.
  119. func (s *ExpDecaySample) StdDev() float64 {
  120. return SampleStdDev(s.Values())
  121. }
  122. // Sum returns the sum of the sample.
  123. func (s *ExpDecaySample) Sum() int64 {
  124. return SampleSum(s.Values())
  125. }
  126. // Update samples a new value.
  127. func (s *ExpDecaySample) Update(v int64) {
  128. s.update(time.Now(), v)
  129. }
  130. // Values returns a copy of the values in the sample.
  131. func (s *ExpDecaySample) Values() []int64 {
  132. s.mutex.Lock()
  133. defer s.mutex.Unlock()
  134. values := make([]int64, len(s.values))
  135. for i, v := range s.values {
  136. values[i] = v.v
  137. }
  138. return values
  139. }
  140. // Variance returns the variance of the sample.
  141. func (s *ExpDecaySample) Variance() float64 {
  142. return SampleVariance(s.Values())
  143. }
  144. // update samples a new value at a particular timestamp. This is a method all
  145. // its own to facilitate testing.
  146. func (s *ExpDecaySample) update(t time.Time, v int64) {
  147. s.mutex.Lock()
  148. defer s.mutex.Unlock()
  149. s.count++
  150. if len(s.values) == s.reservoirSize {
  151. heap.Pop(&s.values)
  152. }
  153. heap.Push(&s.values, expDecaySample{
  154. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  155. v: v,
  156. })
  157. if t.After(s.t1) {
  158. values := s.values
  159. t0 := s.t0
  160. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  161. s.t0 = t
  162. s.t1 = s.t0.Add(rescaleThreshold)
  163. for _, v := range values {
  164. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  165. heap.Push(&s.values, v)
  166. }
  167. }
  168. }
  169. // No-op Sample.
  170. type NilSample struct{}
  171. // No-op.
  172. func (NilSample) Clear() {}
  173. // No-op.
  174. func (NilSample) Count() int64 { return 0 }
  175. // No-op.
  176. func (NilSample) Dup() Sample { return NilSample{} }
  177. // No-op.
  178. func (NilSample) Max() int64 { return 0 }
  179. // No-op.
  180. func (NilSample) Mean() float64 { return 0.0 }
  181. // No-op.
  182. func (NilSample) Min() int64 { return 0 }
  183. // No-op.
  184. func (NilSample) Percentile(p float64) float64 { return 0.0 }
  185. // No-op.
  186. func (NilSample) Percentiles(ps []float64) []float64 {
  187. return make([]float64, len(ps))
  188. }
  189. // No-op.
  190. func (NilSample) Size() int { return 0 }
  191. // No-op.
  192. func (NilSample) StdDev() float64 { return 0.0 }
  193. // No-op.
  194. func (NilSample) Sum() int64 { return 0 }
  195. // No-op.
  196. func (NilSample) Update(v int64) {}
  197. // No-op.
  198. func (NilSample) Values() []int64 { return []int64{} }
  199. // No-op.
  200. func (NilSample) Variance() float64 { return 0.0 }
  // SampleMax returns the largest element of values, or 0 when values is empty.
func SampleMax(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	largest := values[0]
	for _, v := range values[1:] {
		if v > largest {
			largest = v
		}
	}
	return largest
}
  214. // SampleMean returns the mean value of the slice of int64.
  215. func SampleMean(values []int64) float64 {
  216. if 0 == len(values) {
  217. return 0.0
  218. }
  219. return float64(SampleSum(values)) / float64(len(values))
  220. }
  // SampleMin returns the smallest element of values, or 0 when values is empty.
func SampleMin(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	smallest := values[0]
	for _, v := range values[1:] {
		if v < smallest {
			smallest = v
		}
	}
	return smallest
}
  234. // SamplePercentiles returns an arbitrary percentile of the slice of int64.
  235. func SamplePercentile(values int64Slice, p float64) float64 {
  236. return SamplePercentiles(values, []float64{p})[0]
  237. }
  238. // SamplePercentiles returns a slice of arbitrary percentiles of the slice of
  239. // int64.
  240. func SamplePercentiles(values int64Slice, ps []float64) []float64 {
  241. scores := make([]float64, len(ps))
  242. size := len(values)
  243. if size > 0 {
  244. sort.Sort(values)
  245. for i, p := range ps {
  246. pos := p * float64(size+1)
  247. if pos < 1.0 {
  248. scores[i] = float64(values[0])
  249. } else if pos >= float64(size) {
  250. scores[i] = float64(values[size-1])
  251. } else {
  252. lower := float64(values[int(pos)-1])
  253. upper := float64(values[int(pos)])
  254. scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
  255. }
  256. }
  257. }
  258. return scores
  259. }
  260. // SampleStdDev returns the standard deviation of the slice of int64.
  261. func SampleStdDev(values []int64) float64 {
  262. return math.Sqrt(SampleVariance(values))
  263. }
  // SampleSum returns the total of values, or 0 for an empty slice. The sum is
// accumulated in int64 and wraps on overflow.
func SampleSum(values []int64) int64 {
	total := int64(0)
	for i := range values {
		total += values[i]
	}
	return total
}
  272. // SampleVariance returns the variance of the slice of int64.
  273. func SampleVariance(values []int64) float64 {
  274. m := SampleMean(values)
  275. var sum float64
  276. for _, v := range values {
  277. d := float64(v) - m
  278. sum += d * d
  279. }
  280. return sum / float64(len(values))
  281. }
  // UniformSample keeps a fixed-size reservoir intended to be a uniform random
// sample of every value recorded (Vitter's Algorithm R).
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
type UniformSample struct {
	count         int64      // values recorded since construction or the last Clear
	mutex         sync.Mutex // held by the methods that read or modify the sample
	reservoirSize int        // maximum number of retained values
	values        []int64    // the retained values
}
  291. // Create a new uniform sample with the given reservoir size.
  292. func NewUniformSample(reservoirSize int) Sample {
  293. if UseNilMetrics {
  294. return NilSample{}
  295. }
  296. return &UniformSample{reservoirSize: reservoirSize}
  297. }
  298. // Clear all samples.
  299. func (s *UniformSample) Clear() {
  300. s.mutex.Lock()
  301. defer s.mutex.Unlock()
  302. s.count = 0
  303. s.values = make([]int64, 0, s.reservoirSize)
  304. }
  305. // Count returns the number of samples recorded, which may exceed the
  306. // reservoir size.
  307. func (s *UniformSample) Count() int64 {
  308. return atomic.LoadInt64(&s.count)
  309. }
  310. // Dup returns a copy of the sample.
  311. func (s *UniformSample) Dup() Sample {
  312. s.mutex.Lock()
  313. defer s.mutex.Unlock()
  314. values := make([]int64, len(s.values))
  315. copy(values, s.values)
  316. return &UniformSample{
  317. count: s.count,
  318. reservoirSize: s.reservoirSize,
  319. values: values,
  320. }
  321. }
  322. // Max returns the maximum value in the sample, which may not be the maximum
  323. // value ever to be part of the sample.
  324. func (s *UniformSample) Max() int64 {
  325. s.mutex.Lock()
  326. defer s.mutex.Unlock()
  327. return SampleMax(s.values)
  328. }
  329. // Return the mean of all values seen since the histogram was last cleared.
  330. func (s *UniformSample) Mean() float64 {
  331. s.mutex.Lock()
  332. defer s.mutex.Unlock()
  333. return SampleMean(s.values)
  334. }
  335. // Min returns the minimum value in the sample, which may not be the minimum
  336. // value ever to be part of the sample.
  337. func (s *UniformSample) Min() int64 {
  338. s.mutex.Lock()
  339. defer s.mutex.Unlock()
  340. return SampleMin(s.values)
  341. }
  342. // Percentile returns an arbitrary percentile of sampled values.
  343. func (s *UniformSample) Percentile(p float64) float64 {
  344. s.mutex.Lock()
  345. defer s.mutex.Unlock()
  346. return SamplePercentile(s.values, p)
  347. }
  348. // Percentiles returns a slice of arbitrary percentiles of sampled values.
  349. func (s *UniformSample) Percentiles(ps []float64) []float64 {
  350. s.mutex.Lock()
  351. defer s.mutex.Unlock()
  352. return SamplePercentiles(s.values, ps)
  353. }
  354. // Return the size of the sample, which is at most the reservoir size.
  355. func (s *UniformSample) Size() int {
  356. s.mutex.Lock()
  357. defer s.mutex.Unlock()
  358. return len(s.values)
  359. }
  360. // StdDev returns the standard deviation of the sample.
  361. func (s *UniformSample) StdDev() float64 {
  362. s.mutex.Lock()
  363. defer s.mutex.Unlock()
  364. return SampleStdDev(s.values)
  365. }
  366. // Sum returns the sum of the sample.
  367. func (s *UniformSample) Sum() int64 {
  368. return SampleSum(s.values)
  369. }
  370. // Update the sample with a new value.
  371. func (s *UniformSample) Update(v int64) {
  372. s.mutex.Lock()
  373. defer s.mutex.Unlock()
  374. s.count++
  375. if len(s.values) < s.reservoirSize {
  376. s.values = append(s.values, v)
  377. } else {
  378. s.values[rand.Intn(s.reservoirSize)] = v
  379. }
  380. }
  381. // Return all the values in the sample.
  382. func (s *UniformSample) Values() []int64 {
  383. s.mutex.Lock()
  384. defer s.mutex.Unlock()
  385. values := make([]int64, len(s.values))
  386. copy(values, s.values)
  387. return values
  388. }
  389. // Variance returns the variance of the sample.
  390. func (s *UniformSample) Variance() float64 {
  391. s.mutex.Lock()
  392. defer s.mutex.Unlock()
  393. return SampleVariance(s.values)
  394. }
  // expDecaySample represents an individual sample in a heap.
type expDecaySample struct {
	k float64 // priority; the heap keeps the smallest k at the root
	v int64   // the sampled value
}

// expDecaySampleHeap is a min-heap of expDecaySamples ordered by priority k.
// It implements container/heap's Interface. Callers go through heap.Push and
// heap.Pop so the heap invariant is maintained.
type expDecaySampleHeap []expDecaySample

func (q expDecaySampleHeap) Len() int {
	return len(q)
}

func (q expDecaySampleHeap) Less(i, j int) bool {
	return q[i].k < q[j].k
}

// Pop removes and returns the last element. container/heap calls it after
// moving the minimum to the end of the slice.
func (q *expDecaySampleHeap) Pop() interface{} {
	old := *q
	n := len(old)
	last := old[n-1]
	*q = old[:n-1]
	return last
}

// Push appends x, which must be an expDecaySample. It uses append rather than
// reslicing into spare capacity, so a heap with no spare room (such as a nil
// heap or an exact-length copy) grows instead of panicking with a
// slice-bounds error.
func (q *expDecaySampleHeap) Push(x interface{}) {
	*q = append(*q, x.(expDecaySample))
}

func (q expDecaySampleHeap) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
}