sample.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sort"
  7. "sync"
  8. "time"
  9. )
// rescaleThreshold is the interval added to an ExpDecaySample's landmark time
// t0 to obtain t1, the instant after which update rescales stored priorities.
const rescaleThreshold = time.Hour
// Sample maintains a statistically-significant selection of values from
// a stream.
type Sample interface {
	// Clear discards all recorded values. Read-only implementations in this
	// file (SampleSnapshot) panic instead.
	Clear()
	// Count returns the number of values recorded, which may exceed Size.
	Count() int64
	// Max returns the largest value currently held.
	Max() int64
	// Mean returns the arithmetic mean of the values currently held.
	Mean() float64
	// Min returns the smallest value currently held.
	Min() int64
	// Percentile returns the requested percentile of the held values; the
	// argument is a fraction (0.5 is the median).
	Percentile(float64) float64
	// Percentiles returns one score per requested fraction, in order.
	Percentiles([]float64) []float64
	// Size returns the number of values currently held.
	Size() int
	// Snapshot returns a read-only copy of the sample.
	Snapshot() Sample
	// StdDev returns the standard deviation of the held values.
	StdDev() float64
	// Sum returns the sum of the held values.
	Sum() int64
	// Update records a new value. Read-only implementations in this file
	// (SampleSnapshot) panic instead.
	Update(int64)
	// Values returns a copy of the held values.
	Values() []int64
	// Variance returns the variance of the held values.
	Variance() float64
}
// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
// Decay Model for Streaming Systems".
//
// <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
type ExpDecaySample struct {
	// alpha is the decay factor multiplied into the elapsed seconds when a
	// priority is computed.
	alpha float64
	// count is the number of values passed to update since construction or
	// the last Clear.
	count int64
	// mutex guards every other field.
	mutex sync.Mutex
	// reservoirSize is the maximum number of values kept in values.
	reservoirSize int
	// t0 is the landmark time priorities are measured from; t1 is the time
	// after which update rescales the priorities and moves the landmark.
	t0, t1 time.Time
	// values is the min-heap of retained samples, ordered by priority.
	values expDecaySampleHeap
}
  42. // NewExpDecaySample constructs a new exponentially-decaying sample with the
  43. // given reservoir size and alpha.
  44. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  45. if UseNilMetrics {
  46. return NilSample{}
  47. }
  48. s := &ExpDecaySample{
  49. alpha: alpha,
  50. reservoirSize: reservoirSize,
  51. t0: time.Now(),
  52. values: make(expDecaySampleHeap, 0, reservoirSize),
  53. }
  54. s.t1 = time.Now().Add(rescaleThreshold)
  55. return s
  56. }
  57. // Clear clears all samples.
  58. func (s *ExpDecaySample) Clear() {
  59. s.mutex.Lock()
  60. defer s.mutex.Unlock()
  61. s.count = 0
  62. s.t0 = time.Now()
  63. s.t1 = s.t0.Add(rescaleThreshold)
  64. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  65. }
  66. // Count returns the number of samples recorded, which may exceed the
  67. // reservoir size.
  68. func (s *ExpDecaySample) Count() int64 {
  69. s.mutex.Lock()
  70. defer s.mutex.Unlock()
  71. return s.count
  72. }
  73. // Max returns the maximum value in the sample, which may not be the maximum
  74. // value ever to be part of the sample.
  75. func (s *ExpDecaySample) Max() int64 {
  76. return SampleMax(s.Values())
  77. }
  78. // Mean returns the mean of the values in the sample.
  79. func (s *ExpDecaySample) Mean() float64 {
  80. return SampleMean(s.Values())
  81. }
  82. // Min returns the minimum value in the sample, which may not be the minimum
  83. // value ever to be part of the sample.
  84. func (s *ExpDecaySample) Min() int64 {
  85. return SampleMin(s.Values())
  86. }
  87. // Percentile returns an arbitrary percentile of values in the sample.
  88. func (s *ExpDecaySample) Percentile(p float64) float64 {
  89. return SamplePercentile(s.Values(), p)
  90. }
  91. // Percentiles returns a slice of arbitrary percentiles of values in the
  92. // sample.
  93. func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
  94. return SamplePercentiles(s.Values(), ps)
  95. }
  96. // Size returns the size of the sample, which is at most the reservoir size.
  97. func (s *ExpDecaySample) Size() int {
  98. s.mutex.Lock()
  99. defer s.mutex.Unlock()
  100. return len(s.values)
  101. }
  102. // Snapshot returns a read-only copy of the sample.
  103. func (s *ExpDecaySample) Snapshot() Sample {
  104. s.mutex.Lock()
  105. defer s.mutex.Unlock()
  106. values := make([]int64, len(s.values))
  107. for i, v := range s.values {
  108. values[i] = v.v
  109. }
  110. return &SampleSnapshot{
  111. count: s.count,
  112. values: values,
  113. }
  114. }
  115. // StdDev returns the standard deviation of the values in the sample.
  116. func (s *ExpDecaySample) StdDev() float64 {
  117. return SampleStdDev(s.Values())
  118. }
  119. // Sum returns the sum of the values in the sample.
  120. func (s *ExpDecaySample) Sum() int64 {
  121. return SampleSum(s.Values())
  122. }
  123. // Update samples a new value.
  124. func (s *ExpDecaySample) Update(v int64) {
  125. s.update(time.Now(), v)
  126. }
  127. // Values returns a copy of the values in the sample.
  128. func (s *ExpDecaySample) Values() []int64 {
  129. s.mutex.Lock()
  130. defer s.mutex.Unlock()
  131. values := make([]int64, len(s.values))
  132. for i, v := range s.values {
  133. values[i] = v.v
  134. }
  135. return values
  136. }
  137. // Variance returns the variance of the values in the sample.
  138. func (s *ExpDecaySample) Variance() float64 {
  139. return SampleVariance(s.Values())
  140. }
  141. // update samples a new value at a particular timestamp. This is a method all
  142. // its own to facilitate testing.
  143. func (s *ExpDecaySample) update(t time.Time, v int64) {
  144. s.mutex.Lock()
  145. defer s.mutex.Unlock()
  146. s.count++
  147. if len(s.values) == s.reservoirSize {
  148. heap.Pop(&s.values)
  149. }
  150. heap.Push(&s.values, expDecaySample{
  151. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  152. v: v,
  153. })
  154. if t.After(s.t1) {
  155. values := s.values
  156. t0 := s.t0
  157. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  158. s.t0 = t
  159. s.t1 = s.t0.Add(rescaleThreshold)
  160. for _, v := range values {
  161. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  162. heap.Push(&s.values, v)
  163. }
  164. }
  165. }
  166. // NilSample is a no-op Sample.
  167. type NilSample struct{}
  168. // Clear is a no-op.
  169. func (NilSample) Clear() {}
  170. // Count is a no-op.
  171. func (NilSample) Count() int64 { return 0 }
  172. // Max is a no-op.
  173. func (NilSample) Max() int64 { return 0 }
  174. // Mean is a no-op.
  175. func (NilSample) Mean() float64 { return 0.0 }
  176. // Min is a no-op.
  177. func (NilSample) Min() int64 { return 0 }
  178. // Percentile is a no-op.
  179. func (NilSample) Percentile(p float64) float64 { return 0.0 }
  180. // Percentiles is a no-op.
  181. func (NilSample) Percentiles(ps []float64) []float64 {
  182. return make([]float64, len(ps))
  183. }
  184. // Size is a no-op.
  185. func (NilSample) Size() int { return 0 }
  186. // Sample is a no-op.
  187. func (NilSample) Snapshot() Sample { return NilSample{} }
  188. // StdDev is a no-op.
  189. func (NilSample) StdDev() float64 { return 0.0 }
  190. // Sum is a no-op.
  191. func (NilSample) Sum() int64 { return 0 }
  192. // Update is a no-op.
  193. func (NilSample) Update(v int64) {}
  194. // Values is a no-op.
  195. func (NilSample) Values() []int64 { return []int64{} }
  196. // Variance is a no-op.
  197. func (NilSample) Variance() float64 { return 0.0 }
  198. // SampleMax returns the maximum value of the slice of int64.
  199. func SampleMax(values []int64) int64 {
  200. if 0 == len(values) {
  201. return 0
  202. }
  203. var max int64 = math.MinInt64
  204. for _, v := range values {
  205. if max < v {
  206. max = v
  207. }
  208. }
  209. return max
  210. }
  211. // SampleMean returns the mean value of the slice of int64.
  212. func SampleMean(values []int64) float64 {
  213. if 0 == len(values) {
  214. return 0.0
  215. }
  216. return float64(SampleSum(values)) / float64(len(values))
  217. }
  218. // SampleMin returns the minimum value of the slice of int64.
  219. func SampleMin(values []int64) int64 {
  220. if 0 == len(values) {
  221. return 0
  222. }
  223. var min int64 = math.MaxInt64
  224. for _, v := range values {
  225. if min > v {
  226. min = v
  227. }
  228. }
  229. return min
  230. }
  231. // SamplePercentiles returns an arbitrary percentile of the slice of int64.
  232. func SamplePercentile(values int64Slice, p float64) float64 {
  233. return SamplePercentiles(values, []float64{p})[0]
  234. }
  235. // SamplePercentiles returns a slice of arbitrary percentiles of the slice of
  236. // int64.
  237. func SamplePercentiles(values int64Slice, ps []float64) []float64 {
  238. scores := make([]float64, len(ps))
  239. size := len(values)
  240. if size > 0 {
  241. sort.Sort(values)
  242. for i, p := range ps {
  243. pos := p * float64(size+1)
  244. if pos < 1.0 {
  245. scores[i] = float64(values[0])
  246. } else if pos >= float64(size) {
  247. scores[i] = float64(values[size-1])
  248. } else {
  249. lower := float64(values[int(pos)-1])
  250. upper := float64(values[int(pos)])
  251. scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
  252. }
  253. }
  254. }
  255. return scores
  256. }
  257. // SampleSnapshot is a read-only copy of another Sample.
  258. type SampleSnapshot struct {
  259. count int64
  260. values []int64
  261. }
  262. // Clear panics.
  263. func (*SampleSnapshot) Clear() {
  264. panic("Clear called on a SampleSnapshot")
  265. }
  266. // Count returns the count of inputs at the time the snapshot was taken.
  267. func (s *SampleSnapshot) Count() int64 { return s.count }
  268. // Max returns the maximal value at the time the snapshot was taken.
  269. func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) }
  270. // Mean returns the mean value at the time the snapshot was taken.
  271. func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) }
  272. // Min returns the minimal value at the time the snapshot was taken.
  273. func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) }
  274. // Percentile returns an arbitrary percentile of values at the time the
  275. // snapshot was taken.
  276. func (s *SampleSnapshot) Percentile(p float64) float64 {
  277. return SamplePercentile(s.values, p)
  278. }
  279. // Percentiles returns a slice of arbitrary percentiles of values at the time
  280. // the snapshot was taken.
  281. func (s *SampleSnapshot) Percentiles(ps []float64) []float64 {
  282. return SamplePercentiles(s.values, ps)
  283. }
  284. // Size returns the size of the sample at the time the snapshot was taken.
  285. func (s *SampleSnapshot) Size() int { return len(s.values) }
  286. // Snapshot returns the snapshot.
  287. func (s *SampleSnapshot) Snapshot() Sample { return s }
  288. // StdDev returns the standard deviation of values at the time the snapshot was
  289. // taken.
  290. func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) }
  291. // Sum returns the sum of values at the time the snapshot was taken.
  292. func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) }
  293. // Update panics.
  294. func (*SampleSnapshot) Update(int64) {
  295. panic("Update called on a SampleSnapshot")
  296. }
  297. // Values returns a copy of the values in the sample.
  298. func (s *SampleSnapshot) Values() []int64 {
  299. values := make([]int64, len(s.values))
  300. copy(values, s.values)
  301. return values
  302. }
  303. // Variance returns the variance of values at the time the snapshot was taken.
  304. func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) }
  305. // SampleStdDev returns the standard deviation of the slice of int64.
  306. func SampleStdDev(values []int64) float64 {
  307. return math.Sqrt(SampleVariance(values))
  308. }
  309. // SampleSum returns the sum of the slice of int64.
  310. func SampleSum(values []int64) int64 {
  311. var sum int64
  312. for _, v := range values {
  313. sum += v
  314. }
  315. return sum
  316. }
  317. // SampleVariance returns the variance of the slice of int64.
  318. func SampleVariance(values []int64) float64 {
  319. if 0 == len(values) {
  320. return 0.0
  321. }
  322. m := SampleMean(values)
  323. var sum float64
  324. for _, v := range values {
  325. d := float64(v) - m
  326. sum += d * d
  327. }
  328. return sum / float64(len(values))
  329. }
// UniformSample is a uniform sample using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
type UniformSample struct {
	// count is the number of values passed to Update since construction or
	// the last Clear.
	count int64
	// mutex guards every other field.
	mutex sync.Mutex
	// reservoirSize is the maximum number of values kept in values.
	reservoirSize int
	// values is the reservoir of retained values.
	values []int64
}
  339. // NewUniformSample constructs a new uniform sample with the given reservoir
  340. // size.
  341. func NewUniformSample(reservoirSize int) Sample {
  342. if UseNilMetrics {
  343. return NilSample{}
  344. }
  345. return &UniformSample{reservoirSize: reservoirSize}
  346. }
  347. // Clear clears all samples.
  348. func (s *UniformSample) Clear() {
  349. s.mutex.Lock()
  350. defer s.mutex.Unlock()
  351. s.count = 0
  352. s.values = make([]int64, 0, s.reservoirSize)
  353. }
  354. // Count returns the number of samples recorded, which may exceed the
  355. // reservoir size.
  356. func (s *UniformSample) Count() int64 {
  357. s.mutex.Lock()
  358. defer s.mutex.Unlock()
  359. return s.count
  360. }
  361. // Max returns the maximum value in the sample, which may not be the maximum
  362. // value ever to be part of the sample.
  363. func (s *UniformSample) Max() int64 {
  364. s.mutex.Lock()
  365. defer s.mutex.Unlock()
  366. return SampleMax(s.values)
  367. }
  368. // Mean returns the mean of the values in the sample.
  369. func (s *UniformSample) Mean() float64 {
  370. s.mutex.Lock()
  371. defer s.mutex.Unlock()
  372. return SampleMean(s.values)
  373. }
  374. // Min returns the minimum value in the sample, which may not be the minimum
  375. // value ever to be part of the sample.
  376. func (s *UniformSample) Min() int64 {
  377. s.mutex.Lock()
  378. defer s.mutex.Unlock()
  379. return SampleMin(s.values)
  380. }
  381. // Percentile returns an arbitrary percentile of values in the sample.
  382. func (s *UniformSample) Percentile(p float64) float64 {
  383. s.mutex.Lock()
  384. defer s.mutex.Unlock()
  385. return SamplePercentile(s.values, p)
  386. }
  387. // Percentiles returns a slice of arbitrary percentiles of values in the
  388. // sample.
  389. func (s *UniformSample) Percentiles(ps []float64) []float64 {
  390. s.mutex.Lock()
  391. defer s.mutex.Unlock()
  392. return SamplePercentiles(s.values, ps)
  393. }
  394. // Size returns the size of the sample, which is at most the reservoir size.
  395. func (s *UniformSample) Size() int {
  396. s.mutex.Lock()
  397. defer s.mutex.Unlock()
  398. return len(s.values)
  399. }
  400. // Snapshot returns a read-only copy of the sample.
  401. func (s *UniformSample) Snapshot() Sample {
  402. s.mutex.Lock()
  403. defer s.mutex.Unlock()
  404. values := make([]int64, len(s.values))
  405. copy(values, s.values)
  406. return &SampleSnapshot{
  407. count: s.count,
  408. values: values,
  409. }
  410. }
  411. // StdDev returns the standard deviation of the values in the sample.
  412. func (s *UniformSample) StdDev() float64 {
  413. s.mutex.Lock()
  414. defer s.mutex.Unlock()
  415. return SampleStdDev(s.values)
  416. }
  417. // Sum returns the sum of the values in the sample.
  418. func (s *UniformSample) Sum() int64 {
  419. s.mutex.Lock()
  420. defer s.mutex.Unlock()
  421. return SampleSum(s.values)
  422. }
  423. // Update samples a new value.
  424. func (s *UniformSample) Update(v int64) {
  425. s.mutex.Lock()
  426. defer s.mutex.Unlock()
  427. s.count++
  428. if len(s.values) < s.reservoirSize {
  429. s.values = append(s.values, v)
  430. } else {
  431. s.values[rand.Intn(s.reservoirSize)] = v
  432. }
  433. }
  434. // Values returns a copy of the values in the sample.
  435. func (s *UniformSample) Values() []int64 {
  436. s.mutex.Lock()
  437. defer s.mutex.Unlock()
  438. values := make([]int64, len(s.values))
  439. copy(values, s.values)
  440. return values
  441. }
  442. // Variance returns the variance of the values in the sample.
  443. func (s *UniformSample) Variance() float64 {
  444. s.mutex.Lock()
  445. defer s.mutex.Unlock()
  446. return SampleVariance(s.values)
  447. }
// expDecaySample represents an individual sample in a heap.
type expDecaySample struct {
	// k is the priority the heap orders by.
	k float64
	// v is the sampled value.
	v int64
}
  453. // expDecaySampleHeap is a min-heap of expDecaySamples.
  454. type expDecaySampleHeap []expDecaySample
  455. func (q expDecaySampleHeap) Len() int {
  456. return len(q)
  457. }
  458. func (q expDecaySampleHeap) Less(i, j int) bool {
  459. return q[i].k < q[j].k
  460. }
  461. func (q *expDecaySampleHeap) Pop() interface{} {
  462. q_ := *q
  463. n := len(q_)
  464. i := q_[n-1]
  465. q_ = q_[0 : n-1]
  466. *q = q_
  467. return i
  468. }
  469. func (q *expDecaySampleHeap) Push(x interface{}) {
  470. q_ := *q
  471. n := len(q_)
  472. q_ = q_[0 : n+1]
  473. q_[n] = x.(expDecaySample)
  474. *q = q_
  475. }
  476. func (q expDecaySampleHeap) Swap(i, j int) {
  477. q[i], q[j] = q[j], q[i]
  478. }
  479. type int64Slice []int64
  480. func (p int64Slice) Len() int { return len(p) }
  481. func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
  482. func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }