sample.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. package metrics
  2. import (
  3. "container/heap"
  4. "math"
  5. "math/rand"
  6. "sort"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. const rescaleThreshold = time.Hour
  12. // Samples maintain a statistically-significant selection of values from
  13. // a stream.
  14. type Sample interface {
  15. Clear()
  16. Count() int64
  17. Max() int64
  18. Mean() float64
  19. Min() int64
  20. Percentile(float64) float64
  21. Percentiles([]float64) []float64
  22. Size() int
  23. Snapshot() Sample
  24. StdDev() float64
  25. Sum() int64
  26. Update(int64)
  27. Values() []int64
  28. Variance() float64
  29. }
  30. // ExpDecaySample is an exponentially-decaying sample using a forward-decaying
  31. // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
  32. // Decay Model for Streaming Systems".
  33. //
  34. // <http://www.research.att.com/people/Cormode_Graham/library/publications/CormodeShkapenyukSrivastavaXu09.pdf>
  35. type ExpDecaySample struct {
  36. alpha float64
  37. count int64
  38. mutex sync.Mutex
  39. reservoirSize int
  40. t0, t1 time.Time
  41. values expDecaySampleHeap
  42. }
  43. // NewExpDecaySample constructs a new exponentially-decaying sample with the
  44. // given reservoir size and alpha.
  45. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  46. if UseNilMetrics {
  47. return NilSample{}
  48. }
  49. s := &ExpDecaySample{
  50. alpha: alpha,
  51. reservoirSize: reservoirSize,
  52. t0: time.Now(),
  53. values: make(expDecaySampleHeap, 0, reservoirSize),
  54. }
  55. s.t1 = time.Now().Add(rescaleThreshold)
  56. return s
  57. }
  58. // Clear clears all samples.
  59. func (s *ExpDecaySample) Clear() {
  60. s.mutex.Lock()
  61. defer s.mutex.Unlock()
  62. s.count = 0
  63. s.t0 = time.Now()
  64. s.t1 = s.t0.Add(rescaleThreshold)
  65. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  66. }
  67. // Count returns the number of samples recorded, which may exceed the
  68. // reservoir size.
  69. func (s *ExpDecaySample) Count() int64 {
  70. return atomic.LoadInt64(&s.count)
  71. }
  72. // Max returns the maximum value in the sample, which may not be the maximum
  73. // value ever to be part of the sample.
  74. func (s *ExpDecaySample) Max() int64 {
  75. return SampleMax(s.Values())
  76. }
  77. // Mean returns the mean of the values in the sample.
  78. func (s *ExpDecaySample) Mean() float64 {
  79. return SampleMean(s.Values())
  80. }
  81. // Min returns the minimum value in the sample, which may not be the minimum
  82. // value ever to be part of the sample.
  83. func (s *ExpDecaySample) Min() int64 {
  84. return SampleMin(s.Values())
  85. }
  86. // Percentile returns an arbitrary percentile of values in the sample.
  87. func (s *ExpDecaySample) Percentile(p float64) float64 {
  88. return SamplePercentile(s.Values(), p)
  89. }
  90. // Percentiles returns a slice of arbitrary percentiles of values in the
  91. // sample.
  92. func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
  93. return SamplePercentiles(s.Values(), ps)
  94. }
  95. // Size returns the size of the sample, which is at most the reservoir size.
  96. func (s *ExpDecaySample) Size() int {
  97. s.mutex.Lock()
  98. defer s.mutex.Unlock()
  99. return len(s.values)
  100. }
  101. // Snapshot returns a read-only copy of the sample.
  102. func (s *ExpDecaySample) Snapshot() Sample {
  103. s.mutex.Lock()
  104. defer s.mutex.Unlock()
  105. values := make([]int64, len(s.values))
  106. for i, v := range s.values {
  107. values[i] = v.v
  108. }
  109. return &SampleSnapshot{
  110. count: s.count,
  111. values: values,
  112. }
  113. }
  114. // StdDev returns the standard deviation of the values in the sample.
  115. func (s *ExpDecaySample) StdDev() float64 {
  116. return SampleStdDev(s.Values())
  117. }
  118. // Sum returns the sum of the values in the sample.
  119. func (s *ExpDecaySample) Sum() int64 {
  120. return SampleSum(s.Values())
  121. }
  122. // Update samples a new value.
  123. func (s *ExpDecaySample) Update(v int64) {
  124. s.update(time.Now(), v)
  125. }
  126. // Values returns a copy of the values in the sample.
  127. func (s *ExpDecaySample) Values() []int64 {
  128. s.mutex.Lock()
  129. defer s.mutex.Unlock()
  130. values := make([]int64, len(s.values))
  131. for i, v := range s.values {
  132. values[i] = v.v
  133. }
  134. return values
  135. }
  136. // Variance returns the variance of the values in the sample.
  137. func (s *ExpDecaySample) Variance() float64 {
  138. return SampleVariance(s.Values())
  139. }
  140. // update samples a new value at a particular timestamp. This is a method all
  141. // its own to facilitate testing.
  142. func (s *ExpDecaySample) update(t time.Time, v int64) {
  143. s.mutex.Lock()
  144. defer s.mutex.Unlock()
  145. s.count++
  146. if len(s.values) == s.reservoirSize {
  147. heap.Pop(&s.values)
  148. }
  149. heap.Push(&s.values, expDecaySample{
  150. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  151. v: v,
  152. })
  153. if t.After(s.t1) {
  154. values := s.values
  155. t0 := s.t0
  156. s.values = make(expDecaySampleHeap, 0, s.reservoirSize)
  157. s.t0 = t
  158. s.t1 = s.t0.Add(rescaleThreshold)
  159. for _, v := range values {
  160. v.k = v.k * math.Exp(-s.alpha*float64(s.t0.Sub(t0)))
  161. heap.Push(&s.values, v)
  162. }
  163. }
  164. }
  165. // NilSample is a no-op Sample.
  166. type NilSample struct{}
  167. // Clear is a no-op.
  168. func (NilSample) Clear() {}
  169. // Count is a no-op.
  170. func (NilSample) Count() int64 { return 0 }
  171. // Max is a no-op.
  172. func (NilSample) Max() int64 { return 0 }
  173. // Mean is a no-op.
  174. func (NilSample) Mean() float64 { return 0.0 }
  175. // Min is a no-op.
  176. func (NilSample) Min() int64 { return 0 }
  177. // Percentile is a no-op.
  178. func (NilSample) Percentile(p float64) float64 { return 0.0 }
  179. // Percentiles is a no-op.
  180. func (NilSample) Percentiles(ps []float64) []float64 {
  181. return make([]float64, len(ps))
  182. }
  183. // Size is a no-op.
  184. func (NilSample) Size() int { return 0 }
  185. // Sample is a no-op.
  186. func (NilSample) Snapshot() Sample { return NilSample{} }
  187. // StdDev is a no-op.
  188. func (NilSample) StdDev() float64 { return 0.0 }
  189. // Sum is a no-op.
  190. func (NilSample) Sum() int64 { return 0 }
  191. // Update is a no-op.
  192. func (NilSample) Update(v int64) {}
  193. // Values is a no-op.
  194. func (NilSample) Values() []int64 { return []int64{} }
  195. // Variance is a no-op.
  196. func (NilSample) Variance() float64 { return 0.0 }
  197. // SampleMax returns the maximum value of the slice of int64.
  198. func SampleMax(values []int64) int64 {
  199. if 0 == len(values) {
  200. return 0
  201. }
  202. var max int64 = math.MinInt64
  203. for _, v := range values {
  204. if max < v {
  205. max = v
  206. }
  207. }
  208. return max
  209. }
  210. // SampleMean returns the mean value of the slice of int64.
  211. func SampleMean(values []int64) float64 {
  212. if 0 == len(values) {
  213. return 0.0
  214. }
  215. return float64(SampleSum(values)) / float64(len(values))
  216. }
  217. // SampleMin returns the minimum value of the slice of int64.
  218. func SampleMin(values []int64) int64 {
  219. if 0 == len(values) {
  220. return 0
  221. }
  222. var min int64 = math.MaxInt64
  223. for _, v := range values {
  224. if min > v {
  225. min = v
  226. }
  227. }
  228. return min
  229. }
  230. // SamplePercentiles returns an arbitrary percentile of the slice of int64.
  231. func SamplePercentile(values int64Slice, p float64) float64 {
  232. return SamplePercentiles(values, []float64{p})[0]
  233. }
  234. // SamplePercentiles returns a slice of arbitrary percentiles of the slice of
  235. // int64.
  236. func SamplePercentiles(values int64Slice, ps []float64) []float64 {
  237. scores := make([]float64, len(ps))
  238. size := len(values)
  239. if size > 0 {
  240. sort.Sort(values)
  241. for i, p := range ps {
  242. pos := p * float64(size+1)
  243. if pos < 1.0 {
  244. scores[i] = float64(values[0])
  245. } else if pos >= float64(size) {
  246. scores[i] = float64(values[size-1])
  247. } else {
  248. lower := float64(values[int(pos)-1])
  249. upper := float64(values[int(pos)])
  250. scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
  251. }
  252. }
  253. }
  254. return scores
  255. }
  256. // SampleSnapshot is a read-only copy of another Sample.
  257. type SampleSnapshot struct {
  258. count int64
  259. values []int64
  260. }
  261. // Clear panics.
  262. func (*SampleSnapshot) Clear() {
  263. panic("Clear called on a SampleSnapshot")
  264. }
  265. // Count returns the count of inputs at the time the snapshot was taken.
  266. func (s *SampleSnapshot) Count() int64 { return s.count }
  267. // Max returns the maximal value at the time the snapshot was taken.
  268. func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) }
  269. // Mean returns the mean value at the time the snapshot was taken.
  270. func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) }
  271. // Min returns the minimal value at the time the snapshot was taken.
  272. func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) }
  273. // Percentile returns an arbitrary percentile of values at the time the
  274. // snapshot was taken.
  275. func (s *SampleSnapshot) Percentile(p float64) float64 {
  276. return SamplePercentile(s.values, p)
  277. }
  278. // Percentiles returns a slice of arbitrary percentiles of values at the time
  279. // the snapshot was taken.
  280. func (s *SampleSnapshot) Percentiles(ps []float64) []float64 {
  281. return SamplePercentiles(s.values, ps)
  282. }
  283. // Size returns the size of the sample at the time the snapshot was taken.
  284. func (s *SampleSnapshot) Size() int { return len(s.values) }
  285. // Snapshot returns the snapshot.
  286. func (s *SampleSnapshot) Snapshot() Sample { return s }
  287. // StdDev returns the standard deviation of values at the time the snapshot was
  288. // taken.
  289. func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) }
  290. // Sum returns the sum of values at the time the snapshot was taken.
  291. func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) }
  292. // Update panics.
  293. func (*SampleSnapshot) Update(int64) {
  294. panic("Update called on a SampleSnapshot")
  295. }
  296. // Values returns a copy of the values in the sample.
  297. func (s *SampleSnapshot) Values() []int64 {
  298. values := make([]int64, len(s.values))
  299. copy(values, s.values)
  300. return values
  301. }
  302. // Variance returns the variance of values at the time the snapshot was taken.
  303. func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) }
  304. // SampleStdDev returns the standard deviation of the slice of int64.
  305. func SampleStdDev(values []int64) float64 {
  306. return math.Sqrt(SampleVariance(values))
  307. }
  308. // SampleSum returns the sum of the slice of int64.
  309. func SampleSum(values []int64) int64 {
  310. var sum int64
  311. for _, v := range values {
  312. sum += v
  313. }
  314. return sum
  315. }
  316. // SampleVariance returns the variance of the slice of int64.
  317. func SampleVariance(values []int64) float64 {
  318. if 0 == len(values) {
  319. return 0.0
  320. }
  321. m := SampleMean(values)
  322. var sum float64
  323. for _, v := range values {
  324. d := float64(v) - m
  325. sum += d * d
  326. }
  327. return sum / float64(len(values))
  328. }
  329. // A uniform sample using Vitter's Algorithm R.
  330. //
  331. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  332. type UniformSample struct {
  333. count int64
  334. mutex sync.Mutex
  335. reservoirSize int
  336. values []int64
  337. }
  338. // NewUniformSample constructs a new uniform sample with the given reservoir
  339. // size.
  340. func NewUniformSample(reservoirSize int) Sample {
  341. if UseNilMetrics {
  342. return NilSample{}
  343. }
  344. return &UniformSample{reservoirSize: reservoirSize}
  345. }
  346. // Clear clears all samples.
  347. func (s *UniformSample) Clear() {
  348. s.mutex.Lock()
  349. defer s.mutex.Unlock()
  350. s.count = 0
  351. s.values = make([]int64, 0, s.reservoirSize)
  352. }
  353. // Count returns the number of samples recorded, which may exceed the
  354. // reservoir size.
  355. func (s *UniformSample) Count() int64 {
  356. return atomic.LoadInt64(&s.count)
  357. }
  358. // Max returns the maximum value in the sample, which may not be the maximum
  359. // value ever to be part of the sample.
  360. func (s *UniformSample) Max() int64 {
  361. s.mutex.Lock()
  362. defer s.mutex.Unlock()
  363. return SampleMax(s.values)
  364. }
  365. // Mean returns the mean of the values in the sample.
  366. func (s *UniformSample) Mean() float64 {
  367. s.mutex.Lock()
  368. defer s.mutex.Unlock()
  369. return SampleMean(s.values)
  370. }
  371. // Min returns the minimum value in the sample, which may not be the minimum
  372. // value ever to be part of the sample.
  373. func (s *UniformSample) Min() int64 {
  374. s.mutex.Lock()
  375. defer s.mutex.Unlock()
  376. return SampleMin(s.values)
  377. }
  378. // Percentile returns an arbitrary percentile of values in the sample.
  379. func (s *UniformSample) Percentile(p float64) float64 {
  380. s.mutex.Lock()
  381. defer s.mutex.Unlock()
  382. return SamplePercentile(s.values, p)
  383. }
  384. // Percentiles returns a slice of arbitrary percentiles of values in the
  385. // sample.
  386. func (s *UniformSample) Percentiles(ps []float64) []float64 {
  387. s.mutex.Lock()
  388. defer s.mutex.Unlock()
  389. return SamplePercentiles(s.values, ps)
  390. }
  391. // Size returns the size of the sample, which is at most the reservoir size.
  392. func (s *UniformSample) Size() int {
  393. s.mutex.Lock()
  394. defer s.mutex.Unlock()
  395. return len(s.values)
  396. }
  397. // Snapshot returns a read-only copy of the sample.
  398. func (s *UniformSample) Snapshot() Sample {
  399. s.mutex.Lock()
  400. defer s.mutex.Unlock()
  401. values := make([]int64, len(s.values))
  402. copy(values, s.values)
  403. return &SampleSnapshot{
  404. count: s.count,
  405. values: values,
  406. }
  407. }
  408. // StdDev returns the standard deviation of the values in the sample.
  409. func (s *UniformSample) StdDev() float64 {
  410. s.mutex.Lock()
  411. defer s.mutex.Unlock()
  412. return SampleStdDev(s.values)
  413. }
  414. // Sum returns the sum of the values in the sample.
  415. func (s *UniformSample) Sum() int64 {
  416. s.mutex.Lock()
  417. defer s.mutex.Unlock()
  418. return SampleSum(s.values)
  419. }
  420. // Update samples a new value.
  421. func (s *UniformSample) Update(v int64) {
  422. s.mutex.Lock()
  423. defer s.mutex.Unlock()
  424. s.count++
  425. if len(s.values) < s.reservoirSize {
  426. s.values = append(s.values, v)
  427. } else {
  428. s.values[rand.Intn(s.reservoirSize)] = v
  429. }
  430. }
  431. // Values returns a copy of the values in the sample.
  432. func (s *UniformSample) Values() []int64 {
  433. s.mutex.Lock()
  434. defer s.mutex.Unlock()
  435. values := make([]int64, len(s.values))
  436. copy(values, s.values)
  437. return values
  438. }
  439. // Variance returns the variance of the values in the sample.
  440. func (s *UniformSample) Variance() float64 {
  441. s.mutex.Lock()
  442. defer s.mutex.Unlock()
  443. return SampleVariance(s.values)
  444. }
  445. // expDecaySample represents an individual sample in a heap.
  446. type expDecaySample struct {
  447. k float64
  448. v int64
  449. }
  450. // expDecaySampleHeap is a min-heap of expDecaySamples.
  451. type expDecaySampleHeap []expDecaySample
  452. func (q expDecaySampleHeap) Len() int {
  453. return len(q)
  454. }
  455. func (q expDecaySampleHeap) Less(i, j int) bool {
  456. return q[i].k < q[j].k
  457. }
  458. func (q *expDecaySampleHeap) Pop() interface{} {
  459. q_ := *q
  460. n := len(q_)
  461. i := q_[n-1]
  462. q_ = q_[0 : n-1]
  463. *q = q_
  464. return i
  465. }
  466. func (q *expDecaySampleHeap) Push(x interface{}) {
  467. q_ := *q
  468. n := len(q_)
  469. q_ = q_[0 : n+1]
  470. q_[n] = x.(expDecaySample)
  471. *q = q_
  472. }
  473. func (q expDecaySampleHeap) Swap(i, j int) {
  474. q[i], q[j] = q[j], q[i]
  475. }
  476. type int64Slice []int64
  477. func (p int64Slice) Len() int { return len(p) }
  478. func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
  479. func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }