sample.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sort"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  // rescaleThreshold is the interval added to an ExpDecaySample's landmark
  // time t0 to obtain t1, the time after which its priorities are rescaled.
  const rescaleThreshold time.Duration = time.Hour
  // Sample maintains a statistically-significant selection of values from a
  // stream.
  //
  // It is an interface so that other structs can implement the Sample API as
  // appropriate.
  type Sample interface {
  	// Clear discards all recorded values.
  	Clear()
  	// Count returns the number of values recorded.
  	Count() int64
  	// Dup returns a copy of the sample.
  	Dup() Sample
  	// Max returns the maximum value in the sample.
  	Max() int64
  	// Mean returns the mean of the values in the sample.
  	Mean() float64
  	// Min returns the minimum value in the sample.
  	Min() int64
  	// Percentile returns an arbitrary percentile of the sampled values.
  	Percentile(p float64) float64
  	// Percentiles returns one percentile per entry of ps.
  	Percentiles(ps []float64) []float64
  	// Size returns the number of values currently held in the sample.
  	Size() int
  	// StdDev returns the standard deviation of the sample.
  	StdDev() float64
  	// Update records a new value.
  	Update(v int64)
  	// Values returns the values held in the sample.
  	Values() []int64
  	// Variance returns the variance of the sample.
  	Variance() float64
  }
  32. // ExpDecaySample is an exponentially-decaying sample using a forward-decaying
  33. // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
  34. // Decay Model for Streaming Systems".
  35. //
  36. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  37. type ExpDecaySample struct {
  38. alpha float64
  39. count int64
  40. mutex sync.Mutex
  41. reservoirSize int
  42. t0, t1 time.Time
  43. values expDecaySampleHeap
  44. }
  45. // NewExpDecaySample constructs a new exponentially-decaying sample with the
  46. // given reservoir size and alpha.
  47. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  48. if UseNilMetrics {
  49. return NilSample{}
  50. }
  51. s := &ExpDecaySample{
  52. alpha: alpha,
  53. reservoirSize: reservoirSize,
  54. t0: time.Now(),
  55. values: make(expDecaySampleHeap, 0, reservoirSize),
  56. }
  57. s.t1 = time.Now().Add(rescaleThreshold)
  58. return s
  59. }
  60. // Clear clears all samples.
  61. func (s *ExpDecaySample) Clear() {
  62. s.mutex.Lock()
  63. defer s.mutex.Unlock()
  64. s.count = 0
  65. s.t0 = time.Now()
  66. s.t1 = s.t0.Add(rescaleThreshold)
  67. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  68. }
  69. // Dup returns a copy of the sample.
  70. func (s *ExpDecaySample) Dup() Sample {
  71. s.mutex.Lock()
  72. defer s.mutex.Unlock()
  73. values := make(expDecaySampleHeap, len(s.values))
  74. copy(values, s.values)
  75. return &ExpDecaySample{
  76. alpha: s.alpha,
  77. count: s.count,
  78. reservoirSize: s.reservoirSize,
  79. t0: s.t0,
  80. t1: s.t1,
  81. values: values,
  82. }
  83. }
  84. // Count returns the number of samples recorded, which may exceed the
  85. // reservoir size.
  86. func (s *ExpDecaySample) Count() int64 {
  87. return atomic.LoadInt64(&s.count)
  88. }
  89. // Max returns the maximum value in the sample, which may not be the maximum
  90. // value ever to be part of the sample.
  91. func (s *ExpDecaySample) Max() int64 {
  92. return SampleMax(s.Values())
  93. }
  94. // Return the mean of all values seen since the histogram was last cleared.
  95. func (s *ExpDecaySample) Mean() float64 {
  96. return SampleMean(s.Values())
  97. }
  98. // Min returns the minimum value in the sample, which may not be the minimum
  99. // value ever to be part of the sample.
  100. func (s *ExpDecaySample) Min() int64 {
  101. return SampleMin(s.Values())
  102. }
  103. // Percentile returns an arbitrary percentile of sampled values.
  104. func (s *ExpDecaySample) Percentile(p float64) float64 {
  105. return SamplePercentile(s.Values(), p)
  106. }
  107. // Percentiles returns a slice of arbitrary percentiles of sampled values.
  108. func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
  109. return SamplePercentiles(s.Values(), ps)
  110. }
  111. // Size returns the size of the sample, which is at most the reservoir size.
  112. func (s *ExpDecaySample) Size() int {
  113. s.mutex.Lock()
  114. defer s.mutex.Unlock()
  115. return len(s.values)
  116. }
  117. // StdDev returns the standard deviation of the sample.
  118. func (s *ExpDecaySample) StdDev() float64 {
  119. return SampleStdDev(s.Values())
  120. }
  121. // Update samples a new value.
  122. func (s *ExpDecaySample) Update(v int64) {
  123. s.update(time.Now(), v)
  124. }
  125. // Values returns a copy of the values in the sample.
  126. func (s *ExpDecaySample) Values() []int64 {
  127. s.mutex.Lock()
  128. defer s.mutex.Unlock()
  129. values := make([]int64, len(s.values))
  130. for i, v := range s.values {
  131. values[i] = v.v
  132. }
  133. return values
  134. }
  135. // Variance returns the variance of the sample.
  136. func (s *ExpDecaySample) Variance() float64 {
  137. return SampleVariance(s.Values())
  138. }
  139. // update samples a new value at a particular timestamp. This is a method all
  140. // its own to facilitate testing.
  141. func (s *ExpDecaySample) update(t time.Time, v int64) {
  142. s.mutex.Lock()
  143. defer s.mutex.Unlock()
  144. s.count++
  145. if len(s.values) == s.reservoirSize {
  146. heap.Pop(&s.values)
  147. }
  148. heap.Push(&s.values, expDecaySample{
  149. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  150. v: v,
  151. })
  152. if t.After(s.t1) {
  153. values := s.values
  154. t0 := s.t0
  155. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  156. s.t0 = t
  157. s.t1 = s.t0.Add(rescaleThreshold)
  158. for _, v := range values {
  159. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  160. heap.Push(&s.values, v)
  161. }
  162. }
  163. }
  164. // No-op Sample.
  165. type NilSample struct{}
  166. // No-op.
  167. func (NilSample) Clear() {}
  168. // No-op.
  169. func (NilSample) Count() int64 { return 0 }
  170. // No-op.
  171. func (NilSample) Dup() Sample { return NilSample{} }
  172. // No-op.
  173. func (NilSample) Max() int64 { return 0 }
  174. // No-op.
  175. func (NilSample) Mean() float64 { return 0.0 }
  176. // No-op.
  177. func (NilSample) Min() int64 { return 0 }
  178. // No-op.
  179. func (NilSample) Percentile(p float64) float64 { return 0.0 }
  180. // No-op.
  181. func (NilSample) Percentiles(ps []float64) []float64 {
  182. return make([]float64, len(ps))
  183. }
  184. // No-op.
  185. func (NilSample) Size() int { return 0 }
  186. // No-op.
  187. func (NilSample) StdDev() float64 { return 0.0 }
  188. // No-op.
  189. func (NilSample) Update(v int64) {}
  190. // No-op.
  191. func (NilSample) Values() []int64 { return []int64{} }
  192. // No-op.
  193. func (NilSample) Variance() float64 { return 0.0 }
  // SampleMax returns the maximum value of the slice of int64, or 0 when the
  // slice is empty.
  func SampleMax(values []int64) int64 {
  	if len(values) == 0 {
  		return 0
  	}
  	largest := values[0]
  	for _, candidate := range values[1:] {
  		if candidate > largest {
  			largest = candidate
  		}
  	}
  	return largest
  }
  // SampleMean returns the mean value of the slice of int64, or 0 when the
  // slice is empty.
  func SampleMean(values []int64) float64 {
  	if len(values) == 0 {
  		return 0.0
  	}
  	// Accumulate in float64: an int64 accumulator wraps around silently when
  	// the values are large, producing a wildly wrong (even negative) mean.
  	var sum float64
  	for _, v := range values {
  		sum += float64(v)
  	}
  	return sum / float64(len(values))
  }
  // SampleMin returns the minimum value of the slice of int64, or 0 when the
  // slice is empty.
  func SampleMin(values []int64) int64 {
  	if len(values) == 0 {
  		return 0
  	}
  	smallest := values[0]
  	for _, candidate := range values[1:] {
  		if candidate < smallest {
  			smallest = candidate
  		}
  	}
  	return smallest
  }
  231. // SamplePercentiles returns an arbitrary percentile of the slice of int64.
  232. func SamplePercentile(values int64Slice, p float64) float64 {
  233. return SamplePercentiles(values, []float64{p})[0]
  234. }
  235. // SamplePercentiles returns a slice of arbitrary percentiles of the slice of
  236. // int64.
  237. func SamplePercentiles(values int64Slice, ps []float64) []float64 {
  238. scores := make([]float64, len(ps))
  239. size := len(values)
  240. if size > 0 {
  241. sort.Sort(values)
  242. for i, p := range ps {
  243. pos := p * float64(size+1)
  244. if pos < 1.0 {
  245. scores[i] = float64(values[0])
  246. } else if pos >= float64(size) {
  247. scores[i] = float64(values[size-1])
  248. } else {
  249. lower := float64(values[int(pos)-1])
  250. upper := float64(values[int(pos)])
  251. scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
  252. }
  253. }
  254. }
  255. return scores
  256. }
  257. // SampleStdDev returns the standard deviation of the slice of int64.
  258. func SampleStdDev(values []int64) float64 {
  259. return math.Sqrt(SampleVariance(values))
  260. }
  261. // SampleVariance returns the variance of the slice of int64.
  262. func SampleVariance(values []int64) float64 {
  263. m := SampleMean(values)
  264. var sum float64
  265. for _, v := range values {
  266. d := float64(v) - m
  267. sum += d * d
  268. }
  269. return sum / float64(len(values))
  270. }
  // UniformSample is a uniform sample using Vitter's Algorithm R.
  //
  // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  type UniformSample struct {
  	// count is the number of values recorded since creation or the last Clear.
  	count int64
  	// mutex is locked by the methods that read or modify values.
  	mutex sync.Mutex
  	// reservoirSize is the maximum number of values retained.
  	reservoirSize int
  	// values is the reservoir of retained values.
  	values []int64
  }
  280. // Create a new uniform sample with the given reservoir size.
  281. func NewUniformSample(reservoirSize int) Sample {
  282. if UseNilMetrics {
  283. return NilSample{}
  284. }
  285. return &UniformSample{reservoirSize: reservoirSize}
  286. }
  287. // Clear all samples.
  288. func (s *UniformSample) Clear() {
  289. s.mutex.Lock()
  290. defer s.mutex.Unlock()
  291. s.count = 0
  292. s.values = make([]int64, 0, s.reservoirSize)
  293. }
  294. // Count returns the number of samples recorded, which may exceed the
  295. // reservoir size.
  296. func (s *UniformSample) Count() int64 {
  297. return atomic.LoadInt64(&s.count)
  298. }
  299. // Dup returns a copy of the sample.
  300. func (s *UniformSample) Dup() Sample {
  301. s.mutex.Lock()
  302. defer s.mutex.Unlock()
  303. values := make([]int64, len(s.values))
  304. copy(values, s.values)
  305. return &UniformSample{
  306. count: s.count,
  307. reservoirSize: s.reservoirSize,
  308. values: values,
  309. }
  310. }
  311. // Max returns the maximum value in the sample, which may not be the maximum
  312. // value ever to be part of the sample.
  313. func (s *UniformSample) Max() int64 {
  314. s.mutex.Lock()
  315. defer s.mutex.Unlock()
  316. return SampleMax(s.values)
  317. }
  318. // Return the mean of all values seen since the histogram was last cleared.
  319. func (s *UniformSample) Mean() float64 {
  320. s.mutex.Lock()
  321. defer s.mutex.Unlock()
  322. return SampleMean(s.values)
  323. }
  324. // Min returns the minimum value in the sample, which may not be the minimum
  325. // value ever to be part of the sample.
  326. func (s *UniformSample) Min() int64 {
  327. s.mutex.Lock()
  328. defer s.mutex.Unlock()
  329. return SampleMin(s.values)
  330. }
  331. // Percentile returns an arbitrary percentile of sampled values.
  332. func (s *UniformSample) Percentile(p float64) float64 {
  333. s.mutex.Lock()
  334. defer s.mutex.Unlock()
  335. return SamplePercentile(s.values, p)
  336. }
  337. // Percentiles returns a slice of arbitrary percentiles of sampled values.
  338. func (s *UniformSample) Percentiles(ps []float64) []float64 {
  339. s.mutex.Lock()
  340. defer s.mutex.Unlock()
  341. return SamplePercentiles(s.values, ps)
  342. }
  343. // Return the size of the sample, which is at most the reservoir size.
  344. func (s *UniformSample) Size() int {
  345. s.mutex.Lock()
  346. defer s.mutex.Unlock()
  347. return len(s.values)
  348. }
  349. // StdDev returns the standard deviation of the sample.
  350. func (s *UniformSample) StdDev() float64 {
  351. s.mutex.Lock()
  352. defer s.mutex.Unlock()
  353. return SampleStdDev(s.values)
  354. }
  355. // Update the sample with a new value.
  356. func (s *UniformSample) Update(v int64) {
  357. s.mutex.Lock()
  358. defer s.mutex.Unlock()
  359. s.count++
  360. if len(s.values) < s.reservoirSize {
  361. s.values = append(s.values, v)
  362. } else {
  363. s.values[rand.Intn(s.reservoirSize)] = v
  364. }
  365. }
  366. // Return all the values in the sample.
  367. func (s *UniformSample) Values() []int64 {
  368. s.mutex.Lock()
  369. defer s.mutex.Unlock()
  370. values := make([]int64, len(s.values))
  371. copy(values, s.values)
  372. return values
  373. }
  374. // Variance returns the variance of the sample.
  375. func (s *UniformSample) Variance() float64 {
  376. s.mutex.Lock()
  377. defer s.mutex.Unlock()
  378. return SampleVariance(s.values)
  379. }
  // expDecaySample represents an individual sample in a heap.
  type expDecaySample struct {
  	// k is the priority the heap orders samples by.
  	k float64
  	// v is the sampled value.
  	v int64
  }
  385. // expDecaySampleHeap is a min-heap of expDecaySamples.
  386. type expDecaySampleHeap []expDecaySample
  387. func (q expDecaySampleHeap) Len() int {
  388. return len(q)
  389. }
  390. func (q expDecaySampleHeap) Less(i, j int) bool {
  391. return q[i].k < q[j].k
  392. }
  393. func (q *expDecaySampleHeap) Pop() interface{} {
  394. q_ := *q
  395. n := len(q_)
  396. i := q_[n-1]
  397. q_ = q_[0 : n-1]
  398. *q = q_
  399. return i
  400. }
  401. func (q *expDecaySampleHeap) Push(x interface{}) {
  402. q_ := *q
  403. n := len(q_)
  404. q_ = q_[0 : n+1]
  405. q_[n] = x.(expDecaySample)
  406. *q = q_
  407. }
  408. func (q expDecaySampleHeap) Swap(i, j int) {
  409. q[i], q[j] = q[j], q[i]
  410. }